const axios = require("axios");
const env = require("../../config/env");
const { apiLogRepository } = require("../../db/adapter");
const { getCachedClientCredentialsToken, AUTH_SCOPES } = require("../auth/auth.service");
const { normalizeUberError, isRetryableUberError } = require("../common/errors/uberError");
const { withExponentialBackoffRetry } = require("../common/http/retry");

// Shared axios instance for reporting calls: base URL from env, 45 s timeout.
const reportingClient = axios.create({
  baseURL: env.UBER_API_BASE_URL,
  timeout: 45000
});

/**
 * Builds the value of the `Authorization` header.
 *
 * @param {string} [tokenType] - Token scheme; falls back to "Bearer" when falsy.
 * @param {string} accessToken - The access token to send.
 * @returns {string} `<tokenType> <accessToken>`.
 */
function buildAuthorizationHeader(tokenType, accessToken) {
  return `${tokenType || "Bearer"} ${accessToken}`;
}

/**
 * Splits one CSV record into its field values.
 *
 * Commas inside double quotes do not separate fields, and a doubled quote
 * (`""`) inside a quoted section yields a literal `"`. Any other character,
 * including a newline inside quotes, is copied into the current field as-is.
 *
 * @param {string} line - A single CSV record (without its record terminator).
 * @returns {string[]} The field values, in order. Always at least one entry.
 */
function parseCsvLine(line) {
  const output = [];
  let current = "";
  let inQuotes = false;

  for (let i = 0; i < line.length; i += 1) {
    const char = line[i];
    const next = line[i + 1];

    if (char === '"') {
      if (inQuotes && next === '"') {
        // Escaped quote: emit one literal quote and skip its twin.
        current += '"';
        i += 1;
      } else {
        inQuotes = !inQuotes;
      }
      continue;
    }

    if (char === "," && !inQuotes) {
      output.push(current);
      current = "";
      continue;
    }

    current += char;
  }

  output.push(current);
  return output;
}

/**
 * Splits newline-normalized CSV text into records, breaking only on newlines
 * that are outside double quotes so that quoted multi-line fields stay intact.
 *
 * Quote state is tracked by simple toggling; an escaped `""` toggles twice and
 * therefore leaves the state unchanged, which matches parseCsvLine's handling.
 * Note: an unbalanced quote causes the remainder of the text to be treated as
 * part of the same record.
 *
 * @param {string} text - CSV text whose line endings are already `\n`.
 * @returns {string[]} Raw record strings (may include blank ones).
 */
function splitCsvRecords(text) {
  const records = [];
  let recordStart = 0;
  let inQuotes = false;

  for (let i = 0; i < text.length; i += 1) {
    const char = text[i];
    if (char === '"') {
      inQuotes = !inQuotes;
    } else if (char === "\n" && !inQuotes) {
      records.push(text.slice(recordStart, i));
      recordStart = i + 1;
    }
  }

  records.push(text.slice(recordStart));
  return records;
}

/**
 * Parses CSV text into row objects keyed by the (trimmed) header names.
 *
 * - `\r\n` and lone `\r` are normalized to `\n` before parsing.
 * - Records that are empty or whitespace-only are skipped.
 * - Quoted fields may contain commas, escaped quotes and line breaks.
 * - A row with fewer values than headers gets `null` for the missing columns;
 *   extra values beyond the header count are ignored.
 *
 * @param {string} csvText - Raw CSV; null/undefined/empty is treated as "".
 * @param {string[]} [requiredHeaders=[]] - Header names that must be present.
 * @returns {{ headers: string[], rows: Object[], missingRequiredHeaders: string[] }}
 *   Parsed headers, rows, and the required headers not found in the header row.
 */
function parseCsvByHeader(csvText, requiredHeaders = []) {
  // Tolerate an explicit null so callers forwarding optional config do not crash.
  const required = requiredHeaders ?? [];

  const normalizedText = String(csvText || "")
    .replace(/\r\n/g, "\n")
    .replace(/\r/g, "\n");

  const records = splitCsvRecords(normalizedText).filter(
    (record) => record.trim().length > 0
  );

  if (records.length === 0) {
    // Return a copy so callers mutating the result cannot alter the input array.
    return { headers: [], rows: [], missingRequiredHeaders: [...required] };
  }

  const headers = parseCsvLine(records[0]).map((value) => value.trim());
  const headerSet = new Set(headers);
  const missingRequiredHeaders = required.filter((header) => !headerSet.has(header));

  const rows = records.slice(1).map((record) => {
    const values = parseCsvLine(record);
    const row = {};
    headers.forEach((header, index) => {
      row[header] = values[index] ?? null;
    });
    return row;
  });

  return { headers, rows, missingRequiredHeaders };
}

/**
 * Writes an entry to the API log on a best-effort basis.
 *
 * A logging failure must never change the outcome of the request being
 * logged, so both synchronous throws and (if `insert` returns a promise)
 * asynchronous rejections are reported to stderr and otherwise ignored.
 *
 * @param {Object} entry - The log record passed through to apiLogRepository.insert.
 * @returns {void}
 */
function recordApiLog(entry) {
  const reportFailure = (logError) => {
    console.error("Failed to write reporting API log entry", logError);
  };

  try {
    const pending = apiLogRepository.insert(entry);
    if (pending && typeof pending.catch === "function") {
      pending.catch(reportFailure);
    }
  } catch (logError) {
    reportFailure(logError);
  }
}

/**
 * Calls an upstream reporting endpoint and returns its body as text, optionally
 * parsed as CSV.
 *
 * The request is sent through `withExponentialBackoffRetry` (maxAttempts 4,
 * baseDelayMs 400), retrying when `isRetryableUberError` says so. Both the
 * success and the failure outcome are written to the API log (best-effort).
 *
 * Note: the access token is obtained before the try block, so a failure of
 * `getCachedClientCredentialsToken` propagates as-is and is not logged here.
 *
 * @param {Object} options
 * @param {string} [options.method="GET"] - HTTP method; upper-cased before use.
 * @param {string} options.upstreamPath - Path requested on the reporting client.
 * @param {Object} [options.query] - Query-string parameters.
 * @param {*} [options.body] - Request body.
 * @param {boolean} [options.parseCsv=true] - Parse the response text as CSV.
 * @param {string[]} [options.requiredHeaders=[]] - Forwarded to parseCsvByHeader.
 * @param {string} [options.wrapperRoute="/api/v1/uber/reporting/fetch"] - Route
 *   name recorded in the API log.
 * @returns {Promise<{ rawCsv: string, parsed: (Object|null) }>} The raw response
 *   text and the parseCsvByHeader result (`null` when `parseCsv` is false).
 * @throws The value returned by `normalizeUberError` for any error raised while
 *   requesting or processing the response.
 */
async function fetchReport({
  method = "GET",
  upstreamPath,
  query,
  body,
  parseCsv = true,
  requiredHeaders = [],
  wrapperRoute = "/api/v1/uber/reporting/fetch"
}) {
  const token = await getCachedClientCredentialsToken({ scope: AUTH_SCOPES.REPORT });
  const requestMethod = String(method || "GET").toUpperCase();

  try {
    const response = await withExponentialBackoffRetry({
      fn: async () =>
        reportingClient.request({
          method: requestMethod,
          url: upstreamPath,
          params: query,
          data: body,
          responseType: "text",
          headers: {
            Authorization: buildAuthorizationHeader(token.token_type, token.access_token),
            "Content-Type": "application/json"
          }
        }),
      maxAttempts: 4,
      baseDelayMs: 400,
      shouldRetry: (error) => isRetryableUberError(error)
    });

    // Guard against a non-string body even though text was requested.
    const textData =
      typeof response.data === "string"
        ? response.data
        : JSON.stringify(response.data || {});
    const parsed = parseCsv ? parseCsvByHeader(textData, requiredHeaders) : null;
    const payload = { rawCsv: textData, parsed };

    // Log only a summary (headers + row count), not the full report contents.
    recordApiLog({
      method: requestMethod,
      wrapperRoute,
      uberPath: upstreamPath,
      responseStatus: response.status,
      requestBody: body,
      responseBody: parseCsv
        ? { headers: parsed?.headers, rowCount: parsed?.rows?.length || 0 }
        : {}
    });

    return payload;
  } catch (error) {
    const normalized = normalizeUberError(error);

    recordApiLog({
      method: requestMethod,
      wrapperRoute,
      uberPath: upstreamPath,
      responseStatus: normalized.status,
      requestBody: body,
      responseBody: {
        code: normalized.code,
        message: normalized.message,
        transient: normalized.transient,
        // Wrap string details so the logged responseBody.details is always an object-like value.
        details:
          typeof normalized.details === "string"
            ? { raw: normalized.details }
            : normalized.details
      }
    });

    throw normalized;
  }
}

module.exports = { fetchReport, parseCsvByHeader };