Add concurrency handling and logging enhancements to KYT pipeline

This commit is contained in:
MOHAN 2026-04-14 13:26:56 +05:30
parent e87bd907ea
commit 9480832478
9 changed files with 383 additions and 35 deletions

View File

@ -0,0 +1,2 @@
[2026-04-13T20:34:31.705Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-34-31-700Z.log
[2026-04-13T20:34:31.706Z] [LOG] function function

View File

@ -0,0 +1,2 @@
[2026-04-13T20:36:37.795Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-36-37-795Z.log
[2026-04-13T20:36:37.796Z] [LOG] function function function

View File

@ -0,0 +1,2 @@
[2026-04-13T20:41:34.605Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-41-34-605Z.log
[2026-04-13T20:41:34.606Z] [LOG] function

View File

@ -0,0 +1,2 @@
[2026-04-13T20:45:18.949Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-45-18-948Z.log
[2026-04-13T20:45:18.951Z] [LOG] backend-status-parser-ok

View File

@ -141,6 +141,14 @@ function readBooleanEnv(name, fallback = false) {
return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase());
}
// Emits a one-line "[STAGE-SUMMARY] <stage> | k=v | k=v" report for a
// pipeline stage. Entries whose value is undefined, null, or the empty
// string are omitted; object values are JSON-encoded so the line stays
// machine-parseable by the live-stats log scraper.
function logStageSummary(stepKey, summary = {}) {
  const fields = [];
  for (const [key, value] of Object.entries(summary)) {
    if (value === undefined || value === null || value === "") {
      continue;
    }
    const rendered = typeof value === "object" ? JSON.stringify(value) : value;
    fields.push(`${key}=${rendered}`);
  }
  console.log(`[STAGE-SUMMARY] ${stepKey} | ${fields.join(" | ")}`);
}
async function convertAggregatedToShopifyReady({
inputPath = DEFAULT_AGGREGATED_JSON,
outputPath = DEFAULT_SHOPIFY_READY_JSON,
@ -301,7 +309,15 @@ async function runFullKytPipeline(options = {}) {
console.log("\n[PIPELINE 2/6] Downloading product images...");
const downloadSummary = await downloadProductImagesFromAggregatedJson({
jsonPath: aggregatedPathForRun,
outputDir: imagesDirForRun
outputDir: imagesDirForRun,
concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8)
});
logStageSummary("downloadImages", {
total: downloadSummary?.totalImagesFound ?? 0,
downloaded: downloadSummary?.downloaded ?? 0,
skipped: downloadSummary?.skipped ?? 0,
failed: downloadSummary?.failed ?? 0,
products: downloadSummary?.productsCount ?? 0
});
emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
@ -326,7 +342,8 @@ async function runFullKytPipeline(options = {}) {
try {
watermarkSummary = await applyWatermarkToDownloadedImages({
imagesDir: imagesDirForRun,
watermarkPath: watermarkPathForRun
watermarkPath: watermarkPathForRun,
concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4)
});
} catch (error) {
if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) {
@ -346,6 +363,13 @@ async function runFullKytPipeline(options = {}) {
}
}
}
logStageSummary("watermarkImages", {
total: watermarkSummary?.totalImagesFound ?? 0,
processed: watermarkSummary?.processed ?? 0,
skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0,
failed: watermarkSummary?.failed ?? 0,
concurrency: watermarkSummary?.concurrency ?? ""
});
emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
@ -372,7 +396,8 @@ async function runFullKytPipeline(options = {}) {
aggregatedJsonPath: aggregatedPathForRun,
imagesDir: imagesDirForRun,
statePath: "data/04_shopify_image_upload_state.json",
mapPath: DEFAULT_UPLOADED_MAP_JSON
mapPath: DEFAULT_UPLOADED_MAP_JSON,
concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3)
});
} catch (error) {
const message = String(error?.message || "Unknown image upload error");
@ -392,6 +417,14 @@ async function runFullKytPipeline(options = {}) {
console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false).");
}
}
logStageSummary("uploadImagesToShopifyFiles", {
total: imageUploadSummary?.totalTasks ?? 0,
processed: imageUploadSummary?.processed ?? 0,
uploaded: imageUploadSummary?.uploaded ?? 0,
skipped: imageUploadSummary?.skipped ?? 0,
failed: imageUploadSummary?.failed ?? 0,
concurrency: imageUploadSummary?.concurrency ?? ""
});
emitProgress(5, "convertToShopifyReady", "Converting KYT data to Shopify-ready products");
console.log("\n[PIPELINE 5/6] Converting KYT data to Shopify-ready products...");
@ -402,6 +435,9 @@ async function runFullKytPipeline(options = {}) {
imageBaseUrl: process.env.KYT_IMAGE_BASE_URL || "",
brand: process.env.SHOPIFY_BRAND || "KYT"
});
logStageSummary("convertToShopifyReady", {
total: conversionSummary?.totalProducts ?? 0
});
emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
@ -413,6 +449,14 @@ async function runFullKytPipeline(options = {}) {
enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10"
});
logStageSummary("upsertToShopify", {
total: shopifyUpsertSummary?.total ?? 0,
processed: shopifyUpsertSummary?.processed ?? 0,
created: shopifyUpsertSummary?.created ?? 0,
updated: shopifyUpsertSummary?.updated ?? 0,
failed: shopifyUpsertSummary?.failed ?? 0,
successRate: shopifyUpsertSummary?.successRate ?? 0
});
const summary = {
completedAt: new Date().toISOString(),

View File

@ -17,6 +17,31 @@ const IMAGE_EXTENSIONS = new Set([
".avif"
]);
/**
 * Runs `worker(item, index)` over `items` with at most `concurrency`
 * invocations in flight at once. Results are returned in input order.
 *
 * Fix: the worker count is sanitized before use. Previously a non-finite
 * `concurrency` (e.g. NaN from a misconfigured env var reaching this
 * function directly) made `Array.from({ length: NaN })` produce ZERO
 * workers, so the call resolved immediately with an array of `undefined`
 * and silently skipped all work. Non-finite or sub-1 values now clamp to 1.
 *
 * @param {Array} items - Work items; may be empty.
 * @param {number} concurrency - Desired max simultaneous workers.
 * @param {Function} worker - async (item, index) => result.
 * @returns {Promise<Array>} Results aligned index-for-index with `items`.
 *   Rejects with the first worker error (Promise.all semantics).
 */
async function mapWithConcurrency(items, concurrency, worker) {
  const results = new Array(items.length);
  let index = 0;
  // Each worker pulls the next unclaimed index until the list is drained.
  // Single-threaded event loop: the read-then-increment of `index` is atomic
  // between awaits, so no two workers claim the same slot.
  async function runWorker() {
    while (true) {
      const current = index;
      index += 1;
      if (current >= items.length) {
        return;
      }
      results[current] = await worker(items[current], current);
    }
  }
  const requested = Number.isFinite(concurrency) ? Math.floor(concurrency) : 1;
  const workerCount = Math.max(1, Math.min(requested, items.length));
  const workers = Array.from({ length: workerCount }, () => runWorker());
  await Promise.all(workers);
  return results;
}
async function getAllImageFilesRecursively(rootDir) {
const files = [];
@ -154,7 +179,8 @@ async function applyWatermarkToDownloadedImages(options = {}) {
const {
imagesDir = DEFAULT_IMAGES_DIR,
watermarkPath = DEFAULT_WATERMARK_PATH,
statePath = DEFAULT_STATE_PATH
statePath = DEFAULT_STATE_PATH,
concurrency = Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4)
} = options;
const absImagesDir = path.resolve(process.cwd(), imagesDir);
@ -182,8 +208,7 @@ async function applyWatermarkToDownloadedImages(options = {}) {
let skipped = 0;
let failed = 0;
for (let i = 0; i < imageFiles.length; i += 1) {
const imagePath = imageFiles[i];
await mapWithConcurrency(imageFiles, concurrency, async (imagePath, i) => {
const relativePath = path.relative(absImagesDir, imagePath);
try {
@ -191,10 +216,10 @@ async function applyWatermarkToDownloadedImages(options = {}) {
if (stateFiles[relativePath] === beforeFingerprint) {
skipped += 1;
} else {
await watermarkImageInPlace(imagePath, absWatermarkPath);
await watermarkImageInPlace(imagePath, absWatermarkPath);
const afterFingerprint = await getFileFingerprint(imagePath);
stateFiles[relativePath] = afterFingerprint;
processed += 1;
processed += 1;
}
} catch (error) {
failed += 1;
@ -204,7 +229,7 @@ async function applyWatermarkToDownloadedImages(options = {}) {
if ((i + 1) % 50 === 0 || i === imageFiles.length - 1) {
console.log(`[WATERMARK] ${i + 1}/${imageFiles.length} processed`);
}
}
});
// Keep state only for files that still exist.
const currentSet = new Set(imageFiles.map((x) => path.relative(absImagesDir, x)));
@ -226,6 +251,7 @@ async function applyWatermarkToDownloadedImages(options = {}) {
imagesDir: absImagesDir,
watermarkPath: absWatermarkPath,
statePath: absStatePath,
concurrency,
totalImagesFound: imageFiles.length,
processed,
skipped,

View File

@ -6,6 +6,31 @@ const DEFAULT_IMAGES_DIR = path.join("data", "02_downloaded_product_images");
const DEFAULT_STATE_PATH = path.join("data", "04_shopify_image_upload_state.json");
const DEFAULT_MAP_PATH = path.join("data", "04_shopify_uploaded_images_map.json");
/**
 * Runs `worker(item, index)` over `items` with at most `concurrency`
 * invocations in flight at once. Results are returned in input order.
 *
 * Fix: the worker count is sanitized before use. Previously a non-finite
 * `concurrency` (e.g. NaN from a misconfigured env var reaching this
 * function directly) made `Array.from({ length: NaN })` produce ZERO
 * workers, so the call resolved immediately with an array of `undefined`
 * and silently skipped all work. Non-finite or sub-1 values now clamp to 1.
 *
 * @param {Array} items - Work items; may be empty.
 * @param {number} concurrency - Desired max simultaneous workers.
 * @param {Function} worker - async (item, index) => result.
 * @returns {Promise<Array>} Results aligned index-for-index with `items`.
 *   Rejects with the first worker error (Promise.all semantics).
 */
async function mapWithConcurrency(items, concurrency, worker) {
  const results = new Array(items.length);
  let index = 0;
  // Each worker pulls the next unclaimed index until the list is drained.
  // Single-threaded event loop: the read-then-increment of `index` is atomic
  // between awaits, so no two workers claim the same slot.
  async function runWorker() {
    while (true) {
      const current = index;
      index += 1;
      if (current >= items.length) {
        return;
      }
      results[current] = await worker(items[current], current);
    }
  }
  const requested = Number.isFinite(concurrency) ? Math.floor(concurrency) : 1;
  const workerCount = Math.max(1, Math.min(requested, items.length));
  const workers = Array.from({ length: workerCount }, () => runWorker());
  await Promise.all(workers);
  return results;
}
function sanitizeName(value) {
return String(value || "")
.replace(/[<>:"/\\|?*\x00-\x1F]/g, "_")
@ -276,7 +301,8 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) {
aggregatedJsonPath = DEFAULT_AGGREGATED_JSON,
imagesDir = DEFAULT_IMAGES_DIR,
statePath = DEFAULT_STATE_PATH,
mapPath = DEFAULT_MAP_PATH
mapPath = DEFAULT_MAP_PATH,
concurrency = Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3)
} = options;
if (!shop) throw new Error("Missing shop (or SHOPIFY_SHOP).");
@ -305,15 +331,14 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) {
let skipped = 0;
let failed = 0;
for (let i = 0; i < tasks.length; i += 1) {
const task = tasks[i];
await mapWithConcurrency(tasks, concurrency, async (task, i) => {
processed += 1;
try {
await fs.access(task.localPath);
} catch {
failed += 1;
continue;
return;
}
try {
@ -384,7 +409,7 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) {
if ((i + 1) % 25 === 0 || i === tasks.length - 1) {
console.log(`[IMG-UPLOAD] ${i + 1}/${tasks.length} processed | uploaded=${uploaded} skipped=${skipped} failed=${failed}`);
}
}
});
const finalState = {
version: 1,
@ -409,6 +434,7 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) {
aggregatedJsonPath: absAggregatedPath,
statePath: absStatePath,
mapPath: absMapPath,
concurrency,
totalTasks: tasks.length,
processed,
uploaded,

View File

@ -1,4 +1,5 @@
const jobs = {};
const MAX_LOG_LINES = 120;
function listJobs() {
return Object.values(jobs)
@ -33,6 +34,8 @@ function createJob(payload = {}) {
detail: null,
summary: null,
error: null,
logs: [],
liveStats: {},
payload,
startedAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
@ -49,6 +52,35 @@ function updateJob(jobId, patch) {
jobs[jobId] = {
...current,
...patch,
liveStats: {
...(current.liveStats || {}),
...(patch.liveStats || {}),
},
updatedAt: new Date().toISOString(),
};
return jobs[jobId];
}
function appendJobLog(jobId, line, extraPatch = {}) {
const current = jobs[jobId];
if (!current) {
return null;
}
const nextLogs = [...(current.logs || []), {
at: new Date().toISOString(),
line,
}].slice(-MAX_LOG_LINES);
jobs[jobId] = {
...current,
...extraPatch,
liveStats: {
...(current.liveStats || {}),
...(extraPatch.liveStats || {}),
},
logs: nextLogs,
updatedAt: new Date().toISOString(),
};
@ -61,4 +93,5 @@ module.exports = {
canStartJob,
createJob,
updateJob,
appendJobLog,
};

View File

@ -2,7 +2,191 @@ const path = require("node:path");
const { getToken } = require("../tokenStore");
const { log } = require("../logger");
const { runFullKytPipeline } = require("./business-logic/kyt-pipeline/00_index");
const { updateJob } = require("./pipelineJobs");
const { updateJob, appendJobLog } = require("./pipelineJobs");
// Flattens a console.* argument list into one space-joined string.
// Strings pass through untouched; everything else is JSON-serialized,
// falling back to String() for values JSON.stringify rejects
// (circular references, BigInt).
function stringifyLogArgs(args) {
  const pieces = [];
  for (const value of args) {
    if (typeof value === "string") {
      pieces.push(value);
      continue;
    }
    try {
      pieces.push(JSON.stringify(value));
    } catch {
      pieces.push(String(value));
    }
  }
  return pieces.join(" ");
}
// Parses one captured console line and folds any recognizable progress
// information into a copy of the job's liveStats. Never mutates `job`;
// returns the next liveStats object.
//
// Recognized line shapes (emitted elsewhere in the pipeline):
//   [STAGE-SUMMARY] <stage> | k=v | k=v ...              end-of-stage totals
//   [WATERMARK|IMAGES|IMG-UPLOAD] i/n processed          periodic counters
//   [IMG-UPLOAD] i/n processed | uploaded=.. skipped=..  upload roll-up
//   [IMAGE-FAIL] / [WATERMARK-FAIL] / [SHOPIFY-FAIL]     per-item failures
//   [IMG-UPLOAD-OK] ... status=...                       per-file success
//   [DETAIL] i/n completed                               detail-fetch progress
//   [SHOPIFY] i/n processed | created=.. updated=.. failed=..
//
// NOTE(review): match order matters — a [STAGE-SUMMARY] line rewrites a
// bucket with authoritative totals, while the periodic matchers below can
// later overwrite `done`/`total` on the same bucket; that appears intentional.
function deriveLiveStats(job, line) {
  const current = job?.liveStats || {};
  // Shallow copy so callers holding the old liveStats are unaffected.
  const next = { ...current };
  const stageSummaryMatch = line.match(/^\[STAGE-SUMMARY\]\s+([^\s|]+)\s+\|\s+(.+)$/i);
  if (stageSummaryMatch) {
    const stageKey = String(stageSummaryMatch[1] || "").trim();
    const rawFields = String(stageSummaryMatch[2] || "");
    const parsedFields = {};
    // Each field chunk is "key=value"; values may themselves contain '='
    // (e.g. JSON), so everything after the first '=' is rejoined.
    for (const chunk of rawFields.split("|")) {
      const [rawKey, ...rawValueParts] = chunk.split("=");
      const key = String(rawKey || "").trim();
      const value = rawValueParts.join("=").trim();
      if (!key) {
        continue;
      }
      // Best-effort typing: numeric-looking strings become Numbers,
      // "true"/"false" become booleans, everything else stays a string.
      if (/^-?\d+(\.\d+)?$/.test(value)) {
        parsedFields[key] = Number(value);
      } else if (value === "true" || value === "false") {
        parsedFields[key] = value === "true";
      } else {
        parsedFields[key] = value;
      }
    }
    // Keep the raw parsed summary per stage for consumers that want it all.
    next.stageSummaries = {
      ...(next.stageSummaries || {}),
      [stageKey]: parsedFields,
    };
    // Below: project well-known stages into fixed UI buckets
    // (download / watermark / upload / convert / shopify).
    if (stageKey === "downloadImages") {
      next.download = {
        ...(next.download || {}),
        // `downloaded || total` falls back to total when the downloaded
        // count is 0/absent (e.g. everything was already cached).
        done: Number(parsedFields.downloaded || parsedFields.total || 0),
        total: Number(parsedFields.total || 0),
        skipped: Number(parsedFields.skipped || 0),
        failed: Number(parsedFields.failed || 0),
        label: `${parsedFields.downloaded || 0}/${parsedFields.total || 0}`,
      };
    }
    if (stageKey === "watermarkImages") {
      next.watermark = {
        ...(next.watermark || {}),
        done: Number(parsedFields.processed || 0),
        total: Number(parsedFields.total || 0),
        skipped: Number(parsedFields.skipped || 0),
        failed: Number(parsedFields.failed || 0),
        label: `${parsedFields.processed || 0}/${parsedFields.total || 0}`,
      };
    }
    if (stageKey === "uploadImagesToShopifyFiles") {
      next.upload = {
        ...(next.upload || {}),
        done: Number(parsedFields.processed || 0),
        total: Number(parsedFields.total || 0),
        uploaded: Number(parsedFields.uploaded || 0),
        skipped: Number(parsedFields.skipped || 0),
        failed: Number(parsedFields.failed || 0),
        label: `${parsedFields.processed || 0}/${parsedFields.total || 0}`,
      };
    }
    if (stageKey === "convertToShopifyReady") {
      // Conversion has only a total; no merge with a prior bucket.
      next.convert = {
        total: Number(parsedFields.total || 0),
        label: String(parsedFields.total || 0),
      };
    }
    if (stageKey === "upsertToShopify") {
      next.shopify = {
        ...(next.shopify || {}),
        done: Number(parsedFields.processed || 0),
        total: Number(parsedFields.total || 0),
        created: Number(parsedFields.created || 0),
        updated: Number(parsedFields.updated || 0),
        failed: Number(parsedFields.failed || 0),
        successRate: Number(parsedFields.successRate || 0),
        label: `${parsedFields.processed || 0}/${parsedFields.total || 0}`,
      };
    }
  }
  // Periodic "i/n processed" heartbeats from the three image stages.
  const processedMatch = line.match(/\[(WATERMARK|IMAGES|IMG-UPLOAD)\]\s+(\d+)\/(\d+)\s+processed/i);
  if (processedMatch) {
    const [, key, done, total] = processedMatch;
    const bucket =
      key.toUpperCase() === "WATERMARK"
        ? "watermark"
        : key.toUpperCase() === "IMAGES"
        ? "download"
        : "upload";
    next[bucket] = {
      ...(next[bucket] || {}),
      done: Number(done),
      total: Number(total),
      label: `${done}/${total}`,
    };
  }
  // An IMG-UPLOAD line with the extra roll-up counters also matched
  // processedMatch above; this refines the same `upload` bucket.
  const uploadSummaryMatch = line.match(/\[IMG-UPLOAD\]\s+(\d+)\/(\d+)\s+processed\s+\|\s+uploaded=(\d+)\s+skipped=(\d+)\s+failed=(\d+)/i);
  if (uploadSummaryMatch) {
    next.upload = {
      ...(next.upload || {}),
      done: Number(uploadSummaryMatch[1]),
      total: Number(uploadSummaryMatch[2]),
      uploaded: Number(uploadSummaryMatch[3]),
      skipped: Number(uploadSummaryMatch[4]),
      failed: Number(uploadSummaryMatch[5]),
      label: `${uploadSummaryMatch[1]}/${uploadSummaryMatch[2]}`,
    };
  }
  // Per-item failure tags: bump the failure counter of the matching bucket.
  const imageFailMatch = line.match(/\[IMAGE-FAIL\]/i);
  if (imageFailMatch) {
    next.download = {
      ...(next.download || {}),
      failed: Number(next.download?.failed || 0) + 1,
    };
  }
  const watermarkFailMatch = line.match(/\[WATERMARK-FAIL\]/i);
  if (watermarkFailMatch) {
    next.watermark = {
      ...(next.watermark || {}),
      failed: Number(next.watermark?.failed || 0) + 1,
    };
  }
  // Running count of successful file uploads. NOTE(review): the captured
  // status=... group is currently unused; only the hit count is kept.
  const uploadOkMatch = line.match(/\[IMG-UPLOAD-OK\].*status=([A-Z]+)/i);
  if (uploadOkMatch) {
    const uploaded = Number(next.uploadedOk || 0) + 1;
    next.uploadedOk = uploaded;
  }
  // Detail-fetch progress ("[DETAIL] i/n completed").
  const detailMatch = line.match(/\[DETAIL\]\s+(\d+)\/(\d+)\s+completed/i);
  if (detailMatch) {
    next.details = {
      done: Number(detailMatch[1]),
      total: Number(detailMatch[2]),
      label: `${detailMatch[1]}/${detailMatch[2]}`,
    };
  }
  // Shopify upsert heartbeat: replaces (not merges) the shopify bucket
  // since all counters are present on the line.
  const shopifyMatch = line.match(/\[SHOPIFY\]\s+(\d+)\/(\d+)\s+processed\s+\|\s+created=(\d+)\s+updated=(\d+)\s+failed=(\d+)/i);
  if (shopifyMatch) {
    next.shopify = {
      done: Number(shopifyMatch[1]),
      total: Number(shopifyMatch[2]),
      created: Number(shopifyMatch[3]),
      updated: Number(shopifyMatch[4]),
      failed: Number(shopifyMatch[5]),
      label: `${shopifyMatch[1]}/${shopifyMatch[2]}`,
    };
  }
  const shopifyFailMatch = line.match(/\[SHOPIFY-FAIL\]/i);
  if (shopifyFailMatch) {
    next.shopify = {
      ...(next.shopify || {}),
      failed: Number(next.shopify?.failed || 0) + 1,
    };
  }
  return next;
}
async function runKytPipelineJob(job) {
const { shop, limit } = job.payload || {};
@ -42,27 +226,54 @@ async function runKytPipelineJob(job) {
detail: `Starting KYT pipeline for ${shop}`,
});
const summary = await runFullKytPipeline({
onProgress(progress) {
updateJob(job.id, {
status: "running",
step: progress.stepKey,
stepIndex: progress.stepIndex,
totalSteps: progress.totalSteps,
detail: progress.message,
});
},
});
const originalConsole = {
log: console.log,
info: console.info,
warn: console.warn,
error: console.error,
};
updateJob(job.id, {
status: "done",
step: "completed",
stepIndex: 6,
totalSteps: 6,
detail: "Pipeline completed successfully",
summary,
});
log(shop, `KYT pipeline completed for job ${job.id}`);
const capture = (level) => (...args) => {
const line = stringifyLogArgs(args);
const currentJob = require("./pipelineJobs").getJob(job.id);
const liveStats = deriveLiveStats(currentJob, line);
appendJobLog(job.id, `[${level}] ${line}`, { liveStats });
originalConsole[level](...args);
};
console.log = capture("log");
console.info = capture("info");
console.warn = capture("warn");
console.error = capture("error");
try {
const summary = await runFullKytPipeline({
onProgress(progress) {
updateJob(job.id, {
status: "running",
step: progress.stepKey,
stepIndex: progress.stepIndex,
totalSteps: progress.totalSteps,
detail: progress.message,
});
},
});
updateJob(job.id, {
status: "done",
step: "completed",
stepIndex: 6,
totalSteps: 6,
detail: "Pipeline completed successfully",
summary,
});
log(shop, `KYT pipeline completed for job ${job.id}`);
} finally {
console.log = originalConsole.log;
console.info = originalConsole.info;
console.warn = originalConsole.warn;
console.error = originalConsole.error;
}
} catch (error) {
updateJob(job.id, {
status: "error",