Add job cancellation — backend pipeline + cancel API route

pipelineJobs.js:
- cancelJob(jobId): marks job as cancelled=true, status=cancelling
- isJobCancelled(jobId): checked by the pipeline between stages

runSourcePipeline.js:
- PipelineCancelledError class
- checkCancelled() called before each of the 6 pipeline stages
- Accepts options.isCancelled() callback from the job runner

runKytPipelineJob.js:
- Passes isCancelled: () => isJobCancelled(job.id) into pipeline
- Catches PipelineCancelledError separately, sets status=cancelled

routes/pipeline.js:
- POST /pipeline/cancel/:jobId — marks job for cancellation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
MOHAN 2026-06-04 16:57:35 +05:30
parent 4e536f08b3
commit 08f21d9bc9
4 changed files with 73 additions and 15 deletions

View File

@ -1,5 +1,5 @@
const express = require("express"); const express = require("express");
const { createJob, updateJob, getJob, listJobs, canStartJob } = require("../src/pipelineJobs"); const { createJob, updateJob, getJob, listJobs, canStartJob, cancelJob } = require("../src/pipelineJobs");
const { runKytPipelineJob } = require("../src/runKytPipelineJob"); const { runKytPipelineJob } = require("../src/runKytPipelineJob");
const { getToken } = require("../tokenStore"); const { getToken } = require("../tokenStore");
const { getSource, listSources } = require("../src/business-logic/import-pipeline/sources"); const { getSource, listSources } = require("../src/business-logic/import-pipeline/sources");
@ -65,4 +65,12 @@ router.get("/jobs", (req, res) => {
return res.json({ jobs: listJobs() }); return res.json({ jobs: listJobs() });
}); });
router.post("/cancel/:jobId", (req, res) => {
const job = cancelJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found or already finished." });
}
return res.json({ ok: true, jobId: job.id, status: job.status });
});
module.exports = router; module.exports = router;

View File

@ -281,6 +281,13 @@ async function upsertShopifyProductsFromConverted({
return metrics; return metrics;
} }
class PipelineCancelledError extends Error {
constructor() {
super("Pipeline cancelled by user");
this.name = "PipelineCancelledError";
}
}
async function runFullSourcePipeline(options = {}) { async function runFullSourcePipeline(options = {}) {
loadDotEnvFile(".env"); loadDotEnvFile(".env");
@ -290,14 +297,18 @@ async function runFullSourcePipeline(options = {}) {
const runLogger = initRunLogger(runPaths.logsDir); const runLogger = initRunLogger(runPaths.logsDir);
const limit = options.limit || cli.limit || null; const limit = options.limit || cli.limit || null;
const onProgress = typeof options.onProgress === "function" ? options.onProgress : null; const onProgress = typeof options.onProgress === "function" ? options.onProgress : null;
const isCancelled = typeof options.isCancelled === "function" ? options.isCancelled : () => false;
const emitProgress = (stepIndex, stepKey, message) => { const emitProgress = (stepIndex, stepKey, message) => {
if (onProgress) { if (onProgress) {
onProgress({ onProgress({ stepIndex, totalSteps: 6, stepKey, message });
stepIndex, }
totalSteps: 6, };
stepKey,
message, const checkCancelled = (stageName) => {
}); if (isCancelled()) {
console.log(`[PIPELINE] Cancellation detected before stage: ${stageName}`);
throw new PipelineCancelledError();
} }
}; };
@ -305,6 +316,7 @@ async function runFullSourcePipeline(options = {}) {
const imageBaseUrl = process.env[source.envImageBaseUrl] || process.env.SOURCE_IMAGE_BASE_URL || source.defaultImageBaseUrl || ""; const imageBaseUrl = process.env[source.envImageBaseUrl] || process.env.SOURCE_IMAGE_BASE_URL || source.defaultImageBaseUrl || "";
const brand = process.env.SOURCE_BRAND || process.env.SHOPIFY_BRAND || source.defaultBrand || source.label; const brand = process.env.SOURCE_BRAND || process.env.SHOPIFY_BRAND || source.defaultBrand || source.label;
checkCancelled("fetchWebsiteData");
emitProgress(1, "fetchWebsiteData", `Fetching ${source.label} website data`); emitProgress(1, "fetchWebsiteData", `Fetching ${source.label} website data`);
console.log(`[PIPELINE 1/6] Fetching ${source.label} website data...`); console.log(`[PIPELINE 1/6] Fetching ${source.label} website data...`);
const dataSummary = await source.fetchWebsiteData({ paths: runPaths }); const dataSummary = await source.fetchWebsiteData({ paths: runPaths });
@ -322,6 +334,7 @@ async function runFullSourcePipeline(options = {}) {
); );
} }
checkCancelled("downloadImages");
emitProgress(2, "downloadImages", "Downloading product images"); emitProgress(2, "downloadImages", "Downloading product images");
console.log("\n[PIPELINE 2/6] Downloading product images..."); console.log("\n[PIPELINE 2/6] Downloading product images...");
const downloadSummary = source.downloadImages const downloadSummary = source.downloadImages
@ -345,6 +358,7 @@ async function runFullSourcePipeline(options = {}) {
products: downloadSummary?.productsCount ?? 0, products: downloadSummary?.productsCount ?? 0,
}); });
checkCancelled("watermarkImages");
emitProgress(3, "watermarkImages", "Applying watermark to downloaded images"); emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
console.log("\n[PIPELINE 3/6] Applying watermark in-place..."); console.log("\n[PIPELINE 3/6] Applying watermark in-place...");
const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png"; const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png";
@ -397,6 +411,7 @@ async function runFullSourcePipeline(options = {}) {
concurrency: watermarkSummary?.concurrency ?? "", concurrency: watermarkSummary?.concurrency ?? "",
}); });
checkCancelled("uploadImagesToShopifyFiles");
emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files"); emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files..."); console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true); const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true);
@ -455,6 +470,7 @@ async function runFullSourcePipeline(options = {}) {
concurrency: imageUploadSummary?.concurrency ?? "", concurrency: imageUploadSummary?.concurrency ?? "",
}); });
checkCancelled("convertToShopifyReady");
emitProgress(5, "convertToShopifyReady", `Converting ${source.label} data to Shopify-ready products`); emitProgress(5, "convertToShopifyReady", `Converting ${source.label} data to Shopify-ready products`);
console.log(`\n[PIPELINE 5/6] Converting ${source.label} data to Shopify-ready products...`); console.log(`\n[PIPELINE 5/6] Converting ${source.label} data to Shopify-ready products...`);
const conversionSummary = await convertAggregatedToShopifyReady({ const conversionSummary = await convertAggregatedToShopifyReady({
@ -469,6 +485,7 @@ async function runFullSourcePipeline(options = {}) {
total: conversionSummary?.totalProducts ?? 0, total: conversionSummary?.totalProducts ?? 0,
}); });
checkCancelled("upsertToShopify");
emitProgress(6, "upsertToShopify", "Upserting products to Shopify"); emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
console.log("\n[PIPELINE 6/6] Upserting products to Shopify..."); console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({ const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({
@ -523,6 +540,7 @@ module.exports = {
runFullSourcePipeline, runFullSourcePipeline,
convertAggregatedToShopifyReady, convertAggregatedToShopifyReady,
upsertShopifyProductsFromConverted, upsertShopifyProductsFromConverted,
PipelineCancelledError,
}; };
if (require.main === module) { if (require.main === module) {

View File

@ -96,6 +96,25 @@ function appendJobLog(jobId, line, extraPatch = {}) {
return jobs[jobId]; return jobs[jobId];
} }
function cancelJob(jobId) {
const job = jobs[jobId];
if (!job) return null;
if (job.status === "done" || job.status === "error" || job.status === "cancelled") return job;
jobs[jobId] = {
...job,
cancelled: true,
status: "cancelling",
detail: "Cancellation requested — stopping after current step...",
updatedAt: new Date().toISOString(),
};
return jobs[jobId];
}
function isJobCancelled(jobId) {
return Boolean(jobs[jobId]?.cancelled);
}
module.exports = { module.exports = {
listJobs, listJobs,
getJob, getJob,
@ -104,4 +123,6 @@ module.exports = {
updateJob, updateJob,
appendJobLog, appendJobLog,
getJobIdForPayload, getJobIdForPayload,
cancelJob,
isJobCancelled,
}; };

View File

@ -1,7 +1,7 @@
const { getToken } = require("../tokenStore"); const { getToken } = require("../tokenStore");
const { log } = require("../logger"); const { log } = require("../logger");
const { runFullSourcePipeline } = require("./business-logic/import-pipeline/runSourcePipeline"); const { runFullSourcePipeline, PipelineCancelledError } = require("./business-logic/import-pipeline/runSourcePipeline");
const { updateJob, appendJobLog } = require("./pipelineJobs"); const { updateJob, appendJobLog, isJobCancelled } = require("./pipelineJobs");
function stringifyLogArgs(args) { function stringifyLogArgs(args) {
return args.map((value) => { return args.map((value) => {
@ -250,6 +250,7 @@ async function runKytPipelineJob(job) {
sourceKey: source, sourceKey: source,
limit, limit,
argv: [], argv: [],
isCancelled: () => isJobCancelled(job.id),
onProgress(progress) { onProgress(progress) {
updateJob(job.id, { updateJob(job.id, {
status: "running", status: "running",
@ -277,12 +278,22 @@ async function runKytPipelineJob(job) {
console.error = originalConsole.error; console.error = originalConsole.error;
} }
} catch (error) { } catch (error) {
updateJob(job.id, { if (error instanceof PipelineCancelledError) {
status: "error", updateJob(job.id, {
error: error.message, status: "cancelled",
detail: `Pipeline failed: ${error.message}`, step: "cancelled",
}); detail: "Import was cancelled by the user.",
log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`); error: null,
});
log(shop, `${source} import job ${job.id} was cancelled by the user`);
} else {
updateJob(job.id, {
status: "error",
error: error.message,
detail: `Pipeline failed: ${error.message}`,
});
log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`);
}
} finally { } finally {
process.argv = originalArgv; process.argv = originalArgv;
process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP; process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP;