diff --git a/.gitignore b/.gitignore
index 477123c..25cedfd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,3 @@
-@"
node_modules/
.env
.env.*
@@ -6,6 +5,13 @@ dist/
build/
.next/
coverage/
+
+# Runtime/generated data
+data/
+logs/
+*.log
+src/business-logic/import-pipeline/sources/outputs/
+
npm-debug.log*
yarn-debug.log*
yarn-error.log*
@@ -13,4 +19,3 @@ yarn-error.log*
Thumbs.db
.vscode/
.idea/
-"@ | Out-File -Encoding utf8 .gitignore
\ No newline at end of file
diff --git a/package-lock.json b/package-lock.json
index c854b53..16af9e7 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -12,6 +12,7 @@
"cors": "^2.8.5",
"dotenv": "^17.2.0",
"express": "^5.1.0",
+ "playwright": "^1.59.1",
"sharp": "^0.33.5",
"uuid": "^11.1.0",
"xlsx": "^0.18.5"
@@ -926,6 +927,20 @@
"node": ">= 0.8"
}
},
+ "node_modules/fsevents": {
+ "version": "2.3.2",
+ "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.2.tgz",
+ "integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==",
+ "hasInstallScript": true,
+ "license": "MIT",
+ "optional": true,
+ "os": [
+ "darwin"
+ ],
+ "engines": {
+ "node": "^8.16.0 || ^10.6.0 || >=11.0.0"
+ }
+ },
"node_modules/function-bind": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.2.tgz",
@@ -1217,6 +1232,36 @@
"url": "https://opencollective.com/express"
}
},
+ "node_modules/playwright": {
+ "version": "1.59.1",
+ "resolved": "https://registry.npmjs.org/playwright/-/playwright-1.59.1.tgz",
+ "integrity": "sha512-C8oWjPR3F81yljW9o5OxcWzfh6avkVwDD2VYdwIGqTkl+OGFISgypqzfu7dOe4QNLL2aqcWBmI3PMtLIK233lw==",
+ "license": "Apache-2.0",
+ "dependencies": {
+ "playwright-core": "1.59.1"
+ },
+ "bin": {
+ "playwright": "cli.js"
+ },
+ "engines": {
+ "node": ">=18"
+ },
+ "optionalDependencies": {
+ "fsevents": "2.3.2"
+ }
+ },
+ "node_modules/playwright-core": {
+ "version": "1.59.1",
+ "resolved": "https://registry.npmjs.org/playwright-core/-/playwright-core-1.59.1.tgz",
+ "integrity": "sha512-HBV/RJg81z5BiiZ9yPzIiClYV/QMsDCKUyogwH9p3MCP6IYjUFu/MActgYAvK0oWyV9NlwM3GLBjADyWgydVyg==",
+ "license": "Apache-2.0",
+ "bin": {
+ "playwright-core": "cli.js"
+ },
+ "engines": {
+ "node": ">=18"
+ }
+ },
"node_modules/proxy-addr": {
"version": "2.0.7",
"resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz",
diff --git a/package.json b/package.json
index 58c1cef..d392530 100644
--- a/package.json
+++ b/package.json
@@ -12,6 +12,7 @@
"cors": "^2.8.5",
"dotenv": "^17.2.0",
"express": "^5.1.0",
+ "playwright": "^1.59.1",
"sharp": "^0.33.5",
"uuid": "^11.1.0",
"xlsx": "^0.18.5"
diff --git a/routes/pipeline.js b/routes/pipeline.js
index c0998f8..c12508f 100644
--- a/routes/pipeline.js
+++ b/routes/pipeline.js
@@ -2,29 +2,37 @@ const express = require("express");
const { createJob, updateJob, getJob, listJobs, canStartJob } = require("../src/pipelineJobs");
const { runKytPipelineJob } = require("../src/runKytPipelineJob");
const { getToken } = require("../tokenStore");
+const { getSource, listSources } = require("../src/business-logic/import-pipeline/sources");
const router = express.Router();
router.post("/run", async (req, res) => {
- const { shop, limit = null } = req.body || {};
+ const { shop, limit = null, source = "kyt" } = req.body || {};
if (!shop) {
return res.status(400).json({ error: "Missing shop." });
}
+ let sourceConfig;
+ try {
+ sourceConfig = getSource(source);
+ } catch (error) {
+ return res.status(400).json({ error: error.message });
+ }
+
if (!getToken(shop)) {
return res.status(400).json({ error: "No stored Shopify token found for this shop. Complete app auth first." });
}
- if (!canStartJob(shop)) {
- return res.status(409).json({ error: `A KYT pipeline job is already running for ${shop}.` });
+ if (!canStartJob(shop, sourceConfig.sourceKey)) {
+ return res.status(409).json({ error: `An import job is already running for ${shop} and source ${sourceConfig.sourceKey}.` });
}
- const job = createJob({ shop, limit });
+ const job = createJob({ shop, limit, source: sourceConfig.sourceKey });
updateJob(job.id, {
status: "queued",
step: "queued",
- detail: "Job queued",
+ detail: `${sourceConfig.label} import job queued`,
});
setImmediate(() => {
@@ -35,10 +43,15 @@ router.post("/run", async (req, res) => {
jobId: job.id,
status: "queued",
shop,
+ source: sourceConfig.sourceKey,
limit,
});
});
+router.get("/sources", (req, res) => {
+ return res.json({ sources: listSources() });
+});
+
router.get("/status/:jobId", (req, res) => {
const job = getJob(req.params.jobId);
if (!job) {
diff --git a/src/business-logic/_backup/kyt-pipeline/00_index.js b/src/business-logic/_backup/kyt-pipeline/00_index.js
new file mode 100644
index 0000000..5951e65
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/00_index.js
@@ -0,0 +1,26 @@
+const {
+ runFullSourcePipeline,
+ convertAggregatedToShopifyReady,
+ upsertShopifyProductsFromConverted,
+} = require("../import-pipeline/runSourcePipeline");
+
+function runFullKytPipeline(options = {}) {
+ return runFullSourcePipeline({
+ ...options,
+ sourceKey: "kyt",
+ });
+}
+
+module.exports = {
+ runFullKytPipeline,
+ runFullSourcePipeline,
+ convertAggregatedToShopifyReady,
+ upsertShopifyProductsFromConverted,
+};
+
+if (require.main === module) {
+ runFullKytPipeline().catch((error) => {
+ console.error("Full pipeline failed:", error.message);
+ process.exitCode = 1;
+ });
+}
diff --git a/src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js b/src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js
new file mode 100644
index 0000000..2c42565
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js
@@ -0,0 +1,7 @@
+module.exports = require("../import-pipeline/sources/kyt/scraper");
+
+if (require.main === module) {
+ module.exports.getKytIndiaWebsiteData().catch(() => {
+ process.exitCode = 1;
+ });
+}
diff --git a/src/business-logic/_backup/kyt-pipeline/02_download_product_images.js b/src/business-logic/_backup/kyt-pipeline/02_download_product_images.js
new file mode 100644
index 0000000..10f73c5
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/02_download_product_images.js
@@ -0,0 +1,13 @@
+module.exports = require("../import-pipeline/shared/downloadProductImages");
+
+if (require.main === module) {
+ module.exports.downloadProductImagesFromAggregatedJson()
+ .then((summary) => {
+ console.log("\nDownload summary:");
+ console.log(JSON.stringify(summary, null, 2));
+ })
+ .catch((error) => {
+ console.error("Image download failed:", error.message);
+ process.exitCode = 1;
+ });
+}
diff --git a/src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js b/src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js
new file mode 100644
index 0000000..6d64994
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js
@@ -0,0 +1,13 @@
+module.exports = require("../import-pipeline/shared/watermarkImages");
+
+if (require.main === module) {
+ module.exports.applyWatermarkToDownloadedImages()
+ .then((summary) => {
+ console.log("\nWatermark summary:");
+ console.log(JSON.stringify(summary, null, 2));
+ })
+ .catch((error) => {
+ console.error("Watermark run failed:", error.message);
+ process.exitCode = 1;
+ });
+}
diff --git a/src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js b/src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js
new file mode 100644
index 0000000..6ba6902
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js
@@ -0,0 +1,8 @@
+module.exports = require("../import-pipeline/shared/shopifyImageUploader");
+
+if (require.main === module) {
+ module.exports.runStandaloneImageUpload().catch((error) => {
+ console.error("Image upload pipeline failed:", error.message);
+ process.exitCode = 1;
+ });
+}
diff --git a/src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js b/src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js
new file mode 100644
index 0000000..3edb67a
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js
@@ -0,0 +1 @@
+module.exports = require("../import-pipeline/sources/kyt/converter");
diff --git a/src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js b/src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js
new file mode 100644
index 0000000..4569b07
--- /dev/null
+++ b/src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js
@@ -0,0 +1,8 @@
+module.exports = require("../import-pipeline/shared/shopifyProductUpsert");
+
+if (require.main === module) {
+ module.exports.runStandaloneSelfTest().catch((err) => {
+ console.error("Self-test failed:", err.message);
+ process.exit(1);
+ });
+}
diff --git a/src/business-logic/import-pipeline/runSourcePipeline.js b/src/business-logic/import-pipeline/runSourcePipeline.js
new file mode 100644
index 0000000..2317a11
--- /dev/null
+++ b/src/business-logic/import-pipeline/runSourcePipeline.js
@@ -0,0 +1,533 @@
+const fs = require("node:fs/promises");
+const path = require("node:path");
+const fsSync = require("node:fs");
+const { downloadProductImagesFromAggregatedJson } = require("./shared/downloadProductImages");
+const { applyWatermarkToDownloadedImages } = require("./shared/watermarkImages");
+const { uploadKytWatermarkedImagesToShopifyFiles } = require("./shared/shopifyImageUploader");
+const { upsertShopifyProductFull } = require("./shared/shopifyProductUpsert");
+const { getSource } = require("./sources");
+
+function nowIsoLocal() {
+ return new Date().toISOString();
+}
+
+function initRunLogger(logsDir) {
+ const absLogsDir = path.resolve(process.cwd(), logsDir);
+ fsSync.mkdirSync(absLogsDir, { recursive: true });
+
+ const stamp = new Date().toISOString().replace(/[:.]/g, "-");
+ const logPath = path.join(absLogsDir, `${stamp}.log`);
+ const stream = fsSync.createWriteStream(logPath, { flags: "a" });
+
+ const original = {
+ log: console.log.bind(console),
+ info: console.info.bind(console),
+ warn: console.warn.bind(console),
+ error: console.error.bind(console),
+ };
+
+ function writeLine(level, args) {
+ const text = args.map((a) => {
+ if (typeof a === "string") return a;
+ try {
+ return JSON.stringify(a);
+ } catch {
+ return String(a);
+ }
+ }).join(" ");
+ stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`);
+ }
+
+ console.log = (...args) => {
+ writeLine("LOG", args);
+ original.log(...args);
+ };
+ console.info = (...args) => {
+ writeLine("INFO", args);
+ original.info(...args);
+ };
+ console.warn = (...args) => {
+ writeLine("WARN", args);
+ original.warn(...args);
+ };
+ console.error = (...args) => {
+ writeLine("ERROR", args);
+ original.error(...args);
+ };
+
+ console.log(`[RUN-LOG] Writing logs to ${logPath}`);
+ return { logPath, stream, original };
+}
+
+function restoreRunLogger(logger) {
+ if (!logger) return;
+ console.log = logger.original.log;
+ console.info = logger.original.info;
+ console.warn = logger.original.warn;
+ console.error = logger.original.error;
+ logger.stream.end();
+}
+
+function loadDotEnvFile(filePath = ".env") {
+ const abs = path.resolve(process.cwd(), filePath);
+ if (!fsSync.existsSync(abs)) return;
+
+ const raw = fsSync.readFileSync(abs, "utf8");
+ const lines = raw.split(/\r?\n/);
+ for (const line of lines) {
+ const trimmed = line.trim();
+ if (!trimmed || trimmed.startsWith("#")) continue;
+
+ const eq = trimmed.indexOf("=");
+ if (eq <= 0) continue;
+
+ const key = trimmed.slice(0, eq).trim();
+ const value = trimmed.slice(eq + 1).trim();
+ if (key && process.env[key] == null) {
+ process.env[key] = value;
+ }
+ }
+}
+
+function parseCliArgs(argv = process.argv.slice(2)) {
+ const out = {
+ limit: null,
+ source: null,
+ };
+
+ for (let i = 0; i < argv.length; i += 1) {
+ const a = argv[i];
+ if ((a === "--limit" || a === "-n") && argv[i + 1]) {
+ const n = Number.parseInt(argv[i + 1], 10);
+ if (Number.isFinite(n) && n > 0) out.limit = n;
+ }
+ if ((a === "--source" || a === "-s") && argv[i + 1]) {
+ out.source = argv[i + 1];
+ }
+ }
+
+ return out;
+}
+
+async function buildLimitedAggregatedJson(inputPath, limit, outputDir) {
+ const absInputPath = path.resolve(process.cwd(), inputPath);
+ const raw = await fs.readFile(absInputPath, "utf8");
+ const parsed = JSON.parse(raw);
+ const products = Array.isArray(parsed?.products) ? parsed.products : [];
+ const limited = products.slice(0, limit);
+
+ const outPath = path.resolve(process.cwd(), outputDir, `01_products_aggregated.limit_${limit}.json`);
+ const payload = {
+ ...parsed,
+ generatedAt: new Date().toISOString(),
+ products: limited,
+ analysis: {
+ ...(parsed.analysis || {}),
+ totalProductsUnique: limited.length,
+ limitedFrom: products.length,
+ limitApplied: limit,
+ },
+ };
+
+ await fs.mkdir(path.dirname(outPath), { recursive: true });
+ await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8");
+ return {
+ inputPath: absInputPath,
+ outputPath: outPath,
+ totalBefore: products.length,
+ totalAfter: limited.length,
+ };
+}
+
+function readBooleanEnv(name, fallback = false) {
+ const val = process.env[name];
+ if (val == null) return fallback;
+ return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase());
+}
+
+function logStageSummary(stepKey, summary = {}) {
+ const parts = Object.entries(summary)
+ .filter(([, value]) => value !== undefined && value !== null && value !== "")
+ .map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`);
+
+ console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`);
+}
+
+async function convertAggregatedToShopifyReady({
+ source,
+ inputPath,
+ outputPath,
+ uploadedImageMapPath,
+ imageBaseUrl,
+ brand,
+}) {
+ const absInputPath = path.resolve(process.cwd(), inputPath);
+ const absOutputPath = path.resolve(process.cwd(), outputPath);
+ const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath);
+
+ const raw = await fs.readFile(absInputPath, "utf8");
+ const parsed = JSON.parse(raw);
+ let uploadedImageMap = null;
+ try {
+ const mapRaw = await fs.readFile(absUploadedMapPath, "utf8");
+ uploadedImageMap = JSON.parse(mapRaw);
+ } catch {
+ uploadedImageMap = null;
+ }
+
+ const convertedProducts = source.convertToShopifyProducts(parsed, {
+ imageBaseUrl,
+ brand,
+ uploadedImageMap,
+ });
+
+ const payload = {
+ generatedAt: new Date().toISOString(),
+ sourceKey: source.sourceKey,
+ sourceLabel: source.label,
+ sourceFile: absInputPath,
+ totalProducts: convertedProducts.length,
+ products: convertedProducts,
+ };
+
+ await fs.mkdir(path.dirname(absOutputPath), { recursive: true });
+ await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8");
+
+ return {
+ inputPath: absInputPath,
+ outputPath: absOutputPath,
+ uploadedImageMapPath: absUploadedMapPath,
+ totalProducts: convertedProducts.length,
+ };
+}
+
+async function upsertShopifyProductsFromConverted({
+ convertedPath,
+ shop = process.env.SHOPIFY_SHOP,
+ accessToken = process.env.SHOPIFY_ACCESS_TOKEN,
+ locationId = process.env.SHOPIFY_LOCATION_ID || null,
+ enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
+ apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10",
+} = {}) {
+ if (!shop) {
+ throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage.");
+ }
+ if (!accessToken) {
+ throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage.");
+ }
+
+ const absConvertedPath = path.resolve(process.cwd(), convertedPath);
+ const raw = await fs.readFile(absConvertedPath, "utf8");
+ const parsed = JSON.parse(raw);
+ const products = Array.isArray(parsed?.products) ? parsed.products : [];
+
+ const startedAtMs = Date.now();
+ const metrics = {
+ sourcePath: absConvertedPath,
+ total: products.length,
+ processed: 0,
+ created: 0,
+ updated: 0,
+ failed: 0,
+ errors: [],
+ startedAt: new Date(startedAtMs).toISOString(),
+ finishedAt: null,
+ durationSeconds: 0,
+ successRate: 0,
+ };
+
+ for (let i = 0; i < products.length; i += 1) {
+ const product = products[i];
+ const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`;
+
+ try {
+ const result = await upsertShopifyProductFull({
+ shop,
+ accessToken,
+ product,
+ locationId,
+ enableSeo,
+ apiVersion,
+ });
+
+ metrics.processed += 1;
+ if (result.action === "created") metrics.created += 1;
+ if (result.action === "updated") metrics.updated += 1;
+
+ if ((i + 1) % 10 === 0 || i === products.length - 1) {
+ console.log(
+ `[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}`
+ );
+ }
+ } catch (error) {
+ metrics.processed += 1;
+ metrics.failed += 1;
+ metrics.errors.push({
+ index: i + 1,
+ product: label,
+ error: error.message,
+ });
+ console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`);
+ }
+ }
+
+ const endedAtMs = Date.now();
+ metrics.finishedAt = new Date(endedAtMs).toISOString();
+ metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2));
+ metrics.successRate = metrics.total > 0
+ ? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2))
+ : 0;
+
+ return metrics;
+}
+
+async function runFullSourcePipeline(options = {}) {
+ loadDotEnvFile(".env");
+
+ const cli = parseCliArgs(options.argv || process.argv.slice(2));
+ const source = getSource(options.sourceKey || cli.source || "kyt");
+ const runPaths = source.paths();
+ const runLogger = initRunLogger(runPaths.logsDir);
+ const limit = options.limit || cli.limit || null;
+ const onProgress = typeof options.onProgress === "function" ? options.onProgress : null;
+ const emitProgress = (stepIndex, stepKey, message) => {
+ if (onProgress) {
+ onProgress({
+ stepIndex,
+ totalSteps: 6,
+ stepKey,
+ message,
+ });
+ }
+ };
+
+ try {
+ const imageBaseUrl = process.env[source.envImageBaseUrl] || process.env.SOURCE_IMAGE_BASE_URL || source.defaultImageBaseUrl || "";
+ const brand = process.env.SOURCE_BRAND || process.env.SHOPIFY_BRAND || source.defaultBrand || source.label;
+
+ emitProgress(1, "fetchWebsiteData", `Fetching ${source.label} website data`);
+ console.log(`[PIPELINE 1/6] Fetching ${source.label} website data...`);
+ const dataSummary = await source.fetchWebsiteData({ paths: runPaths });
+
+ let aggregatedPathForRun = runPaths.aggregatedJson;
+ let imagesDirForRun = runPaths.downloadedImagesDir;
+ let limitSummary = null;
+
+ if (limit) {
+ limitSummary = await buildLimitedAggregatedJson(runPaths.aggregatedJson, limit, path.dirname(runPaths.aggregatedJson));
+ aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/");
+ imagesDirForRun = `${runPaths.downloadedImagesDir}.limit_${limit}`;
+ console.log(
+ `[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}`
+ );
+ }
+
+ emitProgress(2, "downloadImages", "Downloading product images");
+ console.log("\n[PIPELINE 2/6] Downloading product images...");
+ const downloadSummary = source.downloadImages
+ ? await source.downloadImages({
+ aggregatedJsonPath: aggregatedPathForRun,
+ imagesDir: imagesDirForRun,
+ imageBaseUrl,
+ concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8),
+ })
+ : await downloadProductImagesFromAggregatedJson({
+ jsonPath: aggregatedPathForRun,
+ outputDir: imagesDirForRun,
+ baseImageUrl: imageBaseUrl,
+ concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8),
+ });
+ logStageSummary("downloadImages", {
+ total: downloadSummary?.totalImagesFound ?? 0,
+ downloaded: downloadSummary?.downloaded ?? 0,
+ skipped: downloadSummary?.skipped ?? 0,
+ failed: downloadSummary?.failed ?? 0,
+ products: downloadSummary?.productsCount ?? 0,
+ });
+
+ emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
+ console.log("\n[PIPELINE 3/6] Applying watermark in-place...");
+ const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png";
+ let watermarkSummary;
+ const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun);
+ if (!fsSync.existsSync(absWatermarkPath)) {
+ watermarkSummary = {
+ skipped: true,
+ reason: `Watermark file not found: ${absWatermarkPath}`,
+ imagesDir: path.resolve(process.cwd(), imagesDirForRun),
+ watermarkPath: absWatermarkPath,
+ totalImagesFound: 0,
+ processed: 0,
+ skippedCount: 0,
+ failed: 0,
+ };
+ console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`);
+ } else {
+ const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false);
+ try {
+ watermarkSummary = await applyWatermarkToDownloadedImages({
+ imagesDir: imagesDirForRun,
+ watermarkPath: watermarkPathForRun,
+ statePath: runPaths.watermarkState,
+ concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4),
+ });
+ } catch (error) {
+ if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) {
+ watermarkSummary = {
+ skipped: true,
+ reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`,
+ imagesDir: path.resolve(process.cwd(), imagesDirForRun),
+ watermarkPath: absWatermarkPath,
+ totalImagesFound: 0,
+ processed: 0,
+ skippedCount: 0,
+ failed: 0,
+ };
+ console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`);
+ } else {
+ throw error;
+ }
+ }
+ }
+ logStageSummary("watermarkImages", {
+ total: watermarkSummary?.totalImagesFound ?? 0,
+ processed: watermarkSummary?.processed ?? 0,
+ skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0,
+ failed: watermarkSummary?.failed ?? 0,
+ concurrency: watermarkSummary?.concurrency ?? "",
+ });
+
+ emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
+ console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
+ const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true);
+ const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false);
+ let imageUploadSummary;
+
+ if (!imageUploadEnabled) {
+ imageUploadSummary = {
+ skipped: true,
+ reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false",
+ totalTasks: 0,
+ processed: 0,
+ uploaded: 0,
+ failed: 0,
+ };
+ console.log("[PIPELINE 4/6] Image upload stage skipped by config.");
+ } else {
+ try {
+ const imageUploadInput = {
+ shop: process.env.SHOPIFY_SHOP,
+ accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
+ apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
+ aggregatedJsonPath: aggregatedPathForRun,
+ imagesDir: imagesDirForRun,
+ statePath: runPaths.imageUploadState,
+ mapPath: runPaths.uploadedMapJson,
+ concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3),
+ };
+ imageUploadSummary = source.uploadImagesToShopifyFiles
+ ? await source.uploadImagesToShopifyFiles(imageUploadInput)
+ : await uploadKytWatermarkedImagesToShopifyFiles(imageUploadInput);
+ } catch (error) {
+ const message = String(error?.message || "Unknown image upload error");
+ imageUploadSummary = {
+ skipped: true,
+ reason: message,
+ totalTasks: 0,
+ processed: 0,
+ uploaded: 0,
+ failed: 0,
+ };
+
+ console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`);
+ if (imageUploadRequired) {
+ throw error;
+ }
+ console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false).");
+ }
+ }
+ logStageSummary("uploadImagesToShopifyFiles", {
+ total: imageUploadSummary?.totalTasks ?? 0,
+ processed: imageUploadSummary?.processed ?? 0,
+ uploaded: imageUploadSummary?.uploaded ?? 0,
+ skipped: imageUploadSummary?.skipped ?? 0,
+ failed: imageUploadSummary?.failed ?? 0,
+ concurrency: imageUploadSummary?.concurrency ?? "",
+ });
+
+ emitProgress(5, "convertToShopifyReady", `Converting ${source.label} data to Shopify-ready products`);
+ console.log(`\n[PIPELINE 5/6] Converting ${source.label} data to Shopify-ready products...`);
+ const conversionSummary = await convertAggregatedToShopifyReady({
+ source,
+ inputPath: aggregatedPathForRun,
+ outputPath: runPaths.shopifyReadyJson,
+ uploadedImageMapPath: runPaths.uploadedMapJson,
+ imageBaseUrl,
+ brand,
+ });
+ logStageSummary("convertToShopifyReady", {
+ total: conversionSummary?.totalProducts ?? 0,
+ });
+
+ emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
+ console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
+ const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({
+ convertedPath: runPaths.shopifyReadyJson,
+ shop: process.env.SHOPIFY_SHOP,
+ accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
+ locationId: process.env.SHOPIFY_LOCATION_ID || null,
+ enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
+ apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
+ });
+ logStageSummary("upsertToShopify", {
+ total: shopifyUpsertSummary?.total ?? 0,
+ processed: shopifyUpsertSummary?.processed ?? 0,
+ created: shopifyUpsertSummary?.created ?? 0,
+ updated: shopifyUpsertSummary?.updated ?? 0,
+ failed: shopifyUpsertSummary?.failed ?? 0,
+ successRate: shopifyUpsertSummary?.successRate ?? 0,
+ });
+
+ const summary = {
+ source: {
+ key: source.sourceKey,
+ label: source.label,
+ },
+ paths: runPaths,
+ completedAt: new Date().toISOString(),
+ runLogPath: runLogger.logPath,
+ limit: limit || null,
+ limitSummary,
+ steps: {
+ fetchWebsiteData: dataSummary,
+ downloadImages: downloadSummary,
+ watermarkImages: watermarkSummary,
+ uploadImagesToShopifyFiles: imageUploadSummary,
+ convertToShopifyReady: conversionSummary,
+ upsertToShopify: shopifyUpsertSummary,
+ },
+ upcomingSteps: [],
+ };
+
+ emitProgress(6, "completed", `${source.label} pipeline completed`);
+ console.log("\n=== FULL PIPELINE SUMMARY ===");
+ console.log(JSON.stringify(summary, null, 2));
+
+ return summary;
+ } finally {
+ restoreRunLogger(runLogger);
+ }
+}
+
+module.exports = {
+ runFullSourcePipeline,
+ convertAggregatedToShopifyReady,
+ upsertShopifyProductsFromConverted,
+};
+
+if (require.main === module) {
+ runFullSourcePipeline().catch((error) => {
+ console.error("Full pipeline failed:", error.message);
+ process.exitCode = 1;
+ });
+}
diff --git a/src/business-logic/kyt-pipeline/02_download_product_images.js b/src/business-logic/import-pipeline/shared/downloadProductImages.js
similarity index 85%
rename from src/business-logic/kyt-pipeline/02_download_product_images.js
rename to src/business-logic/import-pipeline/shared/downloadProductImages.js
index b55db9b..b77efe8 100644
--- a/src/business-logic/kyt-pipeline/02_download_product_images.js
+++ b/src/business-logic/import-pipeline/shared/downloadProductImages.js
@@ -21,7 +21,7 @@ function encodePathKeepingSlashes(relativePath) {
}
function getExtFromPathOrType(imagePath, contentType) {
- const parsed = path.extname(imagePath || "").toLowerCase();
+ const parsed = path.extname(getPathWithoutQuery(imagePath)).toLowerCase();
if (parsed) {
return parsed;
}
@@ -38,6 +38,35 @@ function getExtFromPathOrType(imagePath, contentType) {
return ".jpg";
}
+function isAbsoluteUrl(value) {
+ return /^https?:\/\//i.test(String(value || ""));
+}
+
+function getPathWithoutQuery(value) {
+ const raw = String(value || "");
+ try {
+ return new URL(raw).pathname;
+ } catch {
+ return raw.split(/[?#]/)[0];
+ }
+}
+
+function buildImageUrl(imagePath, baseImageUrl) {
+ if (isAbsoluteUrl(imagePath)) {
+ return String(imagePath);
+ }
+
+ const encoded = encodePathKeepingSlashes(imagePath);
+ return `${String(baseImageUrl || "").replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`;
+}
+
+function getFilePartsFromImagePath(imagePath, fallbackIndex) {
+ const cleanPath = getPathWithoutQuery(imagePath);
+ const originalBase = path.basename(cleanPath, path.extname(cleanPath)) || `image_${fallbackIndex + 1}`;
+ const originalExt = path.extname(cleanPath) || ".png";
+ return { originalBase, originalExt };
+}
+
async function downloadWithRetry(url, retries = 3) {
let lastError;
@@ -129,11 +158,8 @@ async function downloadProductImagesFromAggregatedJson(options = {}) {
for (let idx = 0; idx < imagePaths.length; idx += 1) {
const relPath = imagePaths[idx];
- const encoded = encodePathKeepingSlashes(relPath);
- const imageUrl = `${baseImageUrl.replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`;
-
- const originalBase = path.basename(String(relPath || ""), path.extname(String(relPath || ""))) || `image_${idx + 1}`;
- const originalExt = path.extname(String(relPath || "")) || ".png";
+ const imageUrl = buildImageUrl(relPath, baseImageUrl);
+ const { originalBase, originalExt } = getFilePartsFromImagePath(relPath, idx);
let localBase = sanitizeName(originalBase) || `image_${idx + 1}`;
let fileName = `${localBase}${originalExt}`;
diff --git a/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js b/src/business-logic/import-pipeline/shared/shopifyImageUploader.js
similarity index 97%
rename from src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js
rename to src/business-logic/import-pipeline/shared/shopifyImageUploader.js
index e05ba8b..af01af9 100644
--- a/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js
+++ b/src/business-logic/import-pipeline/shared/shopifyImageUploader.js
@@ -39,6 +39,15 @@ function sanitizeName(value) {
.slice(0, 150);
}
+function getPathWithoutQuery(value) {
+ const raw = String(value || "");
+ try {
+ return new URL(raw).pathname;
+ } catch {
+ return raw.split(/[?#]/)[0];
+ }
+}
+
function getMimeType(filePath) {
const ext = path.extname(filePath).toLowerCase();
if (ext === ".png") return "image/png";
@@ -258,8 +267,9 @@ function buildLocalImageTasks(aggregatedPayload, imagesDir) {
for (let idx = 0; idx < imagePaths.length; idx += 1) {
const sourcePath = imagePaths[idx];
- const originalBase = path.basename(String(sourcePath || ""), path.extname(String(sourcePath || ""))) || `image_${idx + 1}`;
- const originalExt = path.extname(String(sourcePath || "")) || ".png";
+ const cleanSourcePath = getPathWithoutQuery(sourcePath);
+ const originalBase = path.basename(cleanSourcePath, path.extname(cleanSourcePath)) || `image_${idx + 1}`;
+ const originalExt = path.extname(cleanSourcePath) || ".png";
const localBase = sanitizeName(originalBase) || `image_${idx + 1}`;
let fileName = `${localBase}${originalExt}`;
diff --git a/src/business-logic/kyt-pipeline/06_shopify_product_upsert.js b/src/business-logic/import-pipeline/shared/shopifyProductUpsert.js
similarity index 61%
rename from src/business-logic/kyt-pipeline/06_shopify_product_upsert.js
rename to src/business-logic/import-pipeline/shared/shopifyProductUpsert.js
index 3509297..f103bfd 100644
--- a/src/business-logic/kyt-pipeline/06_shopify_product_upsert.js
+++ b/src/business-logic/import-pipeline/shared/shopifyProductUpsert.js
@@ -98,6 +98,207 @@ function normalizeFitmentTags(input) {
return { fitmentMap, fitmentFlat };
}
+function normalizeOptionKey(optionValues = []) {
+ return optionValues
+ .map((item) => `${String(item.name || "").trim().toLowerCase()}=${String(item.value || "").trim().toLowerCase()}`)
+ .sort()
+ .join("|");
+}
+
+function getDesiredOptions(attrs) {
+ const options = Array.isArray(attrs.options) ? attrs.options : [];
+ return options
+ .map((option) => ({
+ name: String(option.name || "").trim(),
+ values: Array.from(new Set((Array.isArray(option.values) ? option.values : []).map((value) => String(value || "").trim()).filter(Boolean))),
+ }))
+ .filter((option) => option.name && option.values.length);
+}
+
+function getDesiredVariants(attrs, finalPrice, compareAtPrice) {
+ const variants = Array.isArray(attrs.variants) ? attrs.variants : [];
+ if (!variants.length) {
+ return [
+ {
+ sku: attrs.part_number || "",
+ price: finalPrice,
+ compareAtPrice,
+ quantity: Number(attrs.total_quantity ?? attrs.quantity ?? 0),
+ optionValues: [],
+ },
+ ];
+ }
+
+ return variants.map((variant, index) => ({
+ sku: String(variant.sku || attrs.part_number || `${attrs.part_number || "variant"}-${index + 1}`).slice(0, 64),
+ price: Number(variant.price ?? finalPrice),
+ compareAtPrice: variant.compare_price != null ? Number(variant.compare_price) : compareAtPrice,
+ quantity: Number(variant.quantity ?? attrs.total_quantity ?? attrs.quantity ?? 0),
+ optionValues: Array.isArray(variant.optionValues)
+ ? variant.optionValues
+ .map((item) => ({
+ name: String(item.name || "").trim(),
+ value: String(item.value || "").trim(),
+ }))
+ .filter((item) => item.name && item.value)
+ : [],
+ }));
+}
+
+function buildVariantBulkInput(variant, attrs, weightValue, variantId = null) {
+ const input = {
+ ...(variantId ? { id: variantId } : {}),
+ price: Number(variant.price || 0).toFixed(2),
+ ...(variant.compareAtPrice !== null && variant.compareAtPrice !== undefined
+ ? { compareAtPrice: Number(variant.compareAtPrice).toFixed(2) }
+ : {}),
+ ...(attrs.barcode ? { barcode: attrs.barcode } : {}),
+ inventoryItem: {
+ sku: variant.sku || attrs.part_number || "",
+ measurement: { weight: { value: weightValue, unit: "POUNDS" } },
+ },
+ };
+
+ if (!variantId && variant.optionValues?.length) {
+ input.optionValues = variant.optionValues.map((optionValue) => ({
+ name: optionValue.value,
+ optionName: optionValue.name,
+ }));
+ }
+
+ return input;
+}
+
+async function ensureProductOptions(client, productId, currentOptions, desiredOptions) {
+ if (!desiredOptions.length) {
+ return currentOptions || [];
+ }
+
+ const existingNames = new Set((currentOptions || []).map((option) => String(option.name || "").toLowerCase()));
+ const missingOptions = desiredOptions.filter((option) => !existingNames.has(option.name.toLowerCase()));
+ if (!missingOptions.length) {
+ return currentOptions || [];
+ }
+
+ const created = await gql(
+ client,
+ `mutation($productId: ID!, $options: [OptionCreateInput!]!, $variantStrategy: ProductOptionCreateVariantStrategy) {
+ productOptionsCreate(productId: $productId, options: $options, variantStrategy: $variantStrategy) {
+ product {
+ id
+ options { id name values position optionValues { id name hasVariants } }
+ }
+ userErrors { field message code }
+ }
+ }`,
+ {
+ productId,
+ options: missingOptions.map((option) => ({
+ name: option.name,
+ values: option.values.map((value) => ({ name: value })),
+ })),
+ variantStrategy: "CREATE",
+ }
+ );
+
+ const errs = created.productOptionsCreate.userErrors || [];
+ if (errs.length) {
+ throw new Error(`productOptionsCreate failed: ${errs.map((e) => e.message).join(", ")}`);
+ }
+
+ return created.productOptionsCreate.product?.options || currentOptions || [];
+}
+
+async function fetchProductVariantState(client, productId) {
+ const data = await gql(
+ client,
+ `query($id: ID!) {
+ product(id: $id) {
+ id
+ options { id name values position optionValues { id name hasVariants } }
+ variants(first: 250) {
+ nodes {
+ id
+ sku
+ selectedOptions { name value }
+ inventoryItem { id }
+ }
+ }
+ }
+ }`,
+ { id: productId }
+ );
+
+ return data.product || { id: productId, options: [], variants: { nodes: [] } };
+}
+
+async function syncProductVariants({ client, productId, attrs, desiredVariants, desiredOptions, weightValue }) {
+ let productState = await fetchProductVariantState(client, productId);
+ await ensureProductOptions(client, productId, productState.options || [], desiredOptions);
+ productState = await fetchProductVariantState(client, productId);
+
+ const existingVariants = productState.variants?.nodes || [];
+ const existingByOptions = new Map();
+ for (const variant of existingVariants) {
+ existingByOptions.set(normalizeOptionKey(variant.selectedOptions || []), variant);
+ }
+
+ const variantsToUpdate = [];
+ const variantsToCreate = [];
+
+ desiredVariants.forEach((variant, index) => {
+ const key = normalizeOptionKey(variant.optionValues || []);
+ const existing = existingByOptions.get(key) || (index === 0 ? existingVariants[0] : null);
+ if (existing?.id) {
+ variantsToUpdate.push(buildVariantBulkInput(variant, attrs, weightValue, existing.id));
+ } else {
+ variantsToCreate.push(buildVariantBulkInput(variant, attrs, weightValue));
+ }
+ });
+
+ const syncedVariants = [];
+
+ if (variantsToUpdate.length) {
+ const updated = await gql(
+ client,
+ `mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) {
+ productVariantsBulkUpdate(productId: $productId, variants: $variants) {
+ productVariants { id sku inventoryItem { id sku } }
+ userErrors { field message }
+ }
+ }`,
+ { productId, variants: variantsToUpdate }
+ );
+
+ if (updated.productVariantsBulkUpdate.userErrors?.length) {
+ throw new Error(`variant update failed: ${updated.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`);
+ }
+
+ syncedVariants.push(...(updated.productVariantsBulkUpdate.productVariants || []));
+ }
+
+ if (variantsToCreate.length) {
+ const created = await gql(
+ client,
+ `mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!, $strategy: ProductVariantsBulkCreateStrategy) {
+ productVariantsBulkCreate(productId: $productId, variants: $variants, strategy: $strategy) {
+ productVariants { id sku inventoryItem { id sku } selectedOptions { name value } }
+ userErrors { field message }
+ }
+ }`,
+ { productId, variants: variantsToCreate, strategy: "REMOVE_STANDALONE_VARIANT" }
+ );
+
+ if (created.productVariantsBulkCreate.userErrors?.length) {
+ throw new Error(`variant create failed: ${created.productVariantsBulkCreate.userErrors.map((e) => e.message).join(", ")}`);
+ }
+
+ syncedVariants.push(...(created.productVariantsBulkCreate.productVariants || []));
+ }
+
+ return syncedVariants;
+}
+
async function generateSeo(attrs, seoClient) {
const seoInput = {
product_name: attrs.product_name,
@@ -305,8 +506,6 @@ async function upsertShopifyProductFull({
const existing = existingSearch.products.nodes?.[0];
let productId;
- let variantId;
- let inventoryItemId;
let action;
if (existing) {
@@ -336,8 +535,6 @@ async function upsertShopifyProductFull({
if (errs.length) throw new Error(`productUpdate failed: ${errs.map((e) => e.message).join(", ")}`);
productId = existing.id;
- variantId = existing.variants?.nodes?.[0]?.id;
- inventoryItemId = existing.variants?.nodes?.[0]?.inventoryItem?.id;
action = "updated";
} else {
const crt = await gql(
@@ -370,13 +567,9 @@ async function upsertShopifyProductFull({
if (errs.length) throw new Error(`productCreate failed: ${errs.map((e) => e.message).join(", ")}`);
productId = crt.productCreate.product.id;
- variantId = crt.productCreate.product.variants?.nodes?.[0]?.id;
- inventoryItemId = crt.productCreate.product.variants?.nodes?.[0]?.inventoryItem?.id;
action = "created";
}
- if (!variantId) throw new Error("No variant ID found after upsert.");
-
const { priceType, percentage } = await getPricingConfig(client);
const basePrice = Number(attrs.price || 0);
let finalPrice = basePrice;
@@ -386,37 +579,25 @@ async function upsertShopifyProductFull({
const compareAtPrice = attrs.compare_price != null ? Number(attrs.compare_price) : null;
const weightValue = Number(attrs.dimensions?.[0]?.weight || 0);
-
- const bulk = await gql(
+ const desiredOptions = getDesiredOptions(attrs);
+ const desiredVariants = getDesiredVariants(attrs, finalPrice, compareAtPrice);
+ const syncedVariants = await syncProductVariants({
client,
- `mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) {
- productVariantsBulkUpdate(productId: $productId, variants: $variants) {
- productVariants { id price barcode inventoryItem { id sku } }
- userErrors { field message }
- }
- }`,
- {
- productId,
- variants: [
- {
- id: variantId,
- price: Number(finalPrice).toFixed(2),
- ...(compareAtPrice !== null ? { compareAtPrice: Number(compareAtPrice).toFixed(2) } : {}),
- ...(attrs.barcode ? { barcode: attrs.barcode } : {}),
- inventoryItem: {
- sku: attrs.part_number || "",
- measurement: { weight: { value: weightValue, unit: "POUNDS" } },
- },
- },
- ],
- }
- );
+ productId,
+ attrs,
+ desiredVariants,
+ desiredOptions,
+ weightValue,
+ });
- if (bulk.productVariantsBulkUpdate.userErrors?.length) {
- throw new Error(`variant update failed: ${bulk.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`);
- }
+ const inventoryTargets = syncedVariants
+ .map((variant, index) => ({
+ inventoryItemId: variant.inventoryItem?.id,
+ quantity: Number(desiredVariants[index]?.quantity ?? attrs.total_quantity ?? attrs.quantity ?? 0),
+ }))
+ .filter((item) => item.inventoryItemId);
- if (inventoryItemId) {
+ for (const inventoryTarget of inventoryTargets) {
const inv = await gql(
client,
`mutation($id: ID!, $input: InventoryItemInput!) {
@@ -426,7 +607,7 @@ async function upsertShopifyProductFull({
}
}`,
{
- id: inventoryItemId,
+ id: inventoryTarget.inventoryItemId,
input: {
cost: Number(attrs.purchase_cost || 0),
tracked: true,
@@ -437,9 +618,11 @@ async function upsertShopifyProductFull({
if (inv.inventoryItemUpdate.userErrors?.length) {
throw new Error(`inventoryItemUpdate failed: ${inv.inventoryItemUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
+ }
- if (locationId) {
- try {
+ if (locationId && inventoryTargets.length) {
+ try {
+ for (const inventoryTarget of inventoryTargets) {
const act = await gql(
client,
`mutation($inventoryItemId: ID!, $locationId: ID!) {
@@ -448,49 +631,46 @@ async function upsertShopifyProductFull({
userErrors { field message }
}
}`,
- { inventoryItemId, locationId }
+ { inventoryItemId: inventoryTarget.inventoryItemId, locationId }
);
if (act.inventoryActivate.userErrors?.length) {
throw new Error(`inventoryActivate failed: ${act.inventoryActivate.userErrors.map((e) => e.message).join(", ")}`);
}
-
- const totalQty = Number(attrs.total_quantity ?? attrs.quantity ?? 0);
- const setQty = await gql(
- client,
- `mutation($input: InventorySetQuantitiesInput!) {
- inventorySetQuantities(input: $input) {
- userErrors { field message }
- }
- }`,
- {
- input: {
- name: "available",
- reason: "correction",
- ignoreCompareQuantity: true,
- quantities: [
- {
- inventoryItemId,
- locationId,
- quantity: totalQty,
- compareQuantity: 0,
- },
- ],
- },
- }
- );
-
- if (setQty.inventorySetQuantities.userErrors?.length) {
- throw new Error(`inventorySetQuantities failed: ${setQty.inventorySetQuantities.userErrors.map((e) => e.message).join(", ")}`);
- }
- } catch (inventoryError) {
- console.log(
- `[INVENTORY-WARN] Skipping inventory activation/set for product ${productId}. Reason: ${inventoryError.message}`
- );
}
- } else {
- console.log(`[INVENTORY-WARN] No locationId provided. Skipping inventory activation/set for product ${productId}.`);
+
+ const setQty = await gql(
+ client,
+ `mutation($input: InventorySetQuantitiesInput!) {
+ inventorySetQuantities(input: $input) {
+ userErrors { field message }
+ }
+ }`,
+ {
+ input: {
+ name: "available",
+ reason: "correction",
+ ignoreCompareQuantity: true,
+ quantities: inventoryTargets.map((inventoryTarget) => ({
+ inventoryItemId: inventoryTarget.inventoryItemId,
+ locationId,
+ quantity: inventoryTarget.quantity,
+ compareQuantity: 0,
+ })),
+ },
+ }
+ );
+
+ if (setQty.inventorySetQuantities.userErrors?.length) {
+ throw new Error(`inventorySetQuantities failed: ${setQty.inventorySetQuantities.userErrors.map((e) => e.message).join(", ")}`);
+ }
+ } catch (inventoryError) {
+ console.log(
+ `[INVENTORY-WARN] Skipping inventory activation/set for product ${productId}. Reason: ${inventoryError.message}`
+ );
}
+ } else if (!locationId) {
+ console.log(`[INVENTORY-WARN] No locationId provided. Skipping inventory activation/set for product ${productId}.`);
}
await publishToOnlineStore(client, productId);
@@ -527,8 +707,9 @@ async function upsertShopifyProductFull({
ok: true,
action,
productId,
- variantId,
- inventoryItemId,
+ variantId: syncedVariants[0]?.id || null,
+ inventoryItemId: syncedVariants[0]?.inventoryItem?.id || null,
+ variantsSynced: syncedVariants.length,
handle,
};
}
diff --git a/src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js b/src/business-logic/import-pipeline/shared/watermarkImages.js
similarity index 100%
rename from src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js
rename to src/business-logic/import-pipeline/shared/watermarkImages.js
diff --git a/src/business-logic/import-pipeline/sources/README.md b/src/business-logic/import-pipeline/sources/README.md
new file mode 100644
index 0000000..56ebc13
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/README.md
@@ -0,0 +1,52 @@
+# Import Source Modules
+
+Each client website gets one source folder here. A source owns the parts that change from website to website:
+
+- where scraped data is stored
+- how website data is fetched
+- how raw website records are converted to Shopify-ready products
+- optionally, how image download/upload tasks are discovered if the raw data shape is different
+
+The shared pipeline still handles:
+
+- job progress
+- image watermarking
+- Shopify Files upload state
+- Shopify product upsert
+- logs and summaries
+
+## Folder Shape
+
+Every client should follow the same shape:
+
+```txt
+sources/
+ client-key/
+ index.js source metadata, paths, fetchWebsiteData orchestration
+ scraper.js website/API scraping only
+ converter.js source-normalized JSON to Shopify-ready products
+```
+
+The source registry loads each folder through its `index.js`.
+
+## Source Contract
+
+`index.js` exports an object like this:
+
+```js
+module.exports = {
+ sourceKey: "client-key",
+ label: "Client Name",
+ defaultBrand: "Client Brand",
+ defaultImageBaseUrl: "https://example.com/images",
+ envImageBaseUrl: "CLIENT_IMAGE_BASE_URL",
+ paths,
+ fetchWebsiteData,
+ convertToShopifyProducts,
+ // optional:
+ // downloadImages,
+ // uploadImagesToShopifyFiles,
+};
+```
+
+Register the new source in `sources/index.js`.
diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/converter.js b/src/business-logic/import-pipeline/sources/brocks-performance/converter.js
new file mode 100644
index 0000000..662bda3
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/brocks-performance/converter.js
@@ -0,0 +1,250 @@
+const path = require("node:path");
+
+function slugify(str) {
+ return String(str || "")
+ .trim()
+ .toLowerCase()
+ .replace(/[^a-z0-9]+/g, "-")
+ .replace(/^-+|-+$/g, "");
+}
+
+function escapeHtml(input) {
+ return String(input || "")
+ .replace(/&/g, "&")
+ .replace(//g, ">")
+ .replace(/"/g, """)
+ .replace(/'/g, "'");
+}
+
+function cleanText(value) {
+ return String(value || "")
+ .replace(/\u2026/g, "...")
+ .replace(/\s+\n/g, "\n")
+ .replace(/\n\s+/g, "\n")
+ .replace(/[ \t]+/g, " ")
+ .replace(/\n{3,}/g, "\n\n")
+ .trim();
+}
+
+function getUploadedImageUrl(imgPath, uploadedImageMap) {
+ if (!uploadedImageMap || typeof uploadedImageMap !== "object") return null;
+ const bySourcePath = uploadedImageMap.bySourcePath && typeof uploadedImageMap.bySourcePath === "object"
+ ? uploadedImageMap.bySourcePath
+ : {};
+ return bySourcePath[String(imgPath)]?.url || null;
+}
+
+function getImageFileName(imagePath) {
+ try {
+ return path.basename(new URL(imagePath).pathname);
+ } catch {
+ return path.basename(String(imagePath || "").split(/[?#]/)[0]);
+ }
+}
+
+function descriptionToHtml(description, sections = []) {
+ const blocks = [];
+ const clean = cleanText(description);
+ if (clean) {
+ blocks.push(
+ ...clean
+ .split(/\n{2,}/)
+ .map((block) => `
${escapeHtml(block).replace(/\n/g, "
")}
`)
+ );
+ }
+
+ for (const section of sections) {
+ const title = cleanText(section.title);
+ const paragraphs = Array.isArray(section.paragraphs) ? section.paragraphs.map(cleanText).filter(Boolean) : [];
+ const bullets = Array.isArray(section.bullets) ? section.bullets.map(cleanText).filter(Boolean) : [];
+
+ if (title) {
+ blocks.push(`${escapeHtml(title)}
`);
+ }
+ for (const paragraph of paragraphs) {
+ blocks.push(`${escapeHtml(paragraph)}
`);
+ }
+ if (bullets.length) {
+ blocks.push(`${bullets.map((bullet) => `- ${escapeHtml(bullet)}
`).join("")}
`);
+ }
+ }
+
+ return blocks.join("\n");
+}
+
+function normalizeOptionName(name) {
+ const clean = cleanText(name).replace(/^select\s+/i, "");
+ return clean || "Option";
+}
+
+function getSourceOptions(record, data) {
+ const options = Array.isArray(record?.sourceOptions) ? record.sourceOptions : Array.isArray(data?.options) ? data.options : [];
+ return options
+ .map((option) => {
+ const name = normalizeOptionName(option.name || option.label);
+ const values = Array.isArray(option.values) ? option.values : [];
+ return {
+ name,
+ values: values
+ .map((value) => ({
+ label: cleanText(value.label || value.name),
+ sourceValue: value.value || null,
+ selected: Boolean(value.selected),
+ }))
+ .filter((value) => value.label && !/^choose options$/i.test(value.label)),
+ };
+ })
+ .filter((option) => option.values.length);
+}
+
+function buildOptionCombinations(options) {
+ if (!options.length) {
+ return [];
+ }
+
+ return options.reduce((combinations, option) => {
+ const next = [];
+ for (const combo of combinations.length ? combinations : [[]]) {
+ for (const value of option.values) {
+ next.push([...combo, { name: option.name, value: value.label, sourceValue: value.sourceValue }]);
+ }
+ }
+ return next;
+ }, []);
+}
+
+function buildVariants({ options, baseSku, price, quantity, compareAtPrice = null }) {
+ const combinations = buildOptionCombinations(options);
+ if (!combinations.length) {
+ return [
+ {
+ sku: baseSku,
+ price,
+ compare_price: compareAtPrice,
+ quantity,
+ optionValues: [],
+ },
+ ];
+ }
+
+ return combinations.map((combo, index) => {
+ const suffix = combo
+ .map((item) => slugify(item.value).slice(0, 28))
+ .filter(Boolean)
+ .join("-");
+
+ return {
+ sku: index === 0 ? baseSku : `${baseSku}-${suffix || index + 1}`.slice(0, 64),
+ price,
+ compare_price: compareAtPrice,
+ quantity,
+ optionValues: combo,
+ };
+ });
+}
+
+function convertToShopifyProducts(input, options = {}) {
+ const records = Array.isArray(input?.products) ? input.products : [];
+ const brand = options.brand || "Brock's Performance";
+
+ return records.map((record, index) => {
+ const data = record?.productDetails?.data || {};
+ const sourceRecord = record?.sourceRecord || {};
+ const productId = record?.productId || data.id || slugify(sourceRecord.url || data.name || `brocks-${index + 1}`);
+ const title = cleanText(data.name || sourceRecord.title || `Brock's Performance Product ${index + 1}`);
+ const sku = cleanText(data.code || sourceRecord.sku || productId);
+ const imagePaths = Array.isArray(data.img) ? data.img : [];
+ const sourceOptions = getSourceOptions(record, data);
+ const quantity = Number(data?.stocks?.total_units ?? sourceRecord?.stock?.currentStock ?? 0) || 0;
+ const descriptionHtml = descriptionToHtml(
+ data?.details?.description || sourceRecord.description,
+ data?.details?.descriptionSections || record?.descriptionSections || sourceRecord.descriptionSections || []
+ );
+ const price = Number(data?.cost?.mrp ?? sourceRecord.price ?? 0) || 0;
+ const files = imagePaths
+ .map((imagePath) => ({
+ type: "Image",
+ url: getUploadedImageUrl(imagePath, options.uploadedImageMap) || imagePath,
+ media_content: getImageFileName(imagePath),
+ source_path: imagePath,
+ }))
+ .filter((file) => file.url);
+ const variants = buildVariants({
+ options: sourceOptions,
+ baseSku: sku,
+ price,
+ quantity,
+ });
+
+ const category = "Exhaust";
+ const subcategory = "Titanium Full Systems, Suzuki";
+ const tags = [
+ brand,
+ category,
+ "Titanium Full Systems",
+ "Suzuki",
+ "Motorcycle Exhaust",
+ sku,
+ ].filter(Boolean);
+
+ return {
+ id: productId,
+ source: {
+ productId,
+ sourceKey: "brocks-performance",
+ url: sourceRecord.url || data.url || null,
+ image_paths: imagePaths,
+ },
+ attributes: {
+ product_name: title,
+ brand,
+ category,
+ subcategory,
+ part_number: sku,
+ mfr_part_number: sku,
+ price,
+ compare_price: null,
+ purchase_cost: null,
+ barcode: "",
+ price_group: null,
+ units_per_sku: null,
+ part_description: descriptionHtml,
+ descriptions: descriptionHtml
+ ? [{ type: "Market Description", description: descriptionHtml }]
+ : [],
+ files,
+ image_paths: imagePaths,
+ dimensions: [{ weight: 0 }],
+ total_quantity: quantity,
+ inventorydata: {
+ inventory: {
+ main: quantity,
+ },
+ },
+ options: sourceOptions.map((option) => ({
+ name: option.name,
+ values: option.values.map((value) => value.label),
+ })),
+ variants,
+ fitmentTags: {
+ make: ["Suzuki"],
+ model: [],
+ year: [],
+ drive: [],
+ baseModel: [],
+ },
+ tags,
+ handle: slugify(`${brand}-${title}-${productId}`),
+ source_url: sourceRecord.url || data.url || null,
+ page_meta: sourceRecord.pageMeta || record.pageMeta || {},
+ },
+ };
+ });
+}
+
+module.exports = {
+ cleanText,
+ convertToShopifyProducts,
+ slugify,
+};
diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/index.js b/src/business-logic/import-pipeline/sources/brocks-performance/index.js
new file mode 100644
index 0000000..68a8dc5
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/brocks-performance/index.js
@@ -0,0 +1,215 @@
+const fs = require("node:fs/promises");
+const path = require("node:path");
+const { cleanText, scrapeBrocksPerformanceProducts, TARGET_URL } = require("./scraper");
+const { convertToShopifyProducts, slugify } = require("./converter");
+
+const sourceKey = "brocks-performance";
+const label = "Brock's Performance";
+const dataDir = path.join("data", "sources", sourceKey);
+
+function paths() {
+ return {
+ aggregatedJson: path.join(dataDir, "01_products_aggregated.json"),
+ historyJson: path.join(dataDir, "01_products_run_history.json"),
+ rawJson: path.join(dataDir, "01_raw_scrape.json"),
+ downloadedImagesDir: path.join(dataDir, "02_downloaded_product_images"),
+ watermarkState: path.join(dataDir, "03_watermark_state.json"),
+ imageUploadState: path.join(dataDir, "04_shopify_image_upload_state.json"),
+ uploadedMapJson: path.join(dataDir, "04_shopify_uploaded_images_map.json"),
+ shopifyReadyJson: path.join(dataDir, "05_shopify_products_ready.json"),
+ logsDir: path.join(dataDir, "99_run_logs"),
+ };
+}
+
+function uniqueBy(values, keyFn) {
+ const seen = new Set();
+ const out = [];
+
+ for (const value of values) {
+ const key = keyFn(value);
+ if (!key || seen.has(key)) {
+ continue;
+ }
+ seen.add(key);
+ out.push(value);
+ }
+
+ return out;
+}
+
+function bestImageUrl(image) {
+ if (!image) {
+ return null;
+ }
+
+ if (typeof image === "string") {
+ return image;
+ }
+
+ const variants = Array.isArray(image.variants) ? image.variants.filter(Boolean) : [];
+ return variants[variants.length - 1] || image.src || null;
+}
+
+function getImageList(record) {
+ const candidates = [
+ record.image,
+ ...(Array.isArray(record.images) ? record.images.map(bestImageUrl) : []),
+ ].filter(Boolean);
+
+ return uniqueBy(candidates, (url) => String(url).split("?")[0]);
+}
+
+function normalizeOptions(options = []) {
+ return options
+ .map((option) => {
+ const name = cleanText(option.label) || "Option";
+ const values = Array.isArray(option.values) ? option.values : [];
+ const normalizedValues = values
+ .map((value) => ({
+ label: cleanText(value.label),
+ value: value.value || null,
+ selected: Boolean(value.selected || value.checked),
+ disabled: Boolean(value.disabled),
+ }))
+ .filter((value) => value.label && !/^choose options$/i.test(value.label));
+
+ return {
+ name,
+ required: Boolean(option.required),
+ type: option.type || null,
+ values: uniqueBy(normalizedValues, (value) => value.label.toLowerCase()),
+ };
+ })
+ .filter((option) => option.values.length > 0);
+}
+
+function getStockQuantity(record) {
+ const raw = record?.stock?.currentStock || record?.productInfo?.["Quantity Available"] || "";
+ const numeric = Number.parseInt(String(raw).replace(/[^0-9-]/g, ""), 10);
+ return Number.isFinite(numeric) ? numeric : 0;
+}
+
+function normalizeRecord(record, index) {
+ const title = cleanText(record.title) || `Brock's Performance Product ${index + 1}`;
+ const sku = cleanText(record.sku) || slugify(title);
+ const productId = record.productId || sku || slugify(record.url || title);
+ const imageList = getImageList(record);
+ const price = Number.isFinite(record.price) ? record.price : 0;
+ const description = cleanText(record.description);
+ const options = normalizeOptions(record.options);
+ const stockQuantity = getStockQuantity(record);
+
+ return {
+ productId,
+ sourceKey,
+ sourceRecord: record,
+ sourceOptions: options,
+ sourceImages: Array.isArray(record.images) ? record.images : [],
+ descriptionSections: Array.isArray(record.descriptionSections) ? record.descriptionSections : [],
+ pageMeta: record.pageMeta || {},
+ reviews: record.reviews || {},
+ productSummary: {
+ id: productId,
+ name: title,
+ cost: { mrp: price },
+ img: imageList,
+ sub_category_name: "Titanium Full Systems",
+ model_series_name: "Suzuki",
+ },
+ productDetails: {
+ servercode: "OK",
+ data: {
+ id: productId,
+ code: sku,
+ name: title,
+ category_name: "Exhaust",
+ sub_category_name: "Titanium Full Systems",
+ model_series_name: "Suzuki",
+ cost: { mrp: price },
+ stocks: { total_units: stockQuantity },
+ img: imageList,
+ url: record.url,
+ mpn: record.mpn || null,
+ brand: record.brand || null,
+ availability: record.availability || null,
+ currency: record.currency || null,
+ canonicalUrl: record.canonicalUrl || null,
+ productInfo: record.productInfo || {},
+ options,
+ details: {
+ description,
+ specifications: [
+ { name: "SKU", text: sku },
+ { name: "MPN", text: record.mpn },
+ { name: "Brand", text: record.brand },
+ { name: "Availability", text: record.availability },
+ { name: "Quantity Available", text: record?.stock?.display },
+ { name: "Source URL", text: record.url },
+ ].filter((spec) => spec.text),
+ weight: {},
+ descriptionSections: Array.isArray(record.descriptionSections) ? record.descriptionSections : [],
+ },
+ },
+ },
+ };
+}
+
+async function fetchWebsiteData({ paths: runPaths }) {
+ const targetUrl = process.env.BROCKS_PERFORMANCE_TARGET_URL || TARGET_URL;
+ const rawProducts = await scrapeBrocksPerformanceProducts(targetUrl);
+ const products = rawProducts.map((record, index) => normalizeRecord(record, index));
+ const now = new Date().toISOString();
+
+ const analysis = {
+ timestamp: now,
+ targetUrl,
+ rawProductsFound: rawProducts.length,
+ totalProductsUnique: products.length,
+ detailSuccess: products.length,
+ detailFailed: 0,
+ cachedDetailReused: 0,
+ detailFetchedNow: products.length,
+ };
+
+ const payload = {
+ generatedAt: now,
+ sourceKey,
+ sourceLabel: label,
+ analysis,
+ products,
+ };
+
+ await fs.mkdir(path.dirname(runPaths.aggregatedJson), { recursive: true });
+ await fs.writeFile(runPaths.rawJson, `${JSON.stringify(rawProducts, null, 2)}\n`, "utf8");
+ await fs.writeFile(runPaths.aggregatedJson, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
+
+ let history = [];
+ try {
+ history = JSON.parse(await fs.readFile(runPaths.historyJson, "utf8"));
+ if (!Array.isArray(history)) history = [];
+ } catch {
+ history = [];
+ }
+ history.push(analysis);
+ await fs.writeFile(runPaths.historyJson, `${JSON.stringify(history, null, 2)}\n`, "utf8");
+
+ console.log(`[BROCKS] Saved ${products.length} products to ${runPaths.aggregatedJson}`);
+
+ return {
+ analysis,
+ outputJsonPath: path.resolve(process.cwd(), runPaths.aggregatedJson),
+ rawJsonPath: path.resolve(process.cwd(), runPaths.rawJson),
+ historyPath: path.resolve(process.cwd(), runPaths.historyJson),
+ };
+}
+
+module.exports = {
+ sourceKey,
+ label,
+ defaultBrand: "Brock's Performance",
+ defaultImageBaseUrl: "",
+ envImageBaseUrl: "BROCKS_PERFORMANCE_IMAGE_BASE_URL",
+ paths,
+ fetchWebsiteData,
+ convertToShopifyProducts,
+};
diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js b/src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js
new file mode 100644
index 0000000..0c58abc
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js
@@ -0,0 +1,411 @@
+const { chromium } = require('playwright');
+const fs = require('fs/promises');
+const path = require('path');
+const { URL } = require('url');
+
+const TARGET_URL = 'https://brocksperformance.com/exhausts/titanium-full-systems/suzuki/';
+const OUTPUT_DIR = path.join(__dirname, '..', 'outputs');
+const OUTPUT_FILE = path.join(OUTPUT_DIR, 'brocksperformance-suzuki-exhausts-full-details.json');
+
+function cleanText(value) {
+ return String(value || '')
+ .replace(/\u2026/g, '...')
+ .replace(/\s+\n/g, '\n')
+ .replace(/\n\s+/g, '\n')
+ .replace(/[ \t]+/g, ' ')
+ .replace(/\n{3,}/g, '\n\n')
+ .trim();
+}
+
+function toNumber(value) {
+ if (!value) return null;
+ const cleaned = String(value).replace(/[^0-9.\-]/g, '');
+ const n = Number.parseFloat(cleaned);
+ return Number.isNaN(n) ? null : n;
+}
+
+async function expandLoadMore(page) {
+ // Click any obvious "load more" buttons until they disappear
+ const loadMoreSelectors = ['button.load-more', '.load-more', 'button.show-more', '.show-more'];
+ for (const sel of loadMoreSelectors) {
+ // Try repeatedly because some sites require multiple clicks
+ while (true) {
+ const btn = await page.$(sel);
+ if (!btn) break;
+ try {
+ await Promise.all([page.waitForResponse(r => r.status() === 200, { timeout: 5000 }).catch(() => {}), btn.click()]);
+ await page.waitForTimeout(800);
+ } catch (e) {
+ break;
+ }
+ }
+ }
+}
+
+async function autoScroll(page, timeout = 2000) {
+ const start = Date.now();
+ let lastHeight = await page.evaluate(() => document.body.scrollHeight);
+ while (Date.now() - start < timeout) {
+ await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight));
+ await page.waitForTimeout(300);
+ const newHeight = await page.evaluate(() => document.body.scrollHeight);
+ if (newHeight === lastHeight) break;
+ lastHeight = newHeight;
+ }
+}
+
+async function scrape(targetUrl = TARGET_URL, options = {}) {
+ const outputFile = options.outputFile || OUTPUT_FILE;
+ const writeOutput = options.writeOutput !== false;
+ const onDetailProgress = typeof options.onDetailProgress === 'function' ? options.onDetailProgress : null;
+ const browser = await chromium.launch({ headless: true });
+ const page = await browser.newPage();
+ await page.setExtraHTTPHeaders({ 'accept-language': 'en-US,en;q=0.9' });
+
+ try {
+ await page.goto(targetUrl, { waitUntil: 'networkidle' });
+
+ // Try to load more items (click buttons, then scroll)
+ await expandLoadMore(page);
+ await autoScroll(page, 3000);
+
+ // BigCommerce category pages render product cards as li.product inside .productGrid.
+ const candidateSelector = '.productGrid .product';
+
+ const products = await page.$$eval(candidateSelector, (nodes, baseUrl) => {
+ const siteOrigin = new URL(baseUrl).origin;
+ const results = [];
+ for (const node of nodes) {
+ const anchor = node.querySelector('.card-title a, .card-figure__link[href], a[href]');
+ if (!anchor) continue;
+ const href = anchor.href || (anchor.getAttribute && anchor.getAttribute('href'));
+ if (!href || href.includes('#') || !href.startsWith(siteOrigin)) continue;
+
+ const titleEl = node.querySelector('.card-title a, .card-title, .product-title, .title, h2, h3, .prod-title') || anchor;
+ const title = titleEl ? (titleEl.textContent || '').trim() : '';
+
+ const priceEl = node.querySelector('[data-product-price-without-tax], [data-product-price-with-tax], .price--withoutTax, .price--withTax, .price-final, .product-price');
+ const priceText = priceEl ? (priceEl.textContent || '').trim() : '';
+
+ const imgEl = node.querySelector('img');
+ const image = imgEl ? (imgEl.src || imgEl.getAttribute('data-src') || '') : '';
+
+ const skuEl = node.querySelector('.sku, .product-sku, .prod-sku');
+ const skuText = skuEl ? (skuEl.textContent || '').trim() : (node.textContent || '').match(/SKU:\s*([A-Z0-9-]+)/i)?.[1] || '';
+ const sku = skuText.replace(/^SKU:\s*/i, '').trim();
+
+ const descEl = node.querySelector('.description, .product-description, p');
+ const description = descEl ? (descEl.textContent || '').trim() : '';
+
+ results.push({
+ title: title || null,
+ url: href ? new URL(href, baseUrl).href : null,
+ priceRaw: priceText || null,
+ image: image || null,
+ sku: sku || null,
+ description: description || null,
+ });
+ }
+ return results;
+ }, page.url());
+
+
+ // Deduplicate by URL
+ const map = new Map();
+ for (const p of products) {
+ if (!p.url) continue;
+ if (!map.has(p.url)) map.set(p.url, p);
+ }
+ console.log(`[BROCKS] Found ${map.size} unique product URLs from ${targetUrl}`);
+
+ // Helper to fetch product page details
+ async function fetchProductDetails(productPage, url) {
+ try {
+ await productPage.goto(url, { waitUntil: 'networkidle', timeout: 30000 });
+ return await productPage.evaluate(() => {
+ const clean = (value) => (value || '').replace(/\s+/g, ' ').trim();
+ const toAbsolute = (value) => {
+ if (!value) return null;
+ try {
+ return new URL(value, document.location.href).href;
+ } catch (e) {
+ return value;
+ }
+ };
+ const pick = (selectors) => {
+ for (const s of selectors) {
+ const el = document.querySelector(s);
+ if (el) return el;
+ }
+ return null;
+ };
+ const unique = (values) => [...new Set(values.filter(Boolean))];
+
+ const textFrom = (el) => {
+ if (!el) return null;
+ return clean(el.textContent || el.getAttribute('content') || el.getAttribute('value') || '');
+ };
+ const metaContent = (selector) => {
+ const el = document.querySelector(selector);
+ return el ? el.getAttribute('content') : null;
+ };
+ const extractSrcset = (value) => {
+ if (!value) return [];
+ return value.split(',')
+ .map((part) => part.trim().split(/\s+/)[0])
+ .map(toAbsolute)
+ .filter(Boolean);
+ };
+
+ const priceEl = pick([
+ '[data-product-price-without-tax]',
+ '[data-product-price-with-tax]',
+ '.productView-price .price--withoutTax',
+ '.productView-price .price--withTax',
+ '.productView-price [itemprop="price"]',
+ '[itemprop="price"]',
+ '.price--main',
+ '.price-final',
+ '.product-price'
+ ]);
+ const priceText = textFrom(priceEl);
+ const productId = document.querySelector('input[name="product_id"]')?.value || null;
+ const title = textFrom(pick(['.productView-title', 'h1', '[itemprop="name"]']));
+
+ const skuEl = pick(['[itemprop="sku"]', '.sku', '.productView-sku', '.product-sku', '.prod-sku']);
+ const sku = textFrom(skuEl);
+
+ const brandEl = pick(['[itemprop="brand"] [itemprop="name"]', '.brand-value span', '.brand-value']);
+ const brand = textFrom(brandEl);
+
+ const descEl = pick(['.productView-description', '.product-description', '#product-description', '.description', '.prod-desc']);
+ const description = descEl ? clean(descEl.textContent) : null;
+
+ const imgEl = pick(['.productView-image img', '.product-hero img', '.productImage img', '.primary-image img', 'img']);
+ const image = imgEl ? toAbsolute(imgEl.getAttribute('src') || imgEl.getAttribute('data-src')) : null;
+
+ const productInfo = {};
+ document.querySelectorAll('.productView-info .pdp__info-wrapper > div').forEach((row) => {
+ const key = clean(row.querySelector('dt')?.textContent || '').replace(/:$/, '');
+ const value = clean(row.querySelector('dd')?.textContent || '');
+ if (key && value && !productInfo[key]) productInfo[key] = value;
+ });
+
+ const options = [...document.querySelectorAll('[data-product-option-change] .form-field')].map((field) => {
+ const label = clean(field.querySelector('.ob-option-name, .form-label')?.textContent || '').replace(/\s*\*$/, '').replace(/:$/, '');
+ const select = field.querySelector('select');
+ const inputs = [...field.querySelectorAll('input[type="radio"], input[type="checkbox"]')];
+ return {
+ label: label || null,
+ required: Boolean(field.querySelector('[required]') || clean(field.textContent).includes('*')),
+ type: select ? 'select' : (inputs[0]?.type || field.getAttribute('data-product-attribute') || null),
+ name: select?.name || inputs[0]?.name || null,
+ values: select
+ ? [...select.options].map((option) => ({
+ label: clean(option.textContent),
+ value: option.value || null,
+ selected: option.selected,
+ disabled: option.disabled,
+ })).filter((option) => option.label)
+ : inputs.map((input) => ({
+ label: clean(field.querySelector(`label[for="${input.id}"]`)?.textContent || input.closest('label')?.textContent || input.value),
+ value: input.value || null,
+ checked: input.checked,
+ disabled: input.disabled,
+ })),
+ };
+ });
+
+ const descriptionSections = [...document.querySelectorAll('.productView-description .tabs .tab-title')].map((tab) => {
+ const id = tab.getAttribute('href');
+ const panel = id ? document.querySelector(id) : null;
+ return {
+ title: clean(tab.textContent) || null,
+ text: panel ? clean(panel.textContent) : null,
+ bullets: panel ? [...panel.querySelectorAll('li')].map((li) => clean(li.textContent)).filter(Boolean) : [],
+ paragraphs: panel ? [...panel.querySelectorAll('p')].map((p) => clean(p.textContent)).filter(Boolean) : [],
+ links: panel ? [...panel.querySelectorAll('a[href]')].map((link) => ({
+ text: clean(link.textContent),
+ url: toAbsolute(link.getAttribute('href')),
+ })).filter((link) => link.text || link.url) : [],
+ images: panel ? [...panel.querySelectorAll('img')].map((img) => ({
+ src: toAbsolute(img.getAttribute('src') || img.getAttribute('data-src')),
+ alt: img.getAttribute('alt') || null,
+ title: img.getAttribute('title') || null,
+ })).filter((item) => item.src) : [],
+ tables: panel ? [...panel.querySelectorAll('table')].map((table) =>
+ [...table.querySelectorAll('tr')].map((row) =>
+ [...row.children].map((cell) => clean(cell.textContent))
+ ).filter((row) => row.length)
+ ) : [],
+ };
+ });
+
+ const images = [...document.querySelectorAll('.productView-images img, .productCarousel img')]
+ .flatMap((img) => {
+ const variants = unique([
+ toAbsolute(img.getAttribute('src')),
+ toAbsolute(img.getAttribute('data-src')),
+ ...extractSrcset(img.getAttribute('srcset')),
+ ...extractSrcset(img.getAttribute('data-srcset')),
+ ]);
+ if (!variants.length) return [];
+ return [{
+ src: variants[0],
+ variants,
+ alt: img.getAttribute('alt') || null,
+ title: img.getAttribute('title') || null,
+ }];
+ });
+
+ const reviews = [...document.querySelectorAll('.productReviews-list .productReview')].map((review) => ({
+ rating: Number.parseFloat(review.querySelector('[itemprop="ratingValue"]')?.getAttribute('content') || '') || null,
+ title: clean(review.querySelector('.productReview-title, h5, [itemprop="name"]')?.textContent || ''),
+ author: clean(review.querySelector('[itemprop="author"], .productReview-author')?.textContent || ''),
+ date: review.querySelector('time, [itemprop="datePublished"]')?.getAttribute('datetime') || clean(review.querySelector('time, [itemprop="datePublished"]')?.textContent || ''),
+ body: clean(review.querySelector('[itemprop="reviewBody"], .productReview-body')?.textContent || review.textContent || ''),
+ }));
+
+ return {
+ productId,
+ title,
+ sku,
+ mpn: metaContent('[itemprop="mpn"]'),
+ brand,
+ url: document.location.href,
+ canonicalUrl: document.querySelector('link[rel="canonical"]')?.href || null,
+ priceText,
+ currency: metaContent('meta[property="product:price:currency"]') || metaContent('[itemprop="priceCurrency"]'),
+ availability: clean(document.querySelector('.pdp__info--red')?.textContent || metaContent('meta[property="og:availability"]') || ''),
+ stock: {
+ display: productInfo['Quantity Available'] || null,
+ currentStock: textFrom(document.querySelector('[data-product-stock]')),
+ },
+ brandUrl: document.querySelector('.brand-value a')?.href || null,
+ image,
+ images,
+ description,
+ descriptionSections,
+ productInfo,
+ options,
+ reviews: {
+ ratingValue: Number.parseFloat(metaContent('[itemprop="ratingValue"]') || '') || null,
+ ratingCount: Number.parseInt(metaContent('[itemprop="ratingCount"]') || '', 10) || null,
+ reviewCount: Number.parseInt(metaContent('[itemprop="reviewCount"]') || '', 10) || reviews.length,
+ items: reviews,
+ },
+ pageMeta: {
+ title: document.title,
+ description: metaContent('meta[name="description"]'),
+ keywords: metaContent('meta[name="keywords"]'),
+ ogTitle: metaContent('meta[property="og:title"]'),
+ ogDescription: metaContent('meta[property="og:description"]'),
+ ogImage: metaContent('meta[property="og:image"]'),
+ },
+ };
+ });
+ } catch (e) {
+ return { error: e.message, priceText: null, sku: null, description: null, image: null };
+ }
+ }
+
+ // For detail extraction we'll open a secondary page to navigate product pages
+ const productPage = await browser.newPage();
+ const origin = new URL(targetUrl).origin;
+
+ const final = [];
+ const productItems = Array.from(map.values());
+ for (let index = 0; index < productItems.length; index += 1) {
+ const item = productItems[index];
+ const out = {
+ recordType: 'product',
+ source: 'page-scrape',
+ title: item.title,
+ url: item.url,
+ price: toNumber(item.priceRaw),
+ priceRaw: item.priceRaw,
+ sku: item.sku,
+ image: item.image,
+ description: item.description,
+ };
+
+ // Only attempt to fetch details for same-origin product pages
+ try {
+ const u = new URL(item.url);
+ if (u.origin === origin) {
+ const details = await fetchProductDetails(productPage, item.url);
+ if ((!out.priceRaw || out.priceRaw.trim() === '' || out.price === null) && details.priceText) {
+ out.priceRaw = details.priceText;
+ out.price = toNumber(details.priceText);
+ }
+ if (details.title) out.title = details.title;
+ if (details.sku) out.sku = details.sku;
+ if (details.description) out.description = details.description;
+ if (details.image) out.image = details.image;
+ Object.assign(out, {
+ productId: details.productId || null,
+ mpn: details.mpn || null,
+ brand: details.brand || null,
+ brandUrl: details.brandUrl || null,
+ canonicalUrl: details.canonicalUrl || null,
+ currency: details.currency || null,
+ availability: details.availability || null,
+ stock: details.stock || {},
+ productInfo: details.productInfo || {},
+ options: details.options || [],
+ images: details.images || [],
+ descriptionSections: details.descriptionSections || [],
+ reviews: details.reviews || {},
+ pageMeta: details.pageMeta || {},
+ scrapeError: details.error || null,
+ });
+ }
+ } catch (e) {
+ // ignore malformed URLs
+ }
+
+ final.push(out);
+ if ((index + 1) % 5 === 0 || index === productItems.length - 1) {
+ console.log(`[DETAIL] ${index + 1}/${productItems.length} completed`);
+ if (onDetailProgress) {
+ onDetailProgress({
+ done: index + 1,
+ total: productItems.length,
+ product: out.title || out.sku || out.url || null,
+ });
+ }
+ }
+ }
+
+ await productPage.close();
+
+ if (writeOutput) {
+ await fs.mkdir(path.dirname(outputFile), { recursive: true });
+ await fs.writeFile(outputFile, `${JSON.stringify(final, null, 2)}\n`, 'utf8');
+ console.log(`Saved ${final.length} products to ${outputFile}`);
+ }
+
+ return final;
+ } catch (err) {
+ console.error('Scrape failed:', err);
+ throw err;
+ } finally {
+ await browser.close();
+ }
+}
+
+if (require.main === module) {
+ scrape().catch(() => {
+ process.exitCode = 1;
+ });
+}
+
+module.exports = {
+ TARGET_URL,
+ cleanText,
+ scrape,
+ scrapeBrocksPerformanceProducts(targetUrl = TARGET_URL, options = {}) {
+ return scrape(targetUrl, { ...options, writeOutput: false });
+ },
+ toNumber,
+};
diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/scraper.js b/src/business-logic/import-pipeline/sources/brocks-performance/scraper.js
new file mode 100644
index 0000000..0ac7850
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/brocks-performance/scraper.js
@@ -0,0 +1 @@
+module.exports = require("./scrape_brocks_products");
diff --git a/src/business-logic/import-pipeline/sources/index.js b/src/business-logic/import-pipeline/sources/index.js
new file mode 100644
index 0000000..691b0d1
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/index.js
@@ -0,0 +1,33 @@
+const kyt = require("./kyt");
+const brocksPerformance = require("./brocks-performance");
+
+const sources = {
+ [kyt.sourceKey]: kyt,
+ [brocksPerformance.sourceKey]: brocksPerformance,
+};
+
+function normalizeSourceKey(sourceKey) {
+ return String(sourceKey || "kyt").trim().toLowerCase();
+}
+
+function getSource(sourceKey = "kyt") {
+ const key = normalizeSourceKey(sourceKey);
+ const source = sources[key];
+ if (!source) {
+ const available = Object.keys(sources).join(", ");
+ throw new Error(`Unknown import source "${sourceKey}". Available sources: ${available}`);
+ }
+ return source;
+}
+
+function listSources() {
+ return Object.values(sources).map((source) => ({
+ sourceKey: source.sourceKey,
+ label: source.label,
+ }));
+}
+
+module.exports = {
+ getSource,
+ listSources,
+};
diff --git a/src/business-logic/kyt-pipeline/05_kyt_to_shopify_converter.js b/src/business-logic/import-pipeline/sources/kyt/converter.js
similarity index 100%
rename from src/business-logic/kyt-pipeline/05_kyt_to_shopify_converter.js
rename to src/business-logic/import-pipeline/sources/kyt/converter.js
diff --git a/src/business-logic/import-pipeline/sources/kyt/index.js b/src/business-logic/import-pipeline/sources/kyt/index.js
new file mode 100644
index 0000000..555fcc3
--- /dev/null
+++ b/src/business-logic/import-pipeline/sources/kyt/index.js
@@ -0,0 +1,48 @@
+const path = require("node:path");
+const { getKytIndiaWebsiteData } = require("./scraper");
+const { convertKytJsonToShopifyProducts } = require("./converter");
+
+const sourceKey = "kyt";
+const label = "KYT India";
+const dataDir = path.join("data", "sources", sourceKey);
+
+function paths() {
+ return {
+ aggregatedJson: path.join(dataDir, "01_products_aggregated.json"),
+ historyJson: path.join(dataDir, "01_products_run_history.json"),
+ aggregatedXlsx: path.join(dataDir, "01_products_aggregated.xlsx"),
+ downloadedImagesDir: path.join(dataDir, "02_downloaded_product_images"),
+ watermarkState: path.join(dataDir, "03_watermark_state.json"),
+ imageUploadState: path.join(dataDir, "04_shopify_image_upload_state.json"),
+ uploadedMapJson: path.join(dataDir, "04_shopify_uploaded_images_map.json"),
+ shopifyReadyJson: path.join(dataDir, "05_shopify_products_ready.json"),
+ logsDir: path.join(dataDir, "99_run_logs"),
+ };
+}
+
+async function fetchWebsiteData({ paths: runPaths }) {
+ return getKytIndiaWebsiteData({
+ outputJson: runPaths.aggregatedJson,
+ outputHistoryJson: runPaths.historyJson,
+ outputXlsx: runPaths.aggregatedXlsx,
+ });
+}
+
+function convertToShopifyProducts(input, options = {}) {
+ return convertKytJsonToShopifyProducts(input, {
+ imageBaseUrl: options.imageBaseUrl || "",
+ brand: options.brand || "KYT",
+ uploadedImageMap: options.uploadedImageMap,
+ });
+}
+
+module.exports = {
+ sourceKey,
+ label,
+ defaultBrand: "KYT",
+ defaultImageBaseUrl: "https://kytindia.com/server/public/uploads",
+ envImageBaseUrl: "KYT_IMAGE_BASE_URL",
+ paths,
+ fetchWebsiteData,
+ convertToShopifyProducts,
+};
diff --git a/src/business-logic/kyt-pipeline/01_get_kytindia_website_data.js b/src/business-logic/import-pipeline/sources/kyt/scraper.js
similarity index 96%
rename from src/business-logic/kyt-pipeline/01_get_kytindia_website_data.js
rename to src/business-logic/import-pipeline/sources/kyt/scraper.js
index 1ca784d..2aca0be 100644
--- a/src/business-logic/kyt-pipeline/01_get_kytindia_website_data.js
+++ b/src/business-logic/import-pipeline/sources/kyt/scraper.js
@@ -253,14 +253,18 @@ function writeExcel(filePath, rows, analysis) {
XLSX.writeFile(wb, filePath);
}
-async function getKytIndiaWebsiteData() {
+async function getKytIndiaWebsiteData(options = {}) {
try {
- const outputJsonPath = path.resolve(process.cwd(), OUT_JSON);
+ const outJson = options.outputJson || OUT_JSON;
+ const outHistoryJson = options.outputHistoryJson || OUT_HISTORY_JSON;
+ const outXlsx = options.outputXlsx || OUT_XLSX;
+
+ const outputJsonPath = path.resolve(process.cwd(), outJson);
const outputDir = path.dirname(outputJsonPath);
await fs.mkdir(outputDir, { recursive: true });
const existingAggregated = await readExistingAggregated(outputJsonPath);
console.log(
- `[CACHE] Loaded ${existingAggregated.products.length} existing products from ${OUT_JSON}` +
+ `[CACHE] Loaded ${existingAggregated.products.length} existing products from ${outJson}` +
(existingAggregated.generatedAt ? ` (generatedAt: ${existingAggregated.generatedAt})` : "")
);
@@ -438,14 +442,16 @@ async function getKytIndiaWebsiteData() {
console.log("[4/5] Writing JSON outputs...");
await fs.writeFile(outputJsonPath, JSON.stringify(finalPayload, null, 2), "utf8");
- const historyPath = path.resolve(process.cwd(), OUT_HISTORY_JSON);
+ const historyPath = path.resolve(process.cwd(), outHistoryJson);
+ await fs.mkdir(path.dirname(historyPath), { recursive: true });
const history = await readHistory(historyPath);
history.push(analysis);
await fs.writeFile(historyPath, JSON.stringify(history, null, 2), "utf8");
console.log("[5/5] Writing Excel output...");
const excelRows = buildExcelRows(detailedProducts);
- const excelPath = path.resolve(process.cwd(), OUT_XLSX);
+ const excelPath = path.resolve(process.cwd(), outXlsx);
+ await fs.mkdir(path.dirname(excelPath), { recursive: true });
writeExcel(excelPath, excelRows, analysis);
console.log("\n=== FINAL ANALYSIS ===");
diff --git a/src/business-logic/kyt-pipeline/00_index.js b/src/business-logic/kyt-pipeline/00_index.js
deleted file mode 100644
index 0571755..0000000
--- a/src/business-logic/kyt-pipeline/00_index.js
+++ /dev/null
@@ -1,540 +0,0 @@
-const { getKytIndiaWebsiteData } = require("./01_get_kytindia_website_data");
-const { downloadProductImagesFromAggregatedJson } = require("./02_download_product_images");
-const { applyWatermarkToDownloadedImages } = require("./03_watermark_downloaded_images");
-const fs = require("node:fs/promises");
-const path = require("node:path");
-const fsSync = require("node:fs");
-const { convertKytJsonToShopifyProducts } = require("./05_kyt_to_shopify_converter");
-const { upsertShopifyProductFull } = require("./06_shopify_product_upsert");
-const { uploadKytWatermarkedImagesToShopifyFiles } = require("./04_shopify_image_file_uploader");
-
-const DEFAULT_AGGREGATED_JSON = "data/01_products_aggregated.json";
-const DEFAULT_SHOPIFY_READY_JSON = "data/05_shopify_products_ready.json";
-const DEFAULT_UPLOADED_MAP_JSON = "data/04_shopify_uploaded_images_map.json";
-const DEFAULT_LOGS_DIR = "data/99_run_logs";
-
-function nowIsoLocal() {
- const d = new Date();
- return d.toISOString();
-}
-
-function initRunLogger(logsDir = DEFAULT_LOGS_DIR) {
- const absLogsDir = path.resolve(process.cwd(), logsDir);
- fsSync.mkdirSync(absLogsDir, { recursive: true });
-
- const stamp = new Date().toISOString().replace(/[:.]/g, "-");
- const logPath = path.join(absLogsDir, `${stamp}.log`);
- const stream = fsSync.createWriteStream(logPath, { flags: "a" });
-
- const original = {
- log: console.log.bind(console),
- info: console.info.bind(console),
- warn: console.warn.bind(console),
- error: console.error.bind(console)
- };
-
- function writeLine(level, args) {
- const text = args.map((a) => {
- if (typeof a === "string") return a;
- try {
- return JSON.stringify(a);
- } catch {
- return String(a);
- }
- }).join(" ");
- stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`);
- }
-
- console.log = (...args) => {
- writeLine("LOG", args);
- original.log(...args);
- };
- console.info = (...args) => {
- writeLine("INFO", args);
- original.info(...args);
- };
- console.warn = (...args) => {
- writeLine("WARN", args);
- original.warn(...args);
- };
- console.error = (...args) => {
- writeLine("ERROR", args);
- original.error(...args);
- };
-
- console.log(`[RUN-LOG] Writing logs to ${logPath}`);
- return { logPath };
-}
-
-function loadDotEnvFile(filePath = ".env") {
- const abs = path.resolve(process.cwd(), filePath);
- if (!fsSync.existsSync(abs)) return;
-
- const raw = fsSync.readFileSync(abs, "utf8");
- const lines = raw.split(/\r?\n/);
- for (const line of lines) {
- const trimmed = line.trim();
- if (!trimmed || trimmed.startsWith("#")) continue;
-
- const eq = trimmed.indexOf("=");
- if (eq <= 0) continue;
-
- const key = trimmed.slice(0, eq).trim();
- const value = trimmed.slice(eq + 1).trim();
- if (key && process.env[key] == null) {
- process.env[key] = value;
- }
- }
-}
-
-loadDotEnvFile(".env");
-const RUN_LOGGER = initRunLogger(DEFAULT_LOGS_DIR);
-
-function parseCliArgs(argv = process.argv.slice(2)) {
- const out = {
- limit: null
- };
-
- for (let i = 0; i < argv.length; i += 1) {
- const a = argv[i];
- if ((a === "--limit" || a === "-n") && argv[i + 1]) {
- const n = Number.parseInt(argv[i + 1], 10);
- if (Number.isFinite(n) && n > 0) out.limit = n;
- }
- }
-
- return out;
-}
-
-async function buildLimitedAggregatedJson(inputPath, limit) {
- const absInputPath = path.resolve(process.cwd(), inputPath);
- const raw = await fs.readFile(absInputPath, "utf8");
- const parsed = JSON.parse(raw);
- const products = Array.isArray(parsed?.products) ? parsed.products : [];
- const limited = products.slice(0, limit);
-
- const outPath = path.resolve(process.cwd(), `data/01_products_aggregated.limit_${limit}.json`);
- const payload = {
- ...parsed,
- generatedAt: new Date().toISOString(),
- products: limited,
- analysis: {
- ...(parsed.analysis || {}),
- totalProductsUnique: limited.length,
- limitedFrom: products.length,
- limitApplied: limit
- }
- };
-
- await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8");
- return {
- inputPath: absInputPath,
- outputPath: outPath,
- totalBefore: products.length,
- totalAfter: limited.length
- };
-}
-
-function readBooleanEnv(name, fallback = false) {
- const val = process.env[name];
- if (val == null) return fallback;
- return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase());
-}
-
-function logStageSummary(stepKey, summary = {}) {
- const parts = Object.entries(summary)
- .filter(([, value]) => value !== undefined && value !== null && value !== "")
- .map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`);
-
- console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`);
-}
-
-async function convertAggregatedToShopifyReady({
- inputPath = DEFAULT_AGGREGATED_JSON,
- outputPath = DEFAULT_SHOPIFY_READY_JSON,
- uploadedImageMapPath = DEFAULT_UPLOADED_MAP_JSON,
- imageBaseUrl = process.env.KYT_IMAGE_BASE_URL || "",
- brand = process.env.SHOPIFY_BRAND || "KYT"
-} = {}) {
- const absInputPath = path.resolve(process.cwd(), inputPath);
- const absOutputPath = path.resolve(process.cwd(), outputPath);
- const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath);
-
- const raw = await fs.readFile(absInputPath, "utf8");
- const parsed = JSON.parse(raw);
- let uploadedImageMap = null;
- try {
- const mapRaw = await fs.readFile(absUploadedMapPath, "utf8");
- uploadedImageMap = JSON.parse(mapRaw);
- } catch {
- uploadedImageMap = null;
- }
-
- const convertedProducts = convertKytJsonToShopifyProducts(parsed, {
- imageBaseUrl,
- brand,
- uploadedImageMap
- });
-
- const payload = {
- generatedAt: new Date().toISOString(),
- sourceFile: absInputPath,
- totalProducts: convertedProducts.length,
- products: convertedProducts
- };
-
- await fs.mkdir(path.dirname(absOutputPath), { recursive: true });
- await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8");
-
- return {
- inputPath: absInputPath,
- outputPath: absOutputPath,
- uploadedImageMapPath: absUploadedMapPath,
- totalProducts: convertedProducts.length
- };
-}
-
-async function upsertShopifyProductsFromConverted({
- convertedPath = DEFAULT_SHOPIFY_READY_JSON,
- shop = process.env.SHOPIFY_SHOP,
- accessToken = process.env.SHOPIFY_ACCESS_TOKEN,
- locationId = process.env.SHOPIFY_LOCATION_ID || null,
- enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
- apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10"
-} = {}) {
- if (!shop) {
- throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage.");
- }
- if (!accessToken) {
- throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage.");
- }
-
- const absConvertedPath = path.resolve(process.cwd(), convertedPath);
- const raw = await fs.readFile(absConvertedPath, "utf8");
- const parsed = JSON.parse(raw);
- const products = Array.isArray(parsed?.products) ? parsed.products : [];
-
- const startedAtMs = Date.now();
- const metrics = {
- sourcePath: absConvertedPath,
- total: products.length,
- processed: 0,
- created: 0,
- updated: 0,
- failed: 0,
- errors: [],
- startedAt: new Date(startedAtMs).toISOString(),
- finishedAt: null,
- durationSeconds: 0,
- successRate: 0
- };
-
- for (let i = 0; i < products.length; i += 1) {
- const product = products[i];
- const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`;
-
- try {
- const result = await upsertShopifyProductFull({
- shop,
- accessToken,
- product,
- locationId,
- enableSeo,
- apiVersion
- });
-
- metrics.processed += 1;
- if (result.action === "created") metrics.created += 1;
- if (result.action === "updated") metrics.updated += 1;
-
- if ((i + 1) % 10 === 0 || i === products.length - 1) {
- console.log(
- `[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}`
- );
- }
- } catch (error) {
- metrics.processed += 1;
- metrics.failed += 1;
- metrics.errors.push({
- index: i + 1,
- product: label,
- error: error.message
- });
- console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`);
- }
- }
-
- const endedAtMs = Date.now();
- metrics.finishedAt = new Date(endedAtMs).toISOString();
- metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2));
- metrics.successRate = metrics.total > 0
- ? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2))
- : 0;
-
- return metrics;
-}
-
-async function runFullKytPipeline(options = {}) {
- const cli = parseCliArgs();
- const onProgress = typeof options.onProgress === "function" ? options.onProgress : null;
- const emitProgress = (stepIndex, stepKey, message) => {
- if (onProgress) {
- onProgress({
- stepIndex,
- totalSteps: 6,
- stepKey,
- message
- });
- }
- };
-
- emitProgress(1, "fetchWebsiteData", "Fetching KYT website data");
- console.log("[PIPELINE 1/6] Fetching KYT website data...");
- const dataSummary = await getKytIndiaWebsiteData();
-
- let aggregatedPathForRun = DEFAULT_AGGREGATED_JSON;
- let imagesDirForRun = "data/02_downloaded_product_images";
- let limitSummary = null;
-
- if (cli.limit) {
- limitSummary = await buildLimitedAggregatedJson(DEFAULT_AGGREGATED_JSON, cli.limit);
- aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/");
- imagesDirForRun = `data/02_downloaded_product_images.limit_${cli.limit}`;
- console.log(
- `[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}`
- );
- }
-
- emitProgress(2, "downloadImages", "Downloading product images");
- console.log("\n[PIPELINE 2/6] Downloading product images...");
- const downloadSummary = await downloadProductImagesFromAggregatedJson({
- jsonPath: aggregatedPathForRun,
- outputDir: imagesDirForRun,
- concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8)
- });
- logStageSummary("downloadImages", {
- total: downloadSummary?.totalImagesFound ?? 0,
- downloaded: downloadSummary?.downloaded ?? 0,
- skipped: downloadSummary?.skipped ?? 0,
- failed: downloadSummary?.failed ?? 0,
- products: downloadSummary?.productsCount ?? 0
- });
-
- emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
- console.log("\n[PIPELINE 3/6] Applying watermark in-place...");
- const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png";
- let watermarkSummary;
- const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun);
- if (!fsSync.existsSync(absWatermarkPath)) {
- watermarkSummary = {
- skipped: true,
- reason: `Watermark file not found: ${absWatermarkPath}`,
- imagesDir: path.resolve(process.cwd(), imagesDirForRun),
- watermarkPath: absWatermarkPath,
- totalImagesFound: 0,
- processed: 0,
- skippedCount: 0,
- failed: 0
- };
- console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`);
- } else {
- const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false);
- try {
- watermarkSummary = await applyWatermarkToDownloadedImages({
- imagesDir: imagesDirForRun,
- watermarkPath: watermarkPathForRun,
- concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4)
- });
- } catch (error) {
- if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) {
- watermarkSummary = {
- skipped: true,
- reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`,
- imagesDir: path.resolve(process.cwd(), imagesDirForRun),
- watermarkPath: absWatermarkPath,
- totalImagesFound: 0,
- processed: 0,
- skippedCount: 0,
- failed: 0
- };
- console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`);
- } else {
- throw error;
- }
- }
- }
- logStageSummary("watermarkImages", {
- total: watermarkSummary?.totalImagesFound ?? 0,
- processed: watermarkSummary?.processed ?? 0,
- skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0,
- failed: watermarkSummary?.failed ?? 0,
- concurrency: watermarkSummary?.concurrency ?? ""
- });
-
- emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
- console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
- const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true);
- const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false);
- let imageUploadSummary;
-
- if (!imageUploadEnabled) {
- imageUploadSummary = {
- skipped: true,
- reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false",
- totalTasks: 0,
- processed: 0,
- uploaded: 0,
- failed: 0
- };
- console.log("[PIPELINE 4/6] Image upload stage skipped by config.");
- } else {
- try {
- imageUploadSummary = await uploadKytWatermarkedImagesToShopifyFiles({
- shop: process.env.SHOPIFY_SHOP,
- accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
- apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
- aggregatedJsonPath: aggregatedPathForRun,
- imagesDir: imagesDirForRun,
- statePath: "data/04_shopify_image_upload_state.json",
- mapPath: DEFAULT_UPLOADED_MAP_JSON,
- concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3)
- });
- } catch (error) {
- const message = String(error?.message || "Unknown image upload error");
- imageUploadSummary = {
- skipped: true,
- reason: message,
- totalTasks: 0,
- processed: 0,
- uploaded: 0,
- failed: 0
- };
-
- console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`);
- if (imageUploadRequired) {
- throw error;
- }
- console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false).");
- }
- }
- logStageSummary("uploadImagesToShopifyFiles", {
- total: imageUploadSummary?.totalTasks ?? 0,
- processed: imageUploadSummary?.processed ?? 0,
- uploaded: imageUploadSummary?.uploaded ?? 0,
- skipped: imageUploadSummary?.skipped ?? 0,
- failed: imageUploadSummary?.failed ?? 0,
- concurrency: imageUploadSummary?.concurrency ?? ""
- });
-
- emitProgress(5, "convertToShopifyReady", "Converting KYT data to Shopify-ready products");
- console.log("\n[PIPELINE 5/6] Converting KYT data to Shopify-ready products...");
- const conversionSummary = await convertAggregatedToShopifyReady({
- inputPath: aggregatedPathForRun,
- outputPath: DEFAULT_SHOPIFY_READY_JSON,
- uploadedImageMapPath: DEFAULT_UPLOADED_MAP_JSON,
- imageBaseUrl: process.env.KYT_IMAGE_BASE_URL || "",
- brand: process.env.SHOPIFY_BRAND || "KYT"
- });
- logStageSummary("convertToShopifyReady", {
- total: conversionSummary?.totalProducts ?? 0
- });
-
- emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
- console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
- const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({
- convertedPath: DEFAULT_SHOPIFY_READY_JSON,
- shop: process.env.SHOPIFY_SHOP,
- accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
- locationId: process.env.SHOPIFY_LOCATION_ID || null,
- enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
- apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10"
- });
- logStageSummary("upsertToShopify", {
- total: shopifyUpsertSummary?.total ?? 0,
- processed: shopifyUpsertSummary?.processed ?? 0,
- created: shopifyUpsertSummary?.created ?? 0,
- updated: shopifyUpsertSummary?.updated ?? 0,
- failed: shopifyUpsertSummary?.failed ?? 0,
- successRate: shopifyUpsertSummary?.successRate ?? 0
- });
-
- const summary = {
- completedAt: new Date().toISOString(),
- runLogPath: RUN_LOGGER.logPath,
- limit: cli.limit || null,
- limitSummary,
- steps: {
- fetchWebsiteData: dataSummary,
- downloadImages: downloadSummary,
- watermarkImages: watermarkSummary,
- uploadImagesToShopifyFiles: imageUploadSummary,
- convertToShopifyReady: conversionSummary,
- upsertToShopify: shopifyUpsertSummary
- },
- upcomingSteps: []
- };
-
- emitProgress(6, "completed", "KYT pipeline completed");
- console.log("\n=== FULL PIPELINE SUMMARY ===");
- console.log(JSON.stringify(summary, null, 2));
- console.log("\n=== SUMMARY TABLE ===");
- console.table([
- {
- step: "fetchWebsiteData",
- total: dataSummary?.analysis?.totalProductsUnique ?? "",
- processed: dataSummary?.analysis?.detailFetchedNow ?? "",
- skipped: dataSummary?.analysis?.cachedDetailReused ?? "",
- failed: dataSummary?.analysis?.detailFailed ?? ""
- },
- {
- step: "downloadImages",
- total: downloadSummary?.totalImagesFound ?? "",
- processed: downloadSummary?.downloaded ?? "",
- skipped: downloadSummary?.skipped ?? "",
- failed: downloadSummary?.failed ?? ""
- },
- {
- step: "watermarkImages",
- total: watermarkSummary?.totalImagesFound ?? "",
- processed: watermarkSummary?.processed ?? "",
- skipped: watermarkSummary?.skipped ?? "",
- failed: watermarkSummary?.failed ?? ""
- },
- {
- step: "uploadImagesToShopifyFiles",
- total: imageUploadSummary?.totalTasks ?? "",
- processed: imageUploadSummary?.processed ?? "",
- skipped: imageUploadSummary?.skipped ?? "",
- failed: imageUploadSummary?.failed ?? ""
- },
- {
- step: "convertToShopifyReady",
- total: conversionSummary?.totalProducts ?? "",
- processed: conversionSummary?.totalProducts ?? "",
- skipped: "",
- failed: ""
- },
- {
- step: "upsertToShopify",
- total: shopifyUpsertSummary?.total ?? "",
- processed: shopifyUpsertSummary?.processed ?? "",
- skipped: "",
- failed: shopifyUpsertSummary?.failed ?? ""
- }
- ]);
-
- return summary;
-}
-
-module.exports = {
- runFullKytPipeline,
- convertAggregatedToShopifyReady,
- upsertShopifyProductsFromConverted
-};
-
-if (require.main === module) {
- runFullKytPipeline().catch((error) => {
- console.error("Full pipeline failed:", error.message);
- process.exitCode = 1;
- });
-}
diff --git a/src/pipelineJobs.js b/src/pipelineJobs.js
index a6e6bb8..2b1a1fb 100644
--- a/src/pipelineJobs.js
+++ b/src/pipelineJobs.js
@@ -10,17 +10,26 @@ function getJob(jobId) {
return jobs[jobId] || null;
}
-function canStartJob(shop) {
+function getJobIdForPayload(payload = {}) {
+ const shop = String(payload.shop || "").trim();
+ const source = String(payload.source || "kyt").trim().toLowerCase();
+ if (!shop) {
+ return "";
+ }
+ return source === "kyt" ? shop : `${shop}::${source}`;
+}
+
+function canStartJob(shop, source = "kyt") {
if (!shop) {
return false;
}
- const existing = jobs[shop];
+ const existing = jobs[getJobIdForPayload({ shop, source })];
return !existing || existing.status === "done" || existing.status === "error";
}
function createJob(payload = {}) {
- const id = String(payload.shop || "").trim();
+ const id = getJobIdForPayload(payload);
if (!id) {
throw new Error("Shop is required to create a job.");
}
@@ -94,4 +103,5 @@ module.exports = {
createJob,
updateJob,
appendJobLog,
+ getJobIdForPayload,
};
diff --git a/src/runKytPipelineJob.js b/src/runKytPipelineJob.js
index bc34969..ff6b460 100644
--- a/src/runKytPipelineJob.js
+++ b/src/runKytPipelineJob.js
@@ -1,7 +1,6 @@
-const path = require("node:path");
const { getToken } = require("../tokenStore");
const { log } = require("../logger");
-const { runFullKytPipeline } = require("./business-logic/kyt-pipeline/00_index");
+const { runFullSourcePipeline } = require("./business-logic/import-pipeline/runSourcePipeline");
const { updateJob, appendJobLog } = require("./pipelineJobs");
function stringifyLogArgs(args) {
@@ -189,7 +188,7 @@ function deriveLiveStats(job, line) {
}
async function runKytPipelineJob(job) {
- const { shop, limit } = job.payload || {};
+ const { shop, limit, source = "kyt" } = job.payload || {};
const tokenRecord = getToken(shop);
if (!tokenRecord) {
@@ -223,7 +222,7 @@ async function runKytPipelineJob(job) {
updateJob(job.id, {
status: "running",
step: "starting",
- detail: `Starting KYT pipeline for ${shop}`,
+ detail: `Starting ${source} import pipeline for ${shop}`,
});
const originalConsole = {
@@ -247,7 +246,10 @@ async function runKytPipelineJob(job) {
console.error = capture("error");
try {
- const summary = await runFullKytPipeline({
+ const summary = await runFullSourcePipeline({
+ sourceKey: source,
+ limit,
+ argv: [],
onProgress(progress) {
updateJob(job.id, {
status: "running",
@@ -267,7 +269,7 @@ async function runKytPipelineJob(job) {
detail: "Pipeline completed successfully",
summary,
});
- log(shop, `KYT pipeline completed for job ${job.id}`);
+ log(shop, `${source} import pipeline completed for job ${job.id}`);
} finally {
console.log = originalConsole.log;
console.info = originalConsole.info;
@@ -280,7 +282,7 @@ async function runKytPipelineJob(job) {
error: error.message,
detail: `Pipeline failed: ${error.message}`,
});
- log(shop, `KYT pipeline failed for job ${job.id}: ${error.message}`);
+ log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`);
} finally {
process.argv = originalArgv;
process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP;