diff --git a/src/config/env.js b/src/config/env.js index 56e2f97..25d327e 100644 --- a/src/config/env.js +++ b/src/config/env.js @@ -15,7 +15,13 @@ const envSchema = z.object({ WEBHOOK_BASIC_AUTH_USERNAME: z.string().optional(), WEBHOOK_BASIC_AUTH_PASSWORD: z.string().optional(), SQLITE_PATH: z.string().default("./data/uber_wrapper.db"), - WRAPPER_API_KEY: z.string().optional() + WRAPPER_API_KEY: z.string().optional(), + ASYNC_WORKERS_ENABLED: z + .preprocess((value) => String(value ?? "true").toLowerCase(), z.enum(["true", "false"])) + .transform((value) => value === "true"), + ASYNC_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(1000), + ASYNC_WORKER_BATCH_SIZE: z.coerce.number().int().positive().default(5), + REPORT_DOWNLOAD_DIR: z.string().default("./data/reports") }); const parsed = envSchema.safeParse(process.env); @@ -27,5 +33,6 @@ if (!parsed.success) { const env = parsed.data; env.SQLITE_PATH = path.resolve(process.cwd(), env.SQLITE_PATH); +env.REPORT_DOWNLOAD_DIR = path.resolve(process.cwd(), env.REPORT_DOWNLOAD_DIR); module.exports = env; diff --git a/src/db/adapter.js b/src/db/adapter.js index 9cb4c28..c9dd21b 100644 --- a/src/db/adapter.js +++ b/src/db/adapter.js @@ -12,5 +12,7 @@ module.exports = { apiLogRepository: repositories.apiLogRepository, appTokenRepository: repositories.appTokenRepository, tokenRequestLogRepository: repositories.tokenRequestLogRepository, - reportJobRepository: repositories.reportJobRepository + reportJobRepository: repositories.reportJobRepository, + asyncJobRepository: repositories.asyncJobRepository, + reportSectionRepository: repositories.reportSectionRepository }; diff --git a/src/db/repositories.js b/src/db/repositories.js index eae8bb3..cf6a28f 100644 --- a/src/db/repositories.js +++ b/src/db/repositories.js @@ -145,6 +145,10 @@ const uberConnectionRepository = { }; const webhookRepository = { + findById(id) { + return db.prepare("SELECT * FROM webhook_events WHERE id = ? LIMIT 1").get(id); + }, + findByDedupeKey(dedupeKey) { if (!dedupeKey) { return null; @@ -191,6 +195,31 @@ const webhookRepository = { `); stmt.run(row); return row; + }, + + markProcessed(id) { + const timestamp = nowIso(); + db.prepare( + ` + UPDATE webhook_events + SET processing_status = 'processed', + processed_at = ?, + last_error = NULL + WHERE id = ? + ` + ).run(timestamp, id); + }, + + markFailed(id, errorMessage) { + db.prepare( + ` + UPDATE webhook_events + SET processing_status = 'failed', + processed_at = ?, + last_error = ? + WHERE id = ? + ` + ).run(nowIso(), String(errorMessage || "Unknown webhook processing error"), id); } }; @@ -427,6 +456,15 @@ const appTokenRepository = { ` ) .get(provider, grantType, scope); + }, + + deleteByProviderGrantScope({ provider, grantType, scope }) { + db.prepare( + ` + DELETE FROM app_tokens + WHERE provider = ? AND grant_type = ? AND scope = ? + ` + ).run(provider, grantType, scope); } }; @@ -595,6 +633,211 @@ const reportJobRepository = { } }; +const asyncJobRepository = { + enqueue({ + jobType, + payload, + maxAttempts = 7, + nextRunAt = nowIso() + }) { + const timestamp = nowIso(); + const row = { + id: uuidv4(), + job_type: jobType, + payload_json: JSON.stringify(payload || {}), + status: "queued", + attempt_count: 0, + max_attempts: maxAttempts, + next_run_at: nextRunAt, + last_error: null, + lock_token: null, + locked_at: null, + created_at: timestamp, + updated_at: timestamp + }; + + db.prepare( + ` + INSERT INTO async_jobs ( + id, job_type, payload_json, status, attempt_count, max_attempts, next_run_at, + last_error, lock_token, locked_at, created_at, updated_at + ) + VALUES ( + @id, @job_type, @payload_json, @status, @attempt_count, @max_attempts, @next_run_at, + @last_error, @lock_token, @locked_at, @created_at, @updated_at + ) + ` + ).run(row); + + return row; + }, + + claimNext({ workerId, lockTimeoutSeconds = 120 }) { + const timestamp = nowIso(); + const lockCutoffIso = new Date(Date.now() - lockTimeoutSeconds * 1000).toISOString(); + const selectStmt = db.prepare( + ` + SELECT * + FROM async_jobs + WHERE (status = 'queued' OR status = 'retry') + AND next_run_at <= ? + AND (locked_at IS NULL OR locked_at < ?) + ORDER BY created_at ASC + LIMIT 1 + ` + ); + const updateStmt = db.prepare( + ` + UPDATE async_jobs + SET status = 'processing', + lock_token = ?, + locked_at = ?, + attempt_count = attempt_count + 1, + updated_at = ? + WHERE id = ? + ` + ); + + const tx = db.transaction(() => { + const row = selectStmt.get(timestamp, lockCutoffIso); + if (!row) { + return null; + } + updateStmt.run(workerId, timestamp, timestamp, row.id); + return db.prepare("SELECT * FROM async_jobs WHERE id = ? LIMIT 1").get(row.id); + }); + + return tx(); + }, + + markCompleted(id) { + const timestamp = nowIso(); + db.prepare( + ` + UPDATE async_jobs + SET status = 'completed', + lock_token = NULL, + locked_at = NULL, + updated_at = ?, + last_error = NULL + WHERE id = ? + ` + ).run(timestamp, id); + }, + + markFailedOrRetry({ id, attemptCount, maxAttempts, errorMessage }) { + const timestamp = nowIso(); + if (attemptCount >= maxAttempts) { + db.prepare( + ` + UPDATE async_jobs + SET status = 'failed', + lock_token = NULL, + locked_at = NULL, + updated_at = ?, + last_error = ? + WHERE id = ? + ` + ).run(timestamp, String(errorMessage || "Unknown async worker error"), id); + return "failed"; + } + + const backoffSeconds = Math.min(300, Math.max(1, 2 ** Math.max(0, attemptCount - 1))); + const nextRunAt = new Date(Date.now() + backoffSeconds * 1000).toISOString(); + db.prepare( + ` + UPDATE async_jobs + SET status = 'retry', + next_run_at = ?, + lock_token = NULL, + locked_at = NULL, + updated_at = ?, + last_error = ? + WHERE id = ? + ` + ).run(nextRunAt, timestamp, String(errorMessage || "Unknown async worker error"), id); + return "retry"; + } +}; + +const reportSectionRepository = { + upsert({ + workflowId, + sectionIndex, + sectionName, + downloadUrl, + filePath, + contentType, + rowCount, + headers, + sha256, + status, + errorMessage + }) { + const existing = db + .prepare( + ` + SELECT * FROM report_sections + WHERE workflow_id = ? AND section_index = ? + LIMIT 1 + ` + ) + .get(workflowId, sectionIndex); + const timestamp = nowIso(); + const row = { + id: existing?.id || uuidv4(), + workflow_id: workflowId, + section_index: sectionIndex, + section_name: sectionName || null, + download_url: downloadUrl || null, + file_path: filePath || existing?.file_path || null, + content_type: contentType || null, + row_count: Number.isFinite(rowCount) ? rowCount : null, + headers_json: headers ? JSON.stringify(headers) : existing?.headers_json || null, + sha256: sha256 || null, + status: status || existing?.status || "queued", + error_message: errorMessage || null, + created_at: existing?.created_at || timestamp, + updated_at: timestamp + }; + + db.prepare( + ` + INSERT INTO report_sections ( + id, workflow_id, section_index, section_name, download_url, file_path, content_type, + row_count, headers_json, sha256, status, error_message, created_at, updated_at + ) + VALUES ( + @id, @workflow_id, @section_index, @section_name, @download_url, @file_path, @content_type, + @row_count, @headers_json, @sha256, @status, @error_message, @created_at, @updated_at + ) + ON CONFLICT(workflow_id, section_index) DO UPDATE SET + section_name = excluded.section_name, + download_url = excluded.download_url, + file_path = excluded.file_path, + content_type = excluded.content_type, + row_count = excluded.row_count, + headers_json = excluded.headers_json, + sha256 = excluded.sha256, + status = excluded.status, + error_message = excluded.error_message, + updated_at = excluded.updated_at + ` + ).run(row); + + return db + .prepare( + ` + SELECT * + FROM report_sections + WHERE workflow_id = ? AND section_index = ? + LIMIT 1 + ` + ) + .get(workflowId, sectionIndex); + } +}; + module.exports = { merchantRepository, uberConnectionRepository, @@ -602,5 +845,7 @@ module.exports = { apiLogRepository, appTokenRepository, tokenRequestLogRepository, - reportJobRepository + reportJobRepository, + asyncJobRepository, + reportSectionRepository }; diff --git a/src/db/sqlite.js b/src/db/sqlite.js index 3787557..c343946 100644 --- a/src/db/sqlite.js +++ b/src/db/sqlite.js @@ -58,6 +58,7 @@ function initSchema() { headers_json TEXT, received_at TEXT NOT NULL, processed_at TEXT, + last_error TEXT, processing_status TEXT NOT NULL DEFAULT 'received', FOREIGN KEY(merchant_id) REFERENCES merchants(id) ); @@ -117,6 +118,44 @@ function initSchema() { updated_at TEXT NOT NULL, FOREIGN KEY(merchant_id) REFERENCES merchants(id) ); + + CREATE TABLE IF NOT EXISTS async_jobs ( + id TEXT PRIMARY KEY, + job_type TEXT NOT NULL, + payload_json TEXT NOT NULL, + status TEXT NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 7, + next_run_at TEXT NOT NULL, + last_error TEXT, + lock_token TEXT, + locked_at TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_async_jobs_run_window + ON async_jobs(status, next_run_at, created_at); + + CREATE TABLE IF NOT EXISTS report_sections ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL, + section_index INTEGER NOT NULL, + section_name TEXT, + download_url TEXT, + file_path TEXT, + content_type TEXT, + row_count INTEGER, + headers_json TEXT, + sha256 TEXT, + status TEXT NOT NULL, + error_message TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE UNIQUE INDEX IF NOT EXISTS idx_report_sections_workflow_section + ON report_sections(workflow_id, section_index); `); if (!tableHasColumn("webhook_events", "resource_id")) { @@ -163,6 +202,17 @@ function initSchema() { if (!tableHasColumn("report_jobs", "webhook_payload_json")) { db.exec("ALTER TABLE report_jobs ADD COLUMN webhook_payload_json TEXT"); } + + if (!tableHasColumn("webhook_events", "processed_at")) { + db.exec("ALTER TABLE webhook_events ADD COLUMN processed_at TEXT"); + } + if (!tableHasColumn("webhook_events", "processing_status")) { + db.exec("ALTER TABLE webhook_events ADD COLUMN processing_status TEXT"); + db.exec("UPDATE webhook_events SET processing_status = 'received' WHERE processing_status IS NULL"); + } + if (!tableHasColumn("webhook_events", "last_error")) { + db.exec("ALTER TABLE webhook_events ADD COLUMN last_error TEXT"); + } } module.exports = { diff --git a/src/modules/auth/auth.service.js b/src/modules/auth/auth.service.js index a340fa6..b8823d2 100644 --- a/src/modules/auth/auth.service.js +++ b/src/modules/auth/auth.service.js @@ -121,21 +121,23 @@ function parseExpiresAt(expiresInSeconds) { return new Date(Date.now() + seconds * 1000).toISOString(); } -async function getCachedClientCredentialsToken({ scope }) { +async function getCachedClientCredentialsToken({ scope, forceRefresh = false } = {}) { const normalizedScope = (scope || AUTH_SCOPES.ORDER).trim().split(/\s+/).sort().join(" "); - const cached = appTokenRepository.findValid({ - provider: "uber", - grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS, - scope: normalizedScope - }); - if (cached) { - return { - access_token: cached.access_token, - token_type: cached.token_type, - scope: cached.scope, - expires_in: Math.max(0, Math.floor((new Date(cached.expires_at).getTime() - Date.now()) / 1000)), - source: "cache" - }; + if (!forceRefresh) { + const cached = appTokenRepository.findValid({ + provider: "uber", + grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS, + scope: normalizedScope + }); + if (cached) { + return { + access_token: cached.access_token, + token_type: cached.token_type, + scope: cached.scope, + expires_in: Math.max(0, Math.floor((new Date(cached.expires_at).getTime() - Date.now()) / 1000)), + source: "cache" + }; + } } const requestsLastHour = tokenRequestLogRepository.countInLastHour({ diff --git a/src/modules/proxy/proxy.service.js b/src/modules/proxy/proxy.service.js index 89af300..1df37be 100644 --- a/src/modules/proxy/proxy.service.js +++ b/src/modules/proxy/proxy.service.js @@ -3,8 +3,13 @@ const zlib = require("zlib"); const { promisify } = require("util"); const env = require("../../config/env"); const uberEndpoints = require("../../config/uberEndpoints"); -const { uberConnectionRepository, apiLogRepository } = require("../../db/adapter"); -const { getCachedClientCredentialsToken, AUTH_SCOPES } = require("../auth/auth.service"); +const { uberConnectionRepository, apiLogRepository, appTokenRepository } = require("../../db/adapter"); +const { + getCachedClientCredentialsToken, + refreshToken, + AUTH_SCOPES, + AUTH_GRANT_TYPES +} = require("../auth/auth.service"); const { normalizeUberError, isRetryableUberError } = require("../common/errors/uberError"); const { withExponentialBackoffRetry } = require("../common/http/retry"); @@ -59,6 +64,43 @@ async function resolveAuthToken({ authMode = "app", merchantId, scopes }) { }; } +function isUnauthorizedError(error) { + return Number(error?.response?.status || 0) === 401; +} + +async function refreshMerchantConnectionToken(merchantId) { + const connection = uberConnectionRepository.findByMerchantId(merchantId); + if (!connection) { + const error = new Error("Uber merchant connection not found for token refresh."); + error.status = 404; + throw error; + } + if (!connection.refresh_token) { + const error = new Error("Refresh token is not available for merchant OAuth connection."); + error.status = 401; + throw error; + } + + const tokenData = await refreshToken(connection.refresh_token); + const expiresAt = tokenData.expires_in + ? new Date(Date.now() + Number(tokenData.expires_in) * 1000).toISOString() + : connection.expires_at; + + const updated = uberConnectionRepository.upsertByMerchantId(merchantId, { + accessToken: tokenData.access_token, + refreshToken: tokenData.refresh_token || connection.refresh_token, + tokenType: tokenData.token_type || connection.token_type || "Bearer", + scope: tokenData.scope || connection.scope || null, + expiresAt, + status: "active" + }); + + return { + tokenType: updated.token_type || "Bearer", + accessToken: updated.access_token + }; +} + async function callUberApi({ merchantId, method, @@ -71,10 +113,11 @@ async function callUberApi({ scopes, headers }) { - const resolvedAuth = await resolveAuthToken({ authMode, merchantId, scopes }); - - try { - const response = await withExponentialBackoffRetry({ + const scopeKey = scopes || AUTH_SCOPES.ORDER; + const normalizedScope = scopeKey.trim().split(/\s+/).sort().join(" "); + let resolvedAuth = await resolveAuthToken({ authMode, merchantId, scopes: normalizedScope }); + const buildRequest = () => + withExponentialBackoffRetry({ fn: async () => uberApiClient.request({ method, @@ -91,6 +134,35 @@ async function callUberApi({ baseDelayMs: 300, shouldRetry: (error) => isRetryableUberError(error) }); + try { + let response; + try { + response = await buildRequest(); + } catch (error) { + if (!isUnauthorizedError(error)) { + throw error; + } + + if (authMode === "merchant" && merchantId) { + resolvedAuth = await refreshMerchantConnectionToken(merchantId); + } else { + appTokenRepository.deleteByProviderGrantScope({ + provider: "uber", + grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS, + scope: normalizedScope + }); + const freshToken = await getCachedClientCredentialsToken({ + scope: normalizedScope, + forceRefresh: true + }); + resolvedAuth = { + tokenType: freshToken.token_type || "Bearer", + accessToken: freshToken.access_token + }; + } + + response = await buildRequest(); + } apiLogRepository.insert({ merchantId, diff --git a/src/modules/reporting/reporting.service.js b/src/modules/reporting/reporting.service.js index 1b42836..aef7ce2 100644 --- a/src/modules/reporting/reporting.service.js +++ b/src/modules/reporting/reporting.service.js @@ -1,7 +1,11 @@ const axios = require("axios"); const env = require("../../config/env"); -const { apiLogRepository, reportJobRepository } = require("../../db/adapter"); -const { getCachedClientCredentialsToken, AUTH_SCOPES } = require("../auth/auth.service"); +const { apiLogRepository, reportJobRepository, appTokenRepository } = require("../../db/adapter"); +const { + getCachedClientCredentialsToken, + AUTH_SCOPES, + AUTH_GRANT_TYPES +} = require("../auth/auth.service"); const { normalizeUberError, isRetryableUberError } = require("../common/errors/uberError"); const { withExponentialBackoffRetry } = require("../common/http/retry"); @@ -14,6 +18,43 @@ function buildAuthorizationHeader(tokenType, accessToken) { return `${tokenType || "Bearer"} ${accessToken}`; } +function isUnauthorizedError(error) { + return Number(error?.response?.status || 0) === 401; +} + +async function executeWithReportTokenRetry({ requestFactory }) { + const scope = AUTH_SCOPES.REPORT; + let token = await getCachedClientCredentialsToken({ scope }); + + const run = async () => + withExponentialBackoffRetry({ + fn: async () => requestFactory(token), + maxAttempts: 4, + baseDelayMs: 400, + shouldRetry: (error) => isRetryableUberError(error) + }); + + try { + return await run(); + } catch (error) { + if (!isUnauthorizedError(error)) { + throw error; + } + + appTokenRepository.deleteByProviderGrantScope({ + provider: "uber", + grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS, + scope + }); + token = await getCachedClientCredentialsToken({ + scope, + forceRefresh: true + }); + + return run(); + } +} + function parseCsvLine(line) { const output = []; let current = ""; @@ -89,14 +130,11 @@ async function fetchReport({ requiredHeaders = [], wrapperRoute = "/api/v1/uber/reporting/fetch" }) { - const token = await getCachedClientCredentialsToken({ - scope: AUTH_SCOPES.REPORT - }); const requestMethod = String(method || "GET").toUpperCase(); try { - const response = await withExponentialBackoffRetry({ - fn: async () => + const response = await executeWithReportTokenRetry({ + requestFactory: async (token) => reportingClient.request({ method: requestMethod, url: upstreamPath, @@ -107,10 +145,7 @@ async function fetchReport({ Authorization: buildAuthorizationHeader(token.token_type, token.access_token), "Content-Type": "application/json" } - }), - maxAttempts: 4, - baseDelayMs: 400, - shouldRetry: (error) => isRetryableUberError(error) + }) }); const textData = @@ -163,10 +198,6 @@ module.exports = { startDate, endDate }) { - const token = await getCachedClientCredentialsToken({ - scope: AUTH_SCOPES.REPORT - }); - const body = { report_type: reportType, store_uuids: storeUuids || [], @@ -176,8 +207,8 @@ module.exports = { }; try { - const response = await withExponentialBackoffRetry({ - fn: async () => + const response = await executeWithReportTokenRetry({ + requestFactory: async (token) => reportingClient.request({ method: "POST", url: "/v1/eats/report", @@ -186,10 +217,7 @@ module.exports = { Authorization: buildAuthorizationHeader(token.token_type, token.access_token), "Content-Type": "application/json" } - }), - maxAttempts: 4, - baseDelayMs: 400, - shouldRetry: (error) => isRetryableUberError(error) + }) }); const workflowId = response?.data?.workflow_id; diff --git a/src/modules/webhooks/webhookProcessor.js b/src/modules/webhooks/webhookProcessor.js new file mode 100644 index 0000000..90f0cd1 --- /dev/null +++ b/src/modules/webhooks/webhookProcessor.js @@ -0,0 +1,134 @@ +const { + webhookRepository, + uberConnectionRepository, + reportJobRepository, + asyncJobRepository +} = require("../../db/adapter"); + +function parseJson(rawValue) { + if (!rawValue) { + return {}; + } + if (typeof rawValue === "object") { + return rawValue; + } + try { + return JSON.parse(rawValue); + } catch (error) { + return {}; + } +} + +function extractStoreId(payload) { + return payload?.user_id || payload?.store_id || payload?.resource_id || payload?.store?.id || null; +} + +function pickSectionUrl(section = {}) { + return ( + section.url || + section.download_url || + section.signed_url || + section.resource_url || + section.href || + null + ); +} + +function enqueueReportSectionJobs(workflowId, sections) { + (sections || []).forEach((section, index) => { + const url = pickSectionUrl(section); + if (!url) { + return; + } + asyncJobRepository.enqueue({ + jobType: "report_section_download", + payload: { + workflowId, + sectionIndex: index, + section + }, + maxAttempts: 7 + }); + }); +} + +function applyProvisioningStateFromWebhook(eventType, payload) { + if (eventType !== "store.provisioned" && eventType !== "store.deprovisioned") { + return; + } + const storeId = extractStoreId(payload); + if (!storeId) { + return; + } + + const connection = uberConnectionRepository.findByUberStoreId(String(storeId)); + if (!connection) { + return; + } + + const nextStatus = eventType === "store.provisioned" ? "active" : "deprovisioned"; + uberConnectionRepository.setStatusByMerchantId(connection.merchant_id, nextStatus); +} + +function applyMenuRefreshRequestFromWebhook(eventType, payload, headers) { + if (eventType !== "store.menu_refresh_request") { + return; + } + + const storeId = extractStoreId(payload); + if (!storeId) { + return; + } + + uberConnectionRepository.markMenuRefreshRequestedByStoreId(String(storeId), { + requestedAt: new Date().toISOString(), + webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null, + environment: headers?.["x-environment"] || null + }); +} + +function applyReportSuccessFromWebhook(eventType, payload) { + if (eventType !== "eats.report.success") { + return; + } + + const workflowId = payload?.job_id; + if (!workflowId) { + return; + } + + const sections = payload?.report_metadata?.sections || []; + reportJobRepository.markSuccessFromWebhook({ + workflowId: String(workflowId), + eventId: payload?.event_id || null, + webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null, + reportType: payload?.report_type || null, + sections, + payload + }); + enqueueReportSectionJobs(String(workflowId), sections); +} + +async function processWebhookEventById(eventId) { + const event = webhookRepository.findById(eventId); + if (!event) { + return; + } + if (event.processing_status === "processed") { + return; + } + + const payload = parseJson(event.payload_json); + const headers = parseJson(event.headers_json); + const eventType = event.event_type || payload?.event_type || payload?.type || "unknown"; + + applyProvisioningStateFromWebhook(eventType, payload); + applyMenuRefreshRequestFromWebhook(eventType, payload, headers); + applyReportSuccessFromWebhook(eventType, payload); + webhookRepository.markProcessed(eventId); +} + +module.exports = { + processWebhookEventById, + pickSectionUrl +}; diff --git a/src/modules/webhooks/webhooks.controller.js b/src/modules/webhooks/webhooks.controller.js index e38621c..ef3be13 100644 --- a/src/modules/webhooks/webhooks.controller.js +++ b/src/modules/webhooks/webhooks.controller.js @@ -2,8 +2,7 @@ const crypto = require("crypto"); const env = require("../../config/env"); const { webhookRepository, - uberConnectionRepository, - reportJobRepository + asyncJobRepository } = require("../../db/adapter"); function getSignatureFromHeaders(headers) { @@ -69,65 +68,6 @@ function buildDedupeKey(signature, req) { return crypto.createHash("sha256").update(basis).digest("hex"); } -function extractStoreId(payload) { - return payload?.user_id || payload?.store_id || payload?.resource_id || payload?.store?.id || null; -} - -function applyProvisioningStateFromWebhook(eventType, payload) { - if (eventType !== "store.provisioned" && eventType !== "store.deprovisioned") { - return; - } - const storeId = extractStoreId(payload); - if (!storeId) { - return; - } - - const connection = uberConnectionRepository.findByUberStoreId(String(storeId)); - if (!connection) { - return; - } - - const nextStatus = eventType === "store.provisioned" ? "active" : "deprovisioned"; - uberConnectionRepository.setStatusByMerchantId(connection.merchant_id, nextStatus); -} - -function applyMenuRefreshRequestFromWebhook(eventType, payload, headers) { - if (eventType !== "store.menu_refresh_request") { - return; - } - - const storeId = extractStoreId(payload); - if (!storeId) { - return; - } - - uberConnectionRepository.markMenuRefreshRequestedByStoreId(String(storeId), { - requestedAt: new Date().toISOString(), - webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null, - environment: headers?.["x-environment"] || null - }); -} - -function applyReportSuccessFromWebhook(eventType, payload) { - if (eventType !== "eats.report.success") { - return; - } - - const workflowId = payload?.job_id; - if (!workflowId) { - return; - } - - reportJobRepository.markSuccessFromWebhook({ - workflowId: String(workflowId), - eventId: payload?.event_id || null, - webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null, - reportType: payload?.report_type || null, - sections: payload?.report_metadata?.sections || [], - payload - }); -} - async function handleUberWebhook(req, res) { if (!verifyBasicAuthIfConfigured(req)) { return res.status(401).json({ @@ -161,7 +101,7 @@ async function handleUberWebhook(req, res) { return res.status(200).end(); } - webhookRepository.insert({ + const inserted = webhookRepository.insert({ provider: "uber", merchantId, eventType, @@ -173,9 +113,13 @@ async function handleUberWebhook(req, res) { headersJson: req.headers }); - applyProvisioningStateFromWebhook(eventType, req.body || {}); - applyMenuRefreshRequestFromWebhook(eventType, req.body || {}, req.headers || {}); - applyReportSuccessFromWebhook(eventType, req.body || {}); + asyncJobRepository.enqueue({ + jobType: "uber_webhook_event", + payload: { + eventId: inserted.id + }, + maxAttempts: 7 + }); return res.status(200).end(); } diff --git a/src/server.js b/src/server.js index 4f21b9a..1c94f44 100644 --- a/src/server.js +++ b/src/server.js @@ -1,11 +1,12 @@ const app = require("./app"); const env = require("./config/env"); const { initSchema } = require("./db/sqlite"); +const { startAsyncWorkers } = require("./workers/asyncWorkers"); initSchema(); +startAsyncWorkers(); app.listen(env.PORT, () => { // eslint-disable-next-line no-console console.log(`Uber Wrapper listening on http://localhost:${env.PORT}`); }); - diff --git a/src/workers/asyncWorkers.js b/src/workers/asyncWorkers.js new file mode 100644 index 0000000..cc537b5 --- /dev/null +++ b/src/workers/asyncWorkers.js @@ -0,0 +1,198 @@ +const fs = require("fs"); +const path = require("path"); +const crypto = require("crypto"); +const axios = require("axios"); +const { v4: uuidv4 } = require("uuid"); +const env = require("../config/env"); +const { asyncJobRepository, reportSectionRepository, webhookRepository } = require("../db/adapter"); +const { processWebhookEventById, pickSectionUrl } = require("../modules/webhooks/webhookProcessor"); +const { parseCsvByHeader } = require("../modules/reporting/reporting.service"); + +let workerTimer = null; +let isProcessing = false; + +function ensureDir(dirPath) { + if (!fs.existsSync(dirPath)) { + fs.mkdirSync(dirPath, { recursive: true }); + } +} + +function parseJson(raw) { + if (!raw) { + return {}; + } + if (typeof raw === "object") { + return raw; + } + try { + return JSON.parse(raw); + } catch (error) { + return {}; + } +} + +function safeSectionName(input, index) { + const fallback = `section-${index + 1}`; + const name = String(input || fallback) + .trim() + .replace(/[^a-zA-Z0-9._-]+/g, "_") + .slice(0, 80); + return name || fallback; +} + +function computeSha256(buffer) { + return crypto.createHash("sha256").update(buffer).digest("hex"); +} + +async function handleWebhookJob(payload) { + const eventId = payload?.eventId; + if (!eventId) { + throw new Error("Missing eventId in webhook async job payload."); + } + await processWebhookEventById(String(eventId)); +} + +async function downloadSectionFile({ workflowId, sectionIndex, section }) { + const sectionUrl = pickSectionUrl(section); + if (!sectionUrl) { + throw new Error(`Missing section URL for workflow ${workflowId} section ${sectionIndex}.`); + } + + const baseDir = env.REPORT_DOWNLOAD_DIR; + const workflowDir = path.join(baseDir, String(workflowId)); + ensureDir(baseDir); + ensureDir(workflowDir); + + const sectionName = safeSectionName(section?.name || section?.section_name, sectionIndex); + const filePath = path.join(workflowDir, `${String(sectionIndex + 1).padStart(2, "0")}-${sectionName}.csv`); + + const response = await axios.get(sectionUrl, { + responseType: "arraybuffer", + timeout: 60000 + }); + + const bodyBuffer = Buffer.from(response.data || []); + fs.writeFileSync(filePath, bodyBuffer); + + const csvText = bodyBuffer.toString("utf8"); + const parsed = parseCsvByHeader(csvText); + const sha256 = computeSha256(bodyBuffer); + const contentType = response.headers?.["content-type"] || "text/csv"; + + reportSectionRepository.upsert({ + workflowId: String(workflowId), + sectionIndex: Number(sectionIndex), + sectionName, + downloadUrl: sectionUrl, + filePath, + contentType, + rowCount: parsed?.rows?.length || 0, + headers: parsed?.headers || [], + sha256, + status: "completed", + errorMessage: null + }); +} + +async function handleReportSectionJob(payload) { + const workflowId = payload?.workflowId; + const sectionIndex = payload?.sectionIndex; + const section = payload?.section; + if (!workflowId && workflowId !== 0) { + throw new Error("Missing workflowId in report section download job."); + } + if (sectionIndex === undefined || sectionIndex === null) { + throw new Error("Missing sectionIndex in report section download job."); + } + await downloadSectionFile({ + workflowId, + sectionIndex, + section + }); +} + +async function processJob(job) { + const payload = parseJson(job.payload_json); + if (job.job_type === "uber_webhook_event") { + await handleWebhookJob(payload); + return; + } + if (job.job_type === "report_section_download") { + await handleReportSectionJob(payload); + return; + } + throw new Error(`Unsupported async job type: ${job.job_type}`); +} + +async function processBatch() { + if (isProcessing) { + return; + } + isProcessing = true; + const workerId = `worker-${uuidv4()}`; + + try { + for (let i = 0; i < env.ASYNC_WORKER_BATCH_SIZE; i += 1) { + const job = asyncJobRepository.claimNext({ workerId }); + if (!job) { + break; + } + + try { + await processJob(job); + asyncJobRepository.markCompleted(job.id); + } catch (error) { + asyncJobRepository.markFailedOrRetry({ + id: job.id, + attemptCount: Number(job.attempt_count || 0), + maxAttempts: Number(job.max_attempts || 1), + errorMessage: error?.message || String(error) + }); + + const payload = parseJson(job.payload_json); + if (job.job_type === "uber_webhook_event" && payload?.eventId) { + webhookRepository.markFailed(payload.eventId, error?.message || String(error)); + } + if (job.job_type === "report_section_download") { + reportSectionRepository.upsert({ + workflowId: String(payload?.workflowId || ""), + sectionIndex: Number(payload?.sectionIndex || 0), + sectionName: safeSectionName(payload?.section?.name, Number(payload?.sectionIndex || 0)), + downloadUrl: pickSectionUrl(payload?.section || {}), + status: "failed", + errorMessage: error?.message || String(error) + }); + } + } + } + } finally { + isProcessing = false; + } +} + +function startAsyncWorkers() { + if (!env.ASYNC_WORKERS_ENABLED) { + return; + } + if (workerTimer) { + return; + } + workerTimer = setInterval(() => { + processBatch().catch(() => { + // keep worker alive even when one polling cycle fails + }); + }, env.ASYNC_WORKER_POLL_INTERVAL_MS); + workerTimer.unref?.(); +} + +function stopAsyncWorkers() { + if (workerTimer) { + clearInterval(workerTimer); + workerTimer = null; + } +} + +module.exports = { + startAsyncWorkers, + stopAsyncWorkers +};