Add auto token refresh, async webhook worker queue, and report section downloader

MOHAN 2026-03-29 19:09:55 +05:30
parent 6ff3d800f4
commit a89d67ebee
11 changed files with 793 additions and 110 deletions

View File

@@ -15,7 +15,13 @@ const envSchema = z.object({
WEBHOOK_BASIC_AUTH_USERNAME: z.string().optional(),
WEBHOOK_BASIC_AUTH_PASSWORD: z.string().optional(),
SQLITE_PATH: z.string().default("./data/uber_wrapper.db"),
WRAPPER_API_KEY: z.string().optional()
WRAPPER_API_KEY: z.string().optional(),
ASYNC_WORKERS_ENABLED: z
.preprocess((value) => String(value ?? "true").toLowerCase(), z.enum(["true", "false"]))
.transform((value) => value === "true"),
ASYNC_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(1000),
ASYNC_WORKER_BATCH_SIZE: z.coerce.number().int().positive().default(5),
REPORT_DOWNLOAD_DIR: z.string().default("./data/reports")
});
const parsed = envSchema.safeParse(process.env);
@@ -27,5 +33,6 @@ if (!parsed.success) {
const env = parsed.data;
env.SQLITE_PATH = path.resolve(process.cwd(), env.SQLITE_PATH);
env.REPORT_DOWNLOAD_DIR = path.resolve(process.cwd(), env.REPORT_DOWNLOAD_DIR);
module.exports = env;
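The ASYNC_WORKERS_ENABLED chain above accepts only a case-insensitive "true" or "false" and defaults to true when the variable is unset; any other value fails validation at boot. A minimal standalone sketch of that coercion, assuming only that zod is installed:

const { z } = require("zod");

const flag = z
  .preprocess((value) => String(value ?? "true").toLowerCase(), z.enum(["true", "false"]))
  .transform((value) => value === "true");

console.log(flag.parse(undefined)); // true  (default when the env var is unset)
console.log(flag.parse("TRUE"));    // true  (case-insensitive)
console.log(flag.parse("false"));   // false
console.log(flag.safeParse("yes").success); // false -> rejected at startup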

View File

@@ -12,5 +12,7 @@ module.exports = {
apiLogRepository: repositories.apiLogRepository,
appTokenRepository: repositories.appTokenRepository,
tokenRequestLogRepository: repositories.tokenRequestLogRepository,
reportJobRepository: repositories.reportJobRepository
reportJobRepository: repositories.reportJobRepository,
asyncJobRepository: repositories.asyncJobRepository,
reportSectionRepository: repositories.reportSectionRepository
};

View File

@@ -145,6 +145,10 @@ const uberConnectionRepository = {
};
const webhookRepository = {
findById(id) {
return db.prepare("SELECT * FROM webhook_events WHERE id = ? LIMIT 1").get(id);
},
findByDedupeKey(dedupeKey) {
if (!dedupeKey) {
return null;
@@ -191,6 +195,31 @@ const webhookRepository = {
`);
stmt.run(row);
return row;
},
markProcessed(id) {
const timestamp = nowIso();
db.prepare(
`
UPDATE webhook_events
SET processing_status = 'processed',
processed_at = ?,
last_error = NULL
WHERE id = ?
`
).run(timestamp, id);
},
markFailed(id, errorMessage) {
db.prepare(
`
UPDATE webhook_events
SET processing_status = 'failed',
processed_at = ?,
last_error = ?
WHERE id = ?
`
).run(nowIso(), String(errorMessage || "Unknown webhook processing error"), id);
}
};
@@ -427,6 +456,15 @@ const appTokenRepository = {
`
)
.get(provider, grantType, scope);
},
deleteByProviderGrantScope({ provider, grantType, scope }) {
db.prepare(
`
DELETE FROM app_tokens
WHERE provider = ? AND grant_type = ? AND scope = ?
`
).run(provider, grantType, scope);
}
};
@@ -595,6 +633,211 @@ const reportJobRepository = {
}
};
const asyncJobRepository = {
enqueue({
jobType,
payload,
maxAttempts = 7,
nextRunAt = nowIso()
}) {
const timestamp = nowIso();
const row = {
id: uuidv4(),
job_type: jobType,
payload_json: JSON.stringify(payload || {}),
status: "queued",
attempt_count: 0,
max_attempts: maxAttempts,
next_run_at: nextRunAt,
last_error: null,
lock_token: null,
locked_at: null,
created_at: timestamp,
updated_at: timestamp
};
db.prepare(
`
INSERT INTO async_jobs (
id, job_type, payload_json, status, attempt_count, max_attempts, next_run_at,
last_error, lock_token, locked_at, created_at, updated_at
)
VALUES (
@id, @job_type, @payload_json, @status, @attempt_count, @max_attempts, @next_run_at,
@last_error, @lock_token, @locked_at, @created_at, @updated_at
)
`
).run(row);
return row;
},
claimNext({ workerId, lockTimeoutSeconds = 120 }) {
const timestamp = nowIso();
const lockCutoffIso = new Date(Date.now() - lockTimeoutSeconds * 1000).toISOString();
const selectStmt = db.prepare(
`
SELECT *
FROM async_jobs
WHERE (
(status = 'queued' OR status = 'retry')
AND next_run_at <= ?
)
-- reclaim jobs whose worker died mid-processing once the lock goes stale
OR (status = 'processing' AND locked_at < ?)
ORDER BY created_at ASC
LIMIT 1
`
);
const updateStmt = db.prepare(
`
UPDATE async_jobs
SET status = 'processing',
lock_token = ?,
locked_at = ?,
attempt_count = attempt_count + 1,
updated_at = ?
WHERE id = ?
`
);
const tx = db.transaction(() => {
const row = selectStmt.get(timestamp, lockCutoffIso);
if (!row) {
return null;
}
updateStmt.run(workerId, timestamp, timestamp, row.id);
return db.prepare("SELECT * FROM async_jobs WHERE id = ? LIMIT 1").get(row.id);
});
return tx();
},
markCompleted(id) {
const timestamp = nowIso();
db.prepare(
`
UPDATE async_jobs
SET status = 'completed',
lock_token = NULL,
locked_at = NULL,
updated_at = ?,
last_error = NULL
WHERE id = ?
`
).run(timestamp, id);
},
markFailedOrRetry({ id, attemptCount, maxAttempts, errorMessage }) {
const timestamp = nowIso();
if (attemptCount >= maxAttempts) {
db.prepare(
`
UPDATE async_jobs
SET status = 'failed',
lock_token = NULL,
locked_at = NULL,
updated_at = ?,
last_error = ?
WHERE id = ?
`
).run(timestamp, String(errorMessage || "Unknown async worker error"), id);
return "failed";
}
const backoffSeconds = Math.min(300, Math.max(1, 2 ** Math.max(0, attemptCount - 1)));
const nextRunAt = new Date(Date.now() + backoffSeconds * 1000).toISOString();
db.prepare(
`
UPDATE async_jobs
SET status = 'retry',
next_run_at = ?,
lock_token = NULL,
locked_at = NULL,
updated_at = ?,
last_error = ?
WHERE id = ?
`
).run(nextRunAt, timestamp, String(errorMessage || "Unknown async worker error"), id);
return "retry";
}
};
const reportSectionRepository = {
upsert({
workflowId,
sectionIndex,
sectionName,
downloadUrl,
filePath,
contentType,
rowCount,
headers,
sha256,
status,
errorMessage
}) {
const existing = db
.prepare(
`
SELECT * FROM report_sections
WHERE workflow_id = ? AND section_index = ?
LIMIT 1
`
)
.get(workflowId, sectionIndex);
const timestamp = nowIso();
const row = {
id: existing?.id || uuidv4(),
workflow_id: workflowId,
section_index: sectionIndex,
section_name: sectionName || null,
download_url: downloadUrl || null,
file_path: filePath || existing?.file_path || null,
content_type: contentType || null,
row_count: Number.isFinite(rowCount) ? rowCount : null,
headers_json: headers ? JSON.stringify(headers) : existing?.headers_json || null,
sha256: sha256 || null,
status: status || existing?.status || "queued",
error_message: errorMessage || null,
created_at: existing?.created_at || timestamp,
updated_at: timestamp
};
db.prepare(
`
INSERT INTO report_sections (
id, workflow_id, section_index, section_name, download_url, file_path, content_type,
row_count, headers_json, sha256, status, error_message, created_at, updated_at
)
VALUES (
@id, @workflow_id, @section_index, @section_name, @download_url, @file_path, @content_type,
@row_count, @headers_json, @sha256, @status, @error_message, @created_at, @updated_at
)
ON CONFLICT(workflow_id, section_index) DO UPDATE SET
section_name = excluded.section_name,
download_url = excluded.download_url,
file_path = excluded.file_path,
content_type = excluded.content_type,
row_count = excluded.row_count,
headers_json = excluded.headers_json,
sha256 = excluded.sha256,
status = excluded.status,
error_message = excluded.error_message,
updated_at = excluded.updated_at
`
).run(row);
return db
.prepare(
`
SELECT *
FROM report_sections
WHERE workflow_id = ? AND section_index = ?
LIMIT 1
`
)
.get(workflowId, sectionIndex);
}
};
module.exports = {
merchantRepository,
uberConnectionRepository,
@@ -602,5 +845,7 @@ module.exports = {
apiLogRepository,
appTokenRepository,
tokenRequestLogRepository,
reportJobRepository
reportJobRepository,
asyncJobRepository,
reportSectionRepository
};
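With the default maxAttempts of 7, markFailedOrRetry yields an exponential backoff capped at 300 seconds: retries are scheduled roughly 1, 2, 4, 8, 16, and 32 seconds after each failure, and the seventh failed attempt marks the job failed permanently. A sketch of the same schedule:

// Mirrors the backoff in markFailedOrRetry: 2^(attempt - 1) seconds, clamped to [1, 300].
function backoffSeconds(attemptCount) {
  return Math.min(300, Math.max(1, 2 ** Math.max(0, attemptCount - 1)));
}

for (let attempt = 1; attempt <= 6; attempt += 1) {
  console.log(`failure on attempt ${attempt} -> retry in ${backoffSeconds(attempt)}s`);
}
// attempt 7 reaches maxAttempts, so the job is marked 'failed' instead of 'retry'.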

View File

@@ -58,6 +58,7 @@ function initSchema() {
headers_json TEXT,
received_at TEXT NOT NULL,
processed_at TEXT,
last_error TEXT,
processing_status TEXT NOT NULL DEFAULT 'received',
FOREIGN KEY(merchant_id) REFERENCES merchants(id)
);
@@ -117,6 +118,44 @@ function initSchema() {
updated_at TEXT NOT NULL,
FOREIGN KEY(merchant_id) REFERENCES merchants(id)
);
CREATE TABLE IF NOT EXISTS async_jobs (
id TEXT PRIMARY KEY,
job_type TEXT NOT NULL,
payload_json TEXT NOT NULL,
status TEXT NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 7,
next_run_at TEXT NOT NULL,
last_error TEXT,
lock_token TEXT,
locked_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_async_jobs_run_window
ON async_jobs(status, next_run_at, created_at);
CREATE TABLE IF NOT EXISTS report_sections (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
section_index INTEGER NOT NULL,
section_name TEXT,
download_url TEXT,
file_path TEXT,
content_type TEXT,
row_count INTEGER,
headers_json TEXT,
sha256 TEXT,
status TEXT NOT NULL,
error_message TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_report_sections_workflow_section
ON report_sections(workflow_id, section_index);
`);
if (!tableHasColumn("webhook_events", "resource_id")) {
@@ -163,6 +202,17 @@ function initSchema() {
if (!tableHasColumn("report_jobs", "webhook_payload_json")) {
db.exec("ALTER TABLE report_jobs ADD COLUMN webhook_payload_json TEXT");
}
if (!tableHasColumn("webhook_events", "processed_at")) {
db.exec("ALTER TABLE webhook_events ADD COLUMN processed_at TEXT");
}
if (!tableHasColumn("webhook_events", "processing_status")) {
db.exec("ALTER TABLE webhook_events ADD COLUMN processing_status TEXT");
db.exec("UPDATE webhook_events SET processing_status = 'received' WHERE processing_status IS NULL");
}
if (!tableHasColumn("webhook_events", "last_error")) {
db.exec("ALTER TABLE webhook_events ADD COLUMN last_error TEXT");
}
}
module.exports = {
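tableHasColumn itself is outside this diff; with better-sqlite3 it can be written over PRAGMA table_info, along these lines (a sketch of an assumed helper, not the repository's actual implementation):

// Hypothetical implementation of the tableHasColumn guard used by the migrations above.
function tableHasColumn(table, column) {
  // db.pragma returns one row per column: { cid, name, type, notnull, dflt_value, pk }.
  const columns = db.pragma(`table_info(${table})`);
  return columns.some((col) => col.name === column);
}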

View File

@@ -121,21 +121,23 @@ function parseExpiresAt(expiresInSeconds) {
return new Date(Date.now() + seconds * 1000).toISOString();
}
async function getCachedClientCredentialsToken({ scope }) {
async function getCachedClientCredentialsToken({ scope, forceRefresh = false } = {}) {
const normalizedScope = (scope || AUTH_SCOPES.ORDER).trim().split(/\s+/).sort().join(" ");
const cached = appTokenRepository.findValid({
provider: "uber",
grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS,
scope: normalizedScope
});
if (cached) {
return {
access_token: cached.access_token,
token_type: cached.token_type,
scope: cached.scope,
expires_in: Math.max(0, Math.floor((new Date(cached.expires_at).getTime() - Date.now()) / 1000)),
source: "cache"
};
if (!forceRefresh) {
const cached = appTokenRepository.findValid({
provider: "uber",
grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS,
scope: normalizedScope
});
if (cached) {
return {
access_token: cached.access_token,
token_type: cached.token_type,
scope: cached.scope,
expires_in: Math.max(0, Math.floor((new Date(cached.expires_at).getTime() - Date.now()) / 1000)),
source: "cache"
};
}
}
const requestsLastHour = tokenRequestLogRepository.countInLastHour({
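Callers that know their cached token is stale can now bypass the cache explicitly. A hypothetical call site:

// Skip the app_tokens cache and mint a fresh client-credentials token,
// e.g. right after an upstream 401.
const token = await getCachedClientCredentialsToken({
  scope: AUTH_SCOPES.REPORT,
  forceRefresh: true
});
// (the cached branch above tags its result with source: "cache")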

View File

@@ -3,8 +3,13 @@ const zlib = require("zlib");
const { promisify } = require("util");
const env = require("../../config/env");
const uberEndpoints = require("../../config/uberEndpoints");
const { uberConnectionRepository, apiLogRepository } = require("../../db/adapter");
const { getCachedClientCredentialsToken, AUTH_SCOPES } = require("../auth/auth.service");
const { uberConnectionRepository, apiLogRepository, appTokenRepository } = require("../../db/adapter");
const {
getCachedClientCredentialsToken,
refreshToken,
AUTH_SCOPES,
AUTH_GRANT_TYPES
} = require("../auth/auth.service");
const { normalizeUberError, isRetryableUberError } = require("../common/errors/uberError");
const { withExponentialBackoffRetry } = require("../common/http/retry");
@@ -59,6 +64,43 @@ async function resolveAuthToken({ authMode = "app", merchantId, scopes }) {
};
}
function isUnauthorizedError(error) {
return Number(error?.response?.status || 0) === 401;
}
async function refreshMerchantConnectionToken(merchantId) {
const connection = uberConnectionRepository.findByMerchantId(merchantId);
if (!connection) {
const error = new Error("Uber merchant connection not found for token refresh.");
error.status = 404;
throw error;
}
if (!connection.refresh_token) {
const error = new Error("Refresh token is not available for merchant OAuth connection.");
error.status = 401;
throw error;
}
const tokenData = await refreshToken(connection.refresh_token);
const expiresAt = tokenData.expires_in
? new Date(Date.now() + Number(tokenData.expires_in) * 1000).toISOString()
: connection.expires_at;
const updated = uberConnectionRepository.upsertByMerchantId(merchantId, {
accessToken: tokenData.access_token,
refreshToken: tokenData.refresh_token || connection.refresh_token,
tokenType: tokenData.token_type || connection.token_type || "Bearer",
scope: tokenData.scope || connection.scope || null,
expiresAt,
status: "active"
});
return {
tokenType: updated.token_type || "Bearer",
accessToken: updated.access_token
};
}
async function callUberApi({
merchantId,
method,
@@ -71,10 +113,11 @@
scopes,
headers
}) {
const resolvedAuth = await resolveAuthToken({ authMode, merchantId, scopes });
try {
const response = await withExponentialBackoffRetry({
const scopeKey = scopes || AUTH_SCOPES.ORDER;
const normalizedScope = scopeKey.trim().split(/\s+/).sort().join(" ");
let resolvedAuth = await resolveAuthToken({ authMode, merchantId, scopes: normalizedScope });
const buildRequest = () =>
withExponentialBackoffRetry({
fn: async () =>
uberApiClient.request({
method,
@@ -91,6 +134,35 @@
baseDelayMs: 300,
shouldRetry: (error) => isRetryableUberError(error)
});
try {
let response;
try {
response = await buildRequest();
} catch (error) {
if (!isUnauthorizedError(error)) {
throw error;
}
if (authMode === "merchant" && merchantId) {
resolvedAuth = await refreshMerchantConnectionToken(merchantId);
} else {
appTokenRepository.deleteByProviderGrantScope({
provider: "uber",
grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS,
scope: normalizedScope
});
const freshToken = await getCachedClientCredentialsToken({
scope: normalizedScope,
forceRefresh: true
});
resolvedAuth = {
tokenType: freshToken.token_type || "Bearer",
accessToken: freshToken.access_token
};
}
response = await buildRequest();
}
apiLogRepository.insert({
merchantId,
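Scope strings are normalized (trimmed, split on whitespace, sorted, re-joined) before any cache lookup, so equivalent scope sets share a single cached token. For example:

// "eats.report  eats.order" and "eats.order eats.report" normalize to the same key,
// so both resolve to the same app_tokens row.
const normalize = (scope) => scope.trim().split(/\s+/).sort().join(" ");
console.log(normalize("eats.report  eats.order"));  // "eats.order eats.report"
console.log(normalize(" eats.order eats.report ")); // "eats.order eats.report"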

View File

@@ -1,7 +1,11 @@
const axios = require("axios");
const env = require("../../config/env");
const { apiLogRepository, reportJobRepository } = require("../../db/adapter");
const { getCachedClientCredentialsToken, AUTH_SCOPES } = require("../auth/auth.service");
const { apiLogRepository, reportJobRepository, appTokenRepository } = require("../../db/adapter");
const {
getCachedClientCredentialsToken,
AUTH_SCOPES,
AUTH_GRANT_TYPES
} = require("../auth/auth.service");
const { normalizeUberError, isRetryableUberError } = require("../common/errors/uberError");
const { withExponentialBackoffRetry } = require("../common/http/retry");
@@ -14,6 +18,43 @@ function buildAuthorizationHeader(tokenType, accessToken) {
return `${tokenType || "Bearer"} ${accessToken}`;
}
function isUnauthorizedError(error) {
return Number(error?.response?.status || 0) === 401;
}
async function executeWithReportTokenRetry({ requestFactory }) {
const scope = AUTH_SCOPES.REPORT;
let token = await getCachedClientCredentialsToken({ scope });
const run = async () =>
withExponentialBackoffRetry({
fn: async () => requestFactory(token),
maxAttempts: 4,
baseDelayMs: 400,
shouldRetry: (error) => isRetryableUberError(error)
});
try {
return await run();
} catch (error) {
if (!isUnauthorizedError(error)) {
throw error;
}
appTokenRepository.deleteByProviderGrantScope({
provider: "uber",
grantType: AUTH_GRANT_TYPES.CLIENT_CREDENTIALS,
scope
});
token = await getCachedClientCredentialsToken({
scope,
forceRefresh: true
});
return run();
}
}
function parseCsvLine(line) {
const output = [];
let current = "";
@@ -89,14 +130,11 @@ async function fetchReport({
requiredHeaders = [],
wrapperRoute = "/api/v1/uber/reporting/fetch"
}) {
const token = await getCachedClientCredentialsToken({
scope: AUTH_SCOPES.REPORT
});
const requestMethod = String(method || "GET").toUpperCase();
try {
const response = await withExponentialBackoffRetry({
fn: async () =>
const response = await executeWithReportTokenRetry({
requestFactory: async (token) =>
reportingClient.request({
method: requestMethod,
url: upstreamPath,
@@ -107,10 +145,7 @@
Authorization: buildAuthorizationHeader(token.token_type, token.access_token),
"Content-Type": "application/json"
}
}),
maxAttempts: 4,
baseDelayMs: 400,
shouldRetry: (error) => isRetryableUberError(error)
})
});
const textData =
@@ -163,10 +198,6 @@ module.exports = {
startDate,
endDate
}) {
const token = await getCachedClientCredentialsToken({
scope: AUTH_SCOPES.REPORT
});
const body = {
report_type: reportType,
store_uuids: storeUuids || [],
@@ -176,8 +207,8 @@
};
try {
const response = await withExponentialBackoffRetry({
fn: async () =>
const response = await executeWithReportTokenRetry({
requestFactory: async (token) =>
reportingClient.request({
method: "POST",
url: "/v1/eats/report",
@@ -186,10 +217,7 @@
Authorization: buildAuthorizationHeader(token.token_type, token.access_token),
"Content-Type": "application/json"
}
}),
maxAttempts: 4,
baseDelayMs: 400,
shouldRetry: (error) => isRetryableUberError(error)
})
});
const workflowId = response?.data?.workflow_id;
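Future reporting calls can adopt the same wrapper instead of managing tokens themselves; a sketch with a hypothetical endpoint path:

// Hypothetical reporting call reusing the shared 401-invalidate-and-retry wrapper.
const response = await executeWithReportTokenRetry({
  requestFactory: async (token) =>
    reportingClient.request({
      method: "GET",
      url: "/v1/eats/report/status", // illustrative path, not part of this diff
      headers: {
        Authorization: buildAuthorizationHeader(token.token_type, token.access_token)
      }
    })
});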

View File

@@ -0,0 +1,134 @@
const {
webhookRepository,
uberConnectionRepository,
reportJobRepository,
asyncJobRepository
} = require("../../db/adapter");
function parseJson(rawValue) {
if (!rawValue) {
return {};
}
if (typeof rawValue === "object") {
return rawValue;
}
try {
return JSON.parse(rawValue);
} catch (error) {
return {};
}
}
function extractStoreId(payload) {
return payload?.user_id || payload?.store_id || payload?.resource_id || payload?.store?.id || null;
}
function pickSectionUrl(section = {}) {
return (
section.url ||
section.download_url ||
section.signed_url ||
section.resource_url ||
section.href ||
null
);
}
function enqueueReportSectionJobs(workflowId, sections) {
(sections || []).forEach((section, index) => {
const url = pickSectionUrl(section);
if (!url) {
return;
}
asyncJobRepository.enqueue({
jobType: "report_section_download",
payload: {
workflowId,
sectionIndex: index,
section
},
maxAttempts: 7
});
});
}
function applyProvisioningStateFromWebhook(eventType, payload) {
if (eventType !== "store.provisioned" && eventType !== "store.deprovisioned") {
return;
}
const storeId = extractStoreId(payload);
if (!storeId) {
return;
}
const connection = uberConnectionRepository.findByUberStoreId(String(storeId));
if (!connection) {
return;
}
const nextStatus = eventType === "store.provisioned" ? "active" : "deprovisioned";
uberConnectionRepository.setStatusByMerchantId(connection.merchant_id, nextStatus);
}
function applyMenuRefreshRequestFromWebhook(eventType, payload, headers) {
if (eventType !== "store.menu_refresh_request") {
return;
}
const storeId = extractStoreId(payload);
if (!storeId) {
return;
}
uberConnectionRepository.markMenuRefreshRequestedByStoreId(String(storeId), {
requestedAt: new Date().toISOString(),
webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null,
environment: headers?.["x-environment"] || null
});
}
function applyReportSuccessFromWebhook(eventType, payload) {
if (eventType !== "eats.report.success") {
return;
}
const workflowId = payload?.job_id;
if (!workflowId) {
return;
}
const sections = payload?.report_metadata?.sections || [];
reportJobRepository.markSuccessFromWebhook({
workflowId: String(workflowId),
eventId: payload?.event_id || null,
webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null,
reportType: payload?.report_type || null,
sections,
payload
});
enqueueReportSectionJobs(String(workflowId), sections);
}
async function processWebhookEventById(eventId) {
const event = webhookRepository.findById(eventId);
if (!event) {
return;
}
if (event.processing_status === "processed") {
return;
}
const payload = parseJson(event.payload_json);
const headers = parseJson(event.headers_json);
const eventType = event.event_type || payload?.event_type || payload?.type || "unknown";
applyProvisioningStateFromWebhook(eventType, payload);
applyMenuRefreshRequestFromWebhook(eventType, payload, headers);
applyReportSuccessFromWebhook(eventType, payload);
webhookRepository.markProcessed(eventId);
}
module.exports = {
processWebhookEventById,
pickSectionUrl
};
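For an eats.report.success event the processor records the job result and fans out one download job per section that exposes a URL; sections without a recognizable URL are skipped. An illustrative payload, with field names taken from the accessors above and values invented:

// Hypothetical webhook payload shape.
const payload = {
  event_type: "eats.report.success",
  job_id: "wf-123", // workflow id, invented for illustration
  report_metadata: {
    sections: [
      { name: "orders", url: "https://example.com/sections/orders.csv" }, // enqueued
      { name: "summary" } // no URL -> skipped by enqueueReportSectionJobs
    ]
  }
};
// Processing this event enqueues one async job:
//   { jobType: "report_section_download", payload: { workflowId: "wf-123", sectionIndex: 0, section: {...} } }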

View File

@@ -2,8 +2,7 @@ const crypto = require("crypto");
const env = require("../../config/env");
const {
webhookRepository,
uberConnectionRepository,
reportJobRepository
asyncJobRepository
} = require("../../db/adapter");
function getSignatureFromHeaders(headers) {
@@ -69,65 +68,6 @@ function buildDedupeKey(signature, req) {
return crypto.createHash("sha256").update(basis).digest("hex");
}
function extractStoreId(payload) {
return payload?.user_id || payload?.store_id || payload?.resource_id || payload?.store?.id || null;
}
function applyProvisioningStateFromWebhook(eventType, payload) {
if (eventType !== "store.provisioned" && eventType !== "store.deprovisioned") {
return;
}
const storeId = extractStoreId(payload);
if (!storeId) {
return;
}
const connection = uberConnectionRepository.findByUberStoreId(String(storeId));
if (!connection) {
return;
}
const nextStatus = eventType === "store.provisioned" ? "active" : "deprovisioned";
uberConnectionRepository.setStatusByMerchantId(connection.merchant_id, nextStatus);
}
function applyMenuRefreshRequestFromWebhook(eventType, payload, headers) {
if (eventType !== "store.menu_refresh_request") {
return;
}
const storeId = extractStoreId(payload);
if (!storeId) {
return;
}
uberConnectionRepository.markMenuRefreshRequestedByStoreId(String(storeId), {
requestedAt: new Date().toISOString(),
webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null,
environment: headers?.["x-environment"] || null
});
}
function applyReportSuccessFromWebhook(eventType, payload) {
if (eventType !== "eats.report.success") {
return;
}
const workflowId = payload?.job_id;
if (!workflowId) {
return;
}
reportJobRepository.markSuccessFromWebhook({
workflowId: String(workflowId),
eventId: payload?.event_id || null,
webhookMsgUuid: payload?.webhook_meta?.webhook_msg_uuid || null,
reportType: payload?.report_type || null,
sections: payload?.report_metadata?.sections || [],
payload
});
}
async function handleUberWebhook(req, res) {
if (!verifyBasicAuthIfConfigured(req)) {
return res.status(401).json({
@@ -161,7 +101,7 @@
return res.status(200).end();
}
webhookRepository.insert({
const inserted = webhookRepository.insert({
provider: "uber",
merchantId,
eventType,
@@ -173,9 +113,13 @@
headersJson: req.headers
});
applyProvisioningStateFromWebhook(eventType, req.body || {});
applyMenuRefreshRequestFromWebhook(eventType, req.body || {}, req.headers || {});
applyReportSuccessFromWebhook(eventType, req.body || {});
asyncJobRepository.enqueue({
jobType: "uber_webhook_event",
payload: {
eventId: inserted.id
},
maxAttempts: 7
});
return res.status(200).end();
}

View File

@@ -1,11 +1,12 @@
const app = require("./app");
const env = require("./config/env");
const { initSchema } = require("./db/sqlite");
const { startAsyncWorkers } = require("./workers/asyncWorkers");
initSchema();
startAsyncWorkers();
app.listen(env.PORT, () => {
// eslint-disable-next-line no-console
console.log(`Uber Wrapper listening on http://localhost:${env.PORT}`);
});
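stopAsyncWorkers is exported but not wired up here; a minimal shutdown hook could look like this (a sketch, assuming stopAsyncWorkers is also destructured from ./workers/asyncWorkers and that app.listen returns the standard Node HTTP server):

const server = app.listen(env.PORT, () => {
  // eslint-disable-next-line no-console
  console.log(`Uber Wrapper listening on http://localhost:${env.PORT}`);
});

// Drain on SIGTERM: stop polling for new jobs, then close the HTTP listener.
process.on("SIGTERM", () => {
  stopAsyncWorkers();
  server.close(() => process.exit(0));
});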

src/workers/asyncWorkers.js Normal file (198 lines)
View File

@@ -0,0 +1,198 @@
const fs = require("fs");
const path = require("path");
const crypto = require("crypto");
const axios = require("axios");
const { v4: uuidv4 } = require("uuid");
const env = require("../config/env");
const { asyncJobRepository, reportSectionRepository, webhookRepository } = require("../db/adapter");
const { processWebhookEventById, pickSectionUrl } = require("../modules/webhooks/webhookProcessor");
const { parseCsvByHeader } = require("../modules/reporting/reporting.service");
let workerTimer = null;
let isProcessing = false;
function ensureDir(dirPath) {
if (!fs.existsSync(dirPath)) {
fs.mkdirSync(dirPath, { recursive: true });
}
}
function parseJson(raw) {
if (!raw) {
return {};
}
if (typeof raw === "object") {
return raw;
}
try {
return JSON.parse(raw);
} catch (error) {
return {};
}
}
function safeSectionName(input, index) {
const fallback = `section-${index + 1}`;
const name = String(input || fallback)
.trim()
.replace(/[^a-zA-Z0-9._-]+/g, "_")
.slice(0, 80);
return name || fallback;
}
function computeSha256(buffer) {
return crypto.createHash("sha256").update(buffer).digest("hex");
}
async function handleWebhookJob(payload) {
const eventId = payload?.eventId;
if (!eventId) {
throw new Error("Missing eventId in webhook async job payload.");
}
await processWebhookEventById(String(eventId));
}
async function downloadSectionFile({ workflowId, sectionIndex, section }) {
const sectionUrl = pickSectionUrl(section);
if (!sectionUrl) {
throw new Error(`Missing section URL for workflow ${workflowId} section ${sectionIndex}.`);
}
const baseDir = env.REPORT_DOWNLOAD_DIR;
const workflowDir = path.join(baseDir, String(workflowId));
ensureDir(baseDir);
ensureDir(workflowDir);
const sectionName = safeSectionName(section?.name || section?.section_name, sectionIndex);
const filePath = path.join(workflowDir, `${String(sectionIndex + 1).padStart(2, "0")}-${sectionName}.csv`);
const response = await axios.get(sectionUrl, {
responseType: "arraybuffer",
timeout: 60000
});
const bodyBuffer = Buffer.from(response.data || []);
fs.writeFileSync(filePath, bodyBuffer);
const csvText = bodyBuffer.toString("utf8");
const parsed = parseCsvByHeader(csvText);
const sha256 = computeSha256(bodyBuffer);
const contentType = response.headers?.["content-type"] || "text/csv";
reportSectionRepository.upsert({
workflowId: String(workflowId),
sectionIndex: Number(sectionIndex),
sectionName,
downloadUrl: sectionUrl,
filePath,
contentType,
rowCount: parsed?.rows?.length || 0,
headers: parsed?.headers || [],
sha256,
status: "completed",
errorMessage: null
});
}
async function handleReportSectionJob(payload) {
const workflowId = payload?.workflowId;
const sectionIndex = payload?.sectionIndex;
const section = payload?.section;
if (!workflowId && workflowId !== 0) {
throw new Error("Missing workflowId in report section download job.");
}
if (sectionIndex === undefined || sectionIndex === null) {
throw new Error("Missing sectionIndex in report section download job.");
}
await downloadSectionFile({
workflowId,
sectionIndex,
section
});
}
async function processJob(job) {
const payload = parseJson(job.payload_json);
if (job.job_type === "uber_webhook_event") {
await handleWebhookJob(payload);
return;
}
if (job.job_type === "report_section_download") {
await handleReportSectionJob(payload);
return;
}
throw new Error(`Unsupported async job type: ${job.job_type}`);
}
async function processBatch() {
if (isProcessing) {
return;
}
isProcessing = true;
const workerId = `worker-${uuidv4()}`;
try {
for (let i = 0; i < env.ASYNC_WORKER_BATCH_SIZE; i += 1) {
const job = asyncJobRepository.claimNext({ workerId });
if (!job) {
break;
}
try {
await processJob(job);
asyncJobRepository.markCompleted(job.id);
} catch (error) {
asyncJobRepository.markFailedOrRetry({
id: job.id,
attemptCount: Number(job.attempt_count || 0),
maxAttempts: Number(job.max_attempts || 1),
errorMessage: error?.message || String(error)
});
const payload = parseJson(job.payload_json);
if (job.job_type === "uber_webhook_event" && payload?.eventId) {
webhookRepository.markFailed(payload.eventId, error?.message || String(error));
}
if (job.job_type === "report_section_download") {
reportSectionRepository.upsert({
workflowId: String(payload?.workflowId || ""),
sectionIndex: Number(payload?.sectionIndex || 0),
sectionName: safeSectionName(payload?.section?.name, Number(payload?.sectionIndex || 0)),
downloadUrl: pickSectionUrl(payload?.section || {}),
status: "failed",
errorMessage: error?.message || String(error)
});
}
}
}
} finally {
isProcessing = false;
}
}
function startAsyncWorkers() {
if (!env.ASYNC_WORKERS_ENABLED) {
return;
}
if (workerTimer) {
return;
}
workerTimer = setInterval(() => {
processBatch().catch(() => {
// keep worker alive even when one polling cycle fails
});
}, env.ASYNC_WORKER_POLL_INTERVAL_MS);
workerTimer.unref?.();
}
function stopAsyncWorkers() {
if (workerTimer) {
clearInterval(workerTimer);
workerTimer = null;
}
}
module.exports = {
startAsyncWorkers,
stopAsyncWorkers
};
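Downloaded sections land under REPORT_DOWNLOAD_DIR/<workflowId>/ with zero-padded, sanitized file names. A worked example of the naming scheme, duplicating the helper for a standalone run:

const path = require("path");

function safeSectionName(input, index) {
  const fallback = `section-${index + 1}`;
  const name = String(input || fallback)
    .trim()
    .replace(/[^a-zA-Z0-9._-]+/g, "_")
    .slice(0, 80);
  return name || fallback;
}

console.log(safeSectionName("Payment Details / EU", 0)); // "Payment_Details_EU"
// For sectionIndex 1 with no name, the worker writes:
console.log(path.join("./data/reports", "wf-123", `${String(1 + 1).padStart(2, "0")}-${safeSectionName(null, 1)}.csv`));
// -> data/reports/wf-123/02-section-2.csv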