diff --git a/routes/pipeline.js b/routes/pipeline.js index c12508f..af6d233 100644 --- a/routes/pipeline.js +++ b/routes/pipeline.js @@ -1,5 +1,5 @@ const express = require("express"); -const { createJob, updateJob, getJob, listJobs, canStartJob } = require("../src/pipelineJobs"); +const { createJob, updateJob, getJob, listJobs, canStartJob, cancelJob } = require("../src/pipelineJobs"); const { runKytPipelineJob } = require("../src/runKytPipelineJob"); const { getToken } = require("../tokenStore"); const { getSource, listSources } = require("../src/business-logic/import-pipeline/sources"); @@ -65,4 +65,12 @@ router.get("/jobs", (req, res) => { return res.json({ jobs: listJobs() }); }); +router.post("/cancel/:jobId", (req, res) => { + const job = cancelJob(req.params.jobId); + if (!job) { + return res.status(404).json({ error: "Job not found or already finished." }); + } + return res.json({ ok: true, jobId: job.id, status: job.status }); +}); + module.exports = router; diff --git a/src/business-logic/import-pipeline/runSourcePipeline.js b/src/business-logic/import-pipeline/runSourcePipeline.js index 2317a11..a7d5fb6 100644 --- a/src/business-logic/import-pipeline/runSourcePipeline.js +++ b/src/business-logic/import-pipeline/runSourcePipeline.js @@ -281,6 +281,13 @@ async function upsertShopifyProductsFromConverted({ return metrics; } +class PipelineCancelledError extends Error { + constructor() { + super("Pipeline cancelled by user"); + this.name = "PipelineCancelledError"; + } +} + async function runFullSourcePipeline(options = {}) { loadDotEnvFile(".env"); @@ -290,14 +297,18 @@ async function runFullSourcePipeline(options = {}) { const runLogger = initRunLogger(runPaths.logsDir); const limit = options.limit || cli.limit || null; const onProgress = typeof options.onProgress === "function" ? options.onProgress : null; + const isCancelled = typeof options.isCancelled === "function" ? options.isCancelled : () => false; + const emitProgress = (stepIndex, stepKey, message) => { if (onProgress) { - onProgress({ - stepIndex, - totalSteps: 6, - stepKey, - message, - }); + onProgress({ stepIndex, totalSteps: 6, stepKey, message }); + } + }; + + const checkCancelled = (stageName) => { + if (isCancelled()) { + console.log(`[PIPELINE] Cancellation detected before stage: ${stageName}`); + throw new PipelineCancelledError(); } }; @@ -305,6 +316,7 @@ async function runFullSourcePipeline(options = {}) { const imageBaseUrl = process.env[source.envImageBaseUrl] || process.env.SOURCE_IMAGE_BASE_URL || source.defaultImageBaseUrl || ""; const brand = process.env.SOURCE_BRAND || process.env.SHOPIFY_BRAND || source.defaultBrand || source.label; + checkCancelled("fetchWebsiteData"); emitProgress(1, "fetchWebsiteData", `Fetching ${source.label} website data`); console.log(`[PIPELINE 1/6] Fetching ${source.label} website data...`); const dataSummary = await source.fetchWebsiteData({ paths: runPaths }); @@ -322,6 +334,7 @@ async function runFullSourcePipeline(options = {}) { ); } + checkCancelled("downloadImages"); emitProgress(2, "downloadImages", "Downloading product images"); console.log("\n[PIPELINE 2/6] Downloading product images..."); const downloadSummary = source.downloadImages @@ -345,6 +358,7 @@ async function runFullSourcePipeline(options = {}) { products: downloadSummary?.productsCount ?? 0, }); + checkCancelled("watermarkImages"); emitProgress(3, "watermarkImages", "Applying watermark to downloaded images"); console.log("\n[PIPELINE 3/6] Applying watermark in-place..."); const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png"; @@ -397,6 +411,7 @@ async function runFullSourcePipeline(options = {}) { concurrency: watermarkSummary?.concurrency ?? "", }); + checkCancelled("uploadImagesToShopifyFiles"); emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files"); console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files..."); const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true); @@ -455,6 +470,7 @@ async function runFullSourcePipeline(options = {}) { concurrency: imageUploadSummary?.concurrency ?? "", }); + checkCancelled("convertToShopifyReady"); emitProgress(5, "convertToShopifyReady", `Converting ${source.label} data to Shopify-ready products`); console.log(`\n[PIPELINE 5/6] Converting ${source.label} data to Shopify-ready products...`); const conversionSummary = await convertAggregatedToShopifyReady({ @@ -469,6 +485,7 @@ async function runFullSourcePipeline(options = {}) { total: conversionSummary?.totalProducts ?? 0, }); + checkCancelled("upsertToShopify"); emitProgress(6, "upsertToShopify", "Upserting products to Shopify"); console.log("\n[PIPELINE 6/6] Upserting products to Shopify..."); const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({ @@ -523,6 +540,7 @@ module.exports = { runFullSourcePipeline, convertAggregatedToShopifyReady, upsertShopifyProductsFromConverted, + PipelineCancelledError, }; if (require.main === module) { diff --git a/src/pipelineJobs.js b/src/pipelineJobs.js index 2b1a1fb..8529ab6 100644 --- a/src/pipelineJobs.js +++ b/src/pipelineJobs.js @@ -96,6 +96,25 @@ function appendJobLog(jobId, line, extraPatch = {}) { return jobs[jobId]; } +function cancelJob(jobId) { + const job = jobs[jobId]; + if (!job) return null; + if (job.status === "done" || job.status === "error" || job.status === "cancelled") return job; + + jobs[jobId] = { + ...job, + cancelled: true, + status: "cancelling", + detail: "Cancellation requested — stopping after current step...", + updatedAt: new Date().toISOString(), + }; + return jobs[jobId]; +} + +function isJobCancelled(jobId) { + return Boolean(jobs[jobId]?.cancelled); +} + module.exports = { listJobs, getJob, @@ -104,4 +123,6 @@ module.exports = { updateJob, appendJobLog, getJobIdForPayload, + cancelJob, + isJobCancelled, }; diff --git a/src/runKytPipelineJob.js b/src/runKytPipelineJob.js index ff6b460..88d162d 100644 --- a/src/runKytPipelineJob.js +++ b/src/runKytPipelineJob.js @@ -1,7 +1,7 @@ const { getToken } = require("../tokenStore"); const { log } = require("../logger"); -const { runFullSourcePipeline } = require("./business-logic/import-pipeline/runSourcePipeline"); -const { updateJob, appendJobLog } = require("./pipelineJobs"); +const { runFullSourcePipeline, PipelineCancelledError } = require("./business-logic/import-pipeline/runSourcePipeline"); +const { updateJob, appendJobLog, isJobCancelled } = require("./pipelineJobs"); function stringifyLogArgs(args) { return args.map((value) => { @@ -250,6 +250,7 @@ async function runKytPipelineJob(job) { sourceKey: source, limit, argv: [], + isCancelled: () => isJobCancelled(job.id), onProgress(progress) { updateJob(job.id, { status: "running", @@ -277,12 +278,22 @@ async function runKytPipelineJob(job) { console.error = originalConsole.error; } } catch (error) { - updateJob(job.id, { - status: "error", - error: error.message, - detail: `Pipeline failed: ${error.message}`, - }); - log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`); + if (error instanceof PipelineCancelledError) { + updateJob(job.id, { + status: "cancelled", + step: "cancelled", + detail: "Import was cancelled by the user.", + error: null, + }); + log(shop, `${source} import job ${job.id} was cancelled by the user`); + } else { + updateJob(job.id, { + status: "error", + error: error.message, + detail: `Pipeline failed: ${error.message}`, + }); + log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`); + } } finally { process.argv = originalArgv; process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP;