From 9480832478097c654ae8aabd5e61e434ab6aafa9 Mon Sep 17 00:00:00 2001 From: MOHAN Date: Tue, 14 Apr 2026 13:26:56 +0530 Subject: [PATCH] Add concurrency handling and logging enhancements to KYT pipeline --- data/99_run_logs/2026-04-13T20-34-31-700Z.log | 2 + data/99_run_logs/2026-04-13T20-36-37-795Z.log | 2 + data/99_run_logs/2026-04-13T20-41-34-605Z.log | 2 + data/99_run_logs/2026-04-13T20-45-18-948Z.log | 2 + src/business-logic/kyt-pipeline/00_index.js | 50 +++- .../03_watermark_downloaded_images.js | 38 ++- .../04_shopify_image_file_uploader.js | 36 ++- src/pipelineJobs.js | 33 +++ src/runKytPipelineJob.js | 253 ++++++++++++++++-- 9 files changed, 383 insertions(+), 35 deletions(-) create mode 100644 data/99_run_logs/2026-04-13T20-34-31-700Z.log create mode 100644 data/99_run_logs/2026-04-13T20-36-37-795Z.log create mode 100644 data/99_run_logs/2026-04-13T20-41-34-605Z.log create mode 100644 data/99_run_logs/2026-04-13T20-45-18-948Z.log diff --git a/data/99_run_logs/2026-04-13T20-34-31-700Z.log b/data/99_run_logs/2026-04-13T20-34-31-700Z.log new file mode 100644 index 0000000..1a2c7b7 --- /dev/null +++ b/data/99_run_logs/2026-04-13T20-34-31-700Z.log @@ -0,0 +1,2 @@ +[2026-04-13T20:34:31.705Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-34-31-700Z.log +[2026-04-13T20:34:31.706Z] [LOG] function function diff --git a/data/99_run_logs/2026-04-13T20-36-37-795Z.log b/data/99_run_logs/2026-04-13T20-36-37-795Z.log new file mode 100644 index 0000000..79afcdf --- /dev/null +++ b/data/99_run_logs/2026-04-13T20-36-37-795Z.log @@ -0,0 +1,2 @@ +[2026-04-13T20:36:37.795Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-36-37-795Z.log +[2026-04-13T20:36:37.796Z] [LOG] function function function diff --git a/data/99_run_logs/2026-04-13T20-41-34-605Z.log b/data/99_run_logs/2026-04-13T20-41-34-605Z.log new file mode 100644 index 0000000..c321ecd --- /dev/null +++ b/data/99_run_logs/2026-04-13T20-41-34-605Z.log @@ -0,0 +1,2 @@ +[2026-04-13T20:41:34.605Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-41-34-605Z.log +[2026-04-13T20:41:34.606Z] [LOG] function diff --git a/data/99_run_logs/2026-04-13T20-45-18-948Z.log b/data/99_run_logs/2026-04-13T20-45-18-948Z.log new file mode 100644 index 0000000..2e313fe --- /dev/null +++ b/data/99_run_logs/2026-04-13T20-45-18-948Z.log @@ -0,0 +1,2 @@ +[2026-04-13T20:45:18.949Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T20-45-18-948Z.log +[2026-04-13T20:45:18.951Z] [LOG] backend-status-parser-ok diff --git a/src/business-logic/kyt-pipeline/00_index.js b/src/business-logic/kyt-pipeline/00_index.js index 3aea544..0571755 100644 --- a/src/business-logic/kyt-pipeline/00_index.js +++ b/src/business-logic/kyt-pipeline/00_index.js @@ -141,6 +141,14 @@ function readBooleanEnv(name, fallback = false) { return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase()); } +function logStageSummary(stepKey, summary = {}) { + const parts = Object.entries(summary) + .filter(([, value]) => value !== undefined && value !== null && value !== "") + .map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`); + + console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`); +} + async function convertAggregatedToShopifyReady({ inputPath = DEFAULT_AGGREGATED_JSON, outputPath = DEFAULT_SHOPIFY_READY_JSON, @@ -301,7 +309,15 @@ async function runFullKytPipeline(options = {}) { console.log("\n[PIPELINE 2/6] Downloading product images..."); const downloadSummary = await downloadProductImagesFromAggregatedJson({ jsonPath: aggregatedPathForRun, - outputDir: imagesDirForRun + outputDir: imagesDirForRun, + concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8) + }); + logStageSummary("downloadImages", { + total: downloadSummary?.totalImagesFound ?? 0, + downloaded: downloadSummary?.downloaded ?? 0, + skipped: downloadSummary?.skipped ?? 0, + failed: downloadSummary?.failed ?? 0, + products: downloadSummary?.productsCount ?? 0 }); emitProgress(3, "watermarkImages", "Applying watermark to downloaded images"); @@ -326,7 +342,8 @@ async function runFullKytPipeline(options = {}) { try { watermarkSummary = await applyWatermarkToDownloadedImages({ imagesDir: imagesDirForRun, - watermarkPath: watermarkPathForRun + watermarkPath: watermarkPathForRun, + concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4) }); } catch (error) { if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) { @@ -346,6 +363,13 @@ async function runFullKytPipeline(options = {}) { } } } + logStageSummary("watermarkImages", { + total: watermarkSummary?.totalImagesFound ?? 0, + processed: watermarkSummary?.processed ?? 0, + skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0, + failed: watermarkSummary?.failed ?? 0, + concurrency: watermarkSummary?.concurrency ?? "" + }); emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files"); console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files..."); @@ -372,7 +396,8 @@ async function runFullKytPipeline(options = {}) { aggregatedJsonPath: aggregatedPathForRun, imagesDir: imagesDirForRun, statePath: "data/04_shopify_image_upload_state.json", - mapPath: DEFAULT_UPLOADED_MAP_JSON + mapPath: DEFAULT_UPLOADED_MAP_JSON, + concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3) }); } catch (error) { const message = String(error?.message || "Unknown image upload error"); @@ -392,6 +417,14 @@ async function runFullKytPipeline(options = {}) { console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false)."); } } + logStageSummary("uploadImagesToShopifyFiles", { + total: imageUploadSummary?.totalTasks ?? 0, + processed: imageUploadSummary?.processed ?? 0, + uploaded: imageUploadSummary?.uploaded ?? 0, + skipped: imageUploadSummary?.skipped ?? 0, + failed: imageUploadSummary?.failed ?? 0, + concurrency: imageUploadSummary?.concurrency ?? "" + }); emitProgress(5, "convertToShopifyReady", "Converting KYT data to Shopify-ready products"); console.log("\n[PIPELINE 5/6] Converting KYT data to Shopify-ready products..."); @@ -402,6 +435,9 @@ async function runFullKytPipeline(options = {}) { imageBaseUrl: process.env.KYT_IMAGE_BASE_URL || "", brand: process.env.SHOPIFY_BRAND || "KYT" }); + logStageSummary("convertToShopifyReady", { + total: conversionSummary?.totalProducts ?? 0 + }); emitProgress(6, "upsertToShopify", "Upserting products to Shopify"); console.log("\n[PIPELINE 6/6] Upserting products to Shopify..."); @@ -413,6 +449,14 @@ async function runFullKytPipeline(options = {}) { enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false), apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10" }); + logStageSummary("upsertToShopify", { + total: shopifyUpsertSummary?.total ?? 0, + processed: shopifyUpsertSummary?.processed ?? 0, + created: shopifyUpsertSummary?.created ?? 0, + updated: shopifyUpsertSummary?.updated ?? 0, + failed: shopifyUpsertSummary?.failed ?? 0, + successRate: shopifyUpsertSummary?.successRate ?? 0 + }); const summary = { completedAt: new Date().toISOString(), diff --git a/src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js b/src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js index 2a47ff4..a12a84c 100644 --- a/src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js +++ b/src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js @@ -17,6 +17,31 @@ const IMAGE_EXTENSIONS = new Set([ ".avif" ]); +async function mapWithConcurrency(items, concurrency, worker) { + const results = new Array(items.length); + let index = 0; + + async function runWorker() { + while (true) { + const current = index; + index += 1; + + if (current >= items.length) { + return; + } + + results[current] = await worker(items[current], current); + } + } + + const workers = Array.from( + { length: Math.max(1, Math.min(concurrency, items.length)) }, + () => runWorker() + ); + await Promise.all(workers); + return results; +} + async function getAllImageFilesRecursively(rootDir) { const files = []; @@ -154,7 +179,8 @@ async function applyWatermarkToDownloadedImages(options = {}) { const { imagesDir = DEFAULT_IMAGES_DIR, watermarkPath = DEFAULT_WATERMARK_PATH, - statePath = DEFAULT_STATE_PATH + statePath = DEFAULT_STATE_PATH, + concurrency = Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4) } = options; const absImagesDir = path.resolve(process.cwd(), imagesDir); @@ -182,8 +208,7 @@ async function applyWatermarkToDownloadedImages(options = {}) { let skipped = 0; let failed = 0; - for (let i = 0; i < imageFiles.length; i += 1) { - const imagePath = imageFiles[i]; + await mapWithConcurrency(imageFiles, concurrency, async (imagePath, i) => { const relativePath = path.relative(absImagesDir, imagePath); try { @@ -191,10 +216,10 @@ async function applyWatermarkToDownloadedImages(options = {}) { if (stateFiles[relativePath] === beforeFingerprint) { skipped += 1; } else { - await watermarkImageInPlace(imagePath, absWatermarkPath); + await watermarkImageInPlace(imagePath, absWatermarkPath); const afterFingerprint = await getFileFingerprint(imagePath); stateFiles[relativePath] = afterFingerprint; - processed += 1; + processed += 1; } } catch (error) { failed += 1; @@ -204,7 +229,7 @@ async function applyWatermarkToDownloadedImages(options = {}) { if ((i + 1) % 50 === 0 || i === imageFiles.length - 1) { console.log(`[WATERMARK] ${i + 1}/${imageFiles.length} processed`); } - } + }); // Keep state only for files that still exist. const currentSet = new Set(imageFiles.map((x) => path.relative(absImagesDir, x))); @@ -226,6 +251,7 @@ async function applyWatermarkToDownloadedImages(options = {}) { imagesDir: absImagesDir, watermarkPath: absWatermarkPath, statePath: absStatePath, + concurrency, totalImagesFound: imageFiles.length, processed, skipped, diff --git a/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js b/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js index 36aadb8..e05ba8b 100644 --- a/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js +++ b/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js @@ -6,6 +6,31 @@ const DEFAULT_IMAGES_DIR = path.join("data", "02_downloaded_product_images"); const DEFAULT_STATE_PATH = path.join("data", "04_shopify_image_upload_state.json"); const DEFAULT_MAP_PATH = path.join("data", "04_shopify_uploaded_images_map.json"); +async function mapWithConcurrency(items, concurrency, worker) { + const results = new Array(items.length); + let index = 0; + + async function runWorker() { + while (true) { + const current = index; + index += 1; + + if (current >= items.length) { + return; + } + + results[current] = await worker(items[current], current); + } + } + + const workers = Array.from( + { length: Math.max(1, Math.min(concurrency, items.length)) }, + () => runWorker() + ); + await Promise.all(workers); + return results; +} + function sanitizeName(value) { return String(value || "") .replace(/[<>:"/\\|?*\x00-\x1F]/g, "_") @@ -276,7 +301,8 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) { aggregatedJsonPath = DEFAULT_AGGREGATED_JSON, imagesDir = DEFAULT_IMAGES_DIR, statePath = DEFAULT_STATE_PATH, - mapPath = DEFAULT_MAP_PATH + mapPath = DEFAULT_MAP_PATH, + concurrency = Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3) } = options; if (!shop) throw new Error("Missing shop (or SHOPIFY_SHOP)."); @@ -305,15 +331,14 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) { let skipped = 0; let failed = 0; - for (let i = 0; i < tasks.length; i += 1) { - const task = tasks[i]; + await mapWithConcurrency(tasks, concurrency, async (task, i) => { processed += 1; try { await fs.access(task.localPath); } catch { failed += 1; - continue; + return; } try { @@ -384,7 +409,7 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) { if ((i + 1) % 25 === 0 || i === tasks.length - 1) { console.log(`[IMG-UPLOAD] ${i + 1}/${tasks.length} processed | uploaded=${uploaded} skipped=${skipped} failed=${failed}`); } - } + }); const finalState = { version: 1, @@ -409,6 +434,7 @@ async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) { aggregatedJsonPath: absAggregatedPath, statePath: absStatePath, mapPath: absMapPath, + concurrency, totalTasks: tasks.length, processed, uploaded, diff --git a/src/pipelineJobs.js b/src/pipelineJobs.js index 174d1de..a6e6bb8 100644 --- a/src/pipelineJobs.js +++ b/src/pipelineJobs.js @@ -1,4 +1,5 @@ const jobs = {}; +const MAX_LOG_LINES = 120; function listJobs() { return Object.values(jobs) @@ -33,6 +34,8 @@ function createJob(payload = {}) { detail: null, summary: null, error: null, + logs: [], + liveStats: {}, payload, startedAt: new Date().toISOString(), updatedAt: new Date().toISOString(), @@ -49,6 +52,35 @@ function updateJob(jobId, patch) { jobs[jobId] = { ...current, ...patch, + liveStats: { + ...(current.liveStats || {}), + ...(patch.liveStats || {}), + }, + updatedAt: new Date().toISOString(), + }; + + return jobs[jobId]; +} + +function appendJobLog(jobId, line, extraPatch = {}) { + const current = jobs[jobId]; + if (!current) { + return null; + } + + const nextLogs = [...(current.logs || []), { + at: new Date().toISOString(), + line, + }].slice(-MAX_LOG_LINES); + + jobs[jobId] = { + ...current, + ...extraPatch, + liveStats: { + ...(current.liveStats || {}), + ...(extraPatch.liveStats || {}), + }, + logs: nextLogs, updatedAt: new Date().toISOString(), }; @@ -61,4 +93,5 @@ module.exports = { canStartJob, createJob, updateJob, + appendJobLog, }; diff --git a/src/runKytPipelineJob.js b/src/runKytPipelineJob.js index b8dcdad..bc34969 100644 --- a/src/runKytPipelineJob.js +++ b/src/runKytPipelineJob.js @@ -2,7 +2,191 @@ const path = require("node:path"); const { getToken } = require("../tokenStore"); const { log } = require("../logger"); const { runFullKytPipeline } = require("./business-logic/kyt-pipeline/00_index"); -const { updateJob } = require("./pipelineJobs"); +const { updateJob, appendJobLog } = require("./pipelineJobs"); + +function stringifyLogArgs(args) { + return args.map((value) => { + if (typeof value === "string") { + return value; + } + try { + return JSON.stringify(value); + } catch { + return String(value); + } + }).join(" "); +} + +function deriveLiveStats(job, line) { + const current = job?.liveStats || {}; + const next = { ...current }; + + const stageSummaryMatch = line.match(/^\[STAGE-SUMMARY\]\s+([^\s|]+)\s+\|\s+(.+)$/i); + if (stageSummaryMatch) { + const stageKey = String(stageSummaryMatch[1] || "").trim(); + const rawFields = String(stageSummaryMatch[2] || ""); + const parsedFields = {}; + + for (const chunk of rawFields.split("|")) { + const [rawKey, ...rawValueParts] = chunk.split("="); + const key = String(rawKey || "").trim(); + const value = rawValueParts.join("=").trim(); + if (!key) { + continue; + } + + if (/^-?\d+(\.\d+)?$/.test(value)) { + parsedFields[key] = Number(value); + } else if (value === "true" || value === "false") { + parsedFields[key] = value === "true"; + } else { + parsedFields[key] = value; + } + } + + next.stageSummaries = { + ...(next.stageSummaries || {}), + [stageKey]: parsedFields, + }; + + if (stageKey === "downloadImages") { + next.download = { + ...(next.download || {}), + done: Number(parsedFields.downloaded || parsedFields.total || 0), + total: Number(parsedFields.total || 0), + skipped: Number(parsedFields.skipped || 0), + failed: Number(parsedFields.failed || 0), + label: `${parsedFields.downloaded || 0}/${parsedFields.total || 0}`, + }; + } + + if (stageKey === "watermarkImages") { + next.watermark = { + ...(next.watermark || {}), + done: Number(parsedFields.processed || 0), + total: Number(parsedFields.total || 0), + skipped: Number(parsedFields.skipped || 0), + failed: Number(parsedFields.failed || 0), + label: `${parsedFields.processed || 0}/${parsedFields.total || 0}`, + }; + } + + if (stageKey === "uploadImagesToShopifyFiles") { + next.upload = { + ...(next.upload || {}), + done: Number(parsedFields.processed || 0), + total: Number(parsedFields.total || 0), + uploaded: Number(parsedFields.uploaded || 0), + skipped: Number(parsedFields.skipped || 0), + failed: Number(parsedFields.failed || 0), + label: `${parsedFields.processed || 0}/${parsedFields.total || 0}`, + }; + } + + if (stageKey === "convertToShopifyReady") { + next.convert = { + total: Number(parsedFields.total || 0), + label: String(parsedFields.total || 0), + }; + } + + if (stageKey === "upsertToShopify") { + next.shopify = { + ...(next.shopify || {}), + done: Number(parsedFields.processed || 0), + total: Number(parsedFields.total || 0), + created: Number(parsedFields.created || 0), + updated: Number(parsedFields.updated || 0), + failed: Number(parsedFields.failed || 0), + successRate: Number(parsedFields.successRate || 0), + label: `${parsedFields.processed || 0}/${parsedFields.total || 0}`, + }; + } + } + + const processedMatch = line.match(/\[(WATERMARK|IMAGES|IMG-UPLOAD)\]\s+(\d+)\/(\d+)\s+processed/i); + if (processedMatch) { + const [, key, done, total] = processedMatch; + const bucket = + key.toUpperCase() === "WATERMARK" + ? "watermark" + : key.toUpperCase() === "IMAGES" + ? "download" + : "upload"; + next[bucket] = { + ...(next[bucket] || {}), + done: Number(done), + total: Number(total), + label: `${done}/${total}`, + }; + } + + const uploadSummaryMatch = line.match(/\[IMG-UPLOAD\]\s+(\d+)\/(\d+)\s+processed\s+\|\s+uploaded=(\d+)\s+skipped=(\d+)\s+failed=(\d+)/i); + if (uploadSummaryMatch) { + next.upload = { + ...(next.upload || {}), + done: Number(uploadSummaryMatch[1]), + total: Number(uploadSummaryMatch[2]), + uploaded: Number(uploadSummaryMatch[3]), + skipped: Number(uploadSummaryMatch[4]), + failed: Number(uploadSummaryMatch[5]), + label: `${uploadSummaryMatch[1]}/${uploadSummaryMatch[2]}`, + }; + } + + const imageFailMatch = line.match(/\[IMAGE-FAIL\]/i); + if (imageFailMatch) { + next.download = { + ...(next.download || {}), + failed: Number(next.download?.failed || 0) + 1, + }; + } + + const watermarkFailMatch = line.match(/\[WATERMARK-FAIL\]/i); + if (watermarkFailMatch) { + next.watermark = { + ...(next.watermark || {}), + failed: Number(next.watermark?.failed || 0) + 1, + }; + } + + const uploadOkMatch = line.match(/\[IMG-UPLOAD-OK\].*status=([A-Z]+)/i); + if (uploadOkMatch) { + const uploaded = Number(next.uploadedOk || 0) + 1; + next.uploadedOk = uploaded; + } + + const detailMatch = line.match(/\[DETAIL\]\s+(\d+)\/(\d+)\s+completed/i); + if (detailMatch) { + next.details = { + done: Number(detailMatch[1]), + total: Number(detailMatch[2]), + label: `${detailMatch[1]}/${detailMatch[2]}`, + }; + } + + const shopifyMatch = line.match(/\[SHOPIFY\]\s+(\d+)\/(\d+)\s+processed\s+\|\s+created=(\d+)\s+updated=(\d+)\s+failed=(\d+)/i); + if (shopifyMatch) { + next.shopify = { + done: Number(shopifyMatch[1]), + total: Number(shopifyMatch[2]), + created: Number(shopifyMatch[3]), + updated: Number(shopifyMatch[4]), + failed: Number(shopifyMatch[5]), + label: `${shopifyMatch[1]}/${shopifyMatch[2]}`, + }; + } + + const shopifyFailMatch = line.match(/\[SHOPIFY-FAIL\]/i); + if (shopifyFailMatch) { + next.shopify = { + ...(next.shopify || {}), + failed: Number(next.shopify?.failed || 0) + 1, + }; + } + + return next; +} async function runKytPipelineJob(job) { const { shop, limit } = job.payload || {}; @@ -42,27 +226,54 @@ async function runKytPipelineJob(job) { detail: `Starting KYT pipeline for ${shop}`, }); - const summary = await runFullKytPipeline({ - onProgress(progress) { - updateJob(job.id, { - status: "running", - step: progress.stepKey, - stepIndex: progress.stepIndex, - totalSteps: progress.totalSteps, - detail: progress.message, - }); - }, - }); + const originalConsole = { + log: console.log, + info: console.info, + warn: console.warn, + error: console.error, + }; - updateJob(job.id, { - status: "done", - step: "completed", - stepIndex: 6, - totalSteps: 6, - detail: "Pipeline completed successfully", - summary, - }); - log(shop, `KYT pipeline completed for job ${job.id}`); + const capture = (level) => (...args) => { + const line = stringifyLogArgs(args); + const currentJob = require("./pipelineJobs").getJob(job.id); + const liveStats = deriveLiveStats(currentJob, line); + appendJobLog(job.id, `[${level}] ${line}`, { liveStats }); + originalConsole[level](...args); + }; + + console.log = capture("log"); + console.info = capture("info"); + console.warn = capture("warn"); + console.error = capture("error"); + + try { + const summary = await runFullKytPipeline({ + onProgress(progress) { + updateJob(job.id, { + status: "running", + step: progress.stepKey, + stepIndex: progress.stepIndex, + totalSteps: progress.totalSteps, + detail: progress.message, + }); + }, + }); + + updateJob(job.id, { + status: "done", + step: "completed", + stepIndex: 6, + totalSteps: 6, + detail: "Pipeline completed successfully", + summary, + }); + log(shop, `KYT pipeline completed for job ${job.id}`); + } finally { + console.log = originalConsole.log; + console.info = originalConsole.info; + console.warn = originalConsole.warn; + console.error = originalConsole.error; + } } catch (error) { updateJob(job.id, { status: "error",