From 68949f124e1e7e02e64e740be710f638fb5c3d48 Mon Sep 17 00:00:00 2001 From: MOHAN Date: Thu, 14 May 2026 23:57:27 +0530 Subject: [PATCH] Add multi-source import pipeline --- .gitignore | 9 +- package-lock.json | 45 ++ package.json | 1 + routes/pipeline.js | 23 +- .../_backup/kyt-pipeline/00_index.js | 26 + .../01_get_kytindia_website_data.js | 7 + .../02_download_product_images.js | 13 + .../03_watermark_downloaded_images.js | 13 + .../04_shopify_image_file_uploader.js | 8 + .../05_kyt_to_shopify_converter.js | 1 + .../kyt-pipeline/06_shopify_product_upsert.js | 8 + .../import-pipeline/runSourcePipeline.js | 533 +++++++++++++++++ .../shared/downloadProductImages.js} | 38 +- .../shared/shopifyImageUploader.js} | 14 +- .../shared/shopifyProductUpsert.js} | 335 ++++++++--- .../shared/watermarkImages.js} | 0 .../import-pipeline/sources/README.md | 52 ++ .../sources/brocks-performance/converter.js | 250 ++++++++ .../sources/brocks-performance/index.js | 215 +++++++ .../scrape_brocks_products.js | 411 +++++++++++++ .../sources/brocks-performance/scraper.js | 1 + .../import-pipeline/sources/index.js | 33 ++ .../sources/kyt/converter.js} | 0 .../import-pipeline/sources/kyt/index.js | 48 ++ .../sources/kyt/scraper.js} | 16 +- src/business-logic/kyt-pipeline/00_index.js | 540 ------------------ src/pipelineJobs.js | 16 +- src/runKytPipelineJob.js | 16 +- 28 files changed, 2025 insertions(+), 647 deletions(-) create mode 100644 src/business-logic/_backup/kyt-pipeline/00_index.js create mode 100644 src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js create mode 100644 src/business-logic/_backup/kyt-pipeline/02_download_product_images.js create mode 100644 src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js create mode 100644 src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js create mode 100644 src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js create mode 100644 src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js create mode 100644 src/business-logic/import-pipeline/runSourcePipeline.js rename src/business-logic/{kyt-pipeline/02_download_product_images.js => import-pipeline/shared/downloadProductImages.js} (85%) rename src/business-logic/{kyt-pipeline/04_shopify_image_file_uploader.js => import-pipeline/shared/shopifyImageUploader.js} (97%) rename src/business-logic/{kyt-pipeline/06_shopify_product_upsert.js => import-pipeline/shared/shopifyProductUpsert.js} (61%) rename src/business-logic/{kyt-pipeline/03_watermark_downloaded_images.js => import-pipeline/shared/watermarkImages.js} (100%) create mode 100644 src/business-logic/import-pipeline/sources/README.md create mode 100644 src/business-logic/import-pipeline/sources/brocks-performance/converter.js create mode 100644 src/business-logic/import-pipeline/sources/brocks-performance/index.js create mode 100644 src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js create mode 100644 src/business-logic/import-pipeline/sources/brocks-performance/scraper.js create mode 100644 src/business-logic/import-pipeline/sources/index.js rename src/business-logic/{kyt-pipeline/05_kyt_to_shopify_converter.js => import-pipeline/sources/kyt/converter.js} (100%) create mode 100644 src/business-logic/import-pipeline/sources/kyt/index.js rename src/business-logic/{kyt-pipeline/01_get_kytindia_website_data.js => import-pipeline/sources/kyt/scraper.js} (96%) delete mode 100644 src/business-logic/kyt-pipeline/00_index.js diff --git a/.gitignore b/.gitignore index 477123c..25cedfd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -@" node_modules/ .env .env.* @@ -6,6 +5,13 @@ dist/ build/ .next/ coverage/ + +# Runtime/generated data +data/ +logs/ +*.log +src/business-logic/import-pipeline/sources/outputs/ + npm-debug.log* yarn-debug.log* yarn-error.log* @@ -13,4 +19,3 @@ yarn-error.log* Thumbs.db .vscode/ .idea/ -"@ | Out-File -Encoding utf8 .gitignore \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index c854b53..16af9e7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "cors": "^2.8.5", "dotenv": "^17.2.0", "express": "^5.1.0", + "playwright": "^1.59.1", "sharp": "^0.33.5", "uuid": "^11.1.0", "xlsx": "^0.18.5" @@ -926,6 +927,20 @@ "node": ">= 0.8" } }, + "node_modules/fsevents": { + "version": "2.3.2", + "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.2.tgz", + "integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==", + "hasInstallScript": true, + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": "^8.16.0 || ^10.6.0 || >=11.0.0" + } + }, "node_modules/function-bind": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.2.tgz", @@ -1217,6 +1232,36 @@ "url": "https://opencollective.com/express" } }, + "node_modules/playwright": { + "version": "1.59.1", + "resolved": "https://registry.npmjs.org/playwright/-/playwright-1.59.1.tgz", + "integrity": "sha512-C8oWjPR3F81yljW9o5OxcWzfh6avkVwDD2VYdwIGqTkl+OGFISgypqzfu7dOe4QNLL2aqcWBmI3PMtLIK233lw==", + "license": "Apache-2.0", + "dependencies": { + "playwright-core": "1.59.1" + }, + "bin": { + "playwright": "cli.js" + }, + "engines": { + "node": ">=18" + }, + "optionalDependencies": { + "fsevents": "2.3.2" + } + }, + "node_modules/playwright-core": { + "version": "1.59.1", + "resolved": "https://registry.npmjs.org/playwright-core/-/playwright-core-1.59.1.tgz", + "integrity": "sha512-HBV/RJg81z5BiiZ9yPzIiClYV/QMsDCKUyogwH9p3MCP6IYjUFu/MActgYAvK0oWyV9NlwM3GLBjADyWgydVyg==", + "license": "Apache-2.0", + "bin": { + "playwright-core": "cli.js" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", diff --git a/package.json b/package.json index 58c1cef..d392530 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "cors": "^2.8.5", "dotenv": "^17.2.0", "express": "^5.1.0", + "playwright": "^1.59.1", "sharp": "^0.33.5", "uuid": "^11.1.0", "xlsx": "^0.18.5" diff --git a/routes/pipeline.js b/routes/pipeline.js index c0998f8..c12508f 100644 --- a/routes/pipeline.js +++ b/routes/pipeline.js @@ -2,29 +2,37 @@ const express = require("express"); const { createJob, updateJob, getJob, listJobs, canStartJob } = require("../src/pipelineJobs"); const { runKytPipelineJob } = require("../src/runKytPipelineJob"); const { getToken } = require("../tokenStore"); +const { getSource, listSources } = require("../src/business-logic/import-pipeline/sources"); const router = express.Router(); router.post("/run", async (req, res) => { - const { shop, limit = null } = req.body || {}; + const { shop, limit = null, source = "kyt" } = req.body || {}; if (!shop) { return res.status(400).json({ error: "Missing shop." }); } + let sourceConfig; + try { + sourceConfig = getSource(source); + } catch (error) { + return res.status(400).json({ error: error.message }); + } + if (!getToken(shop)) { return res.status(400).json({ error: "No stored Shopify token found for this shop. Complete app auth first." }); } - if (!canStartJob(shop)) { - return res.status(409).json({ error: `A KYT pipeline job is already running for ${shop}.` }); + if (!canStartJob(shop, sourceConfig.sourceKey)) { + return res.status(409).json({ error: `An import job is already running for ${shop} and source ${sourceConfig.sourceKey}.` }); } - const job = createJob({ shop, limit }); + const job = createJob({ shop, limit, source: sourceConfig.sourceKey }); updateJob(job.id, { status: "queued", step: "queued", - detail: "Job queued", + detail: `${sourceConfig.label} import job queued`, }); setImmediate(() => { @@ -35,10 +43,15 @@ router.post("/run", async (req, res) => { jobId: job.id, status: "queued", shop, + source: sourceConfig.sourceKey, limit, }); }); +router.get("/sources", (req, res) => { + return res.json({ sources: listSources() }); +}); + router.get("/status/:jobId", (req, res) => { const job = getJob(req.params.jobId); if (!job) { diff --git a/src/business-logic/_backup/kyt-pipeline/00_index.js b/src/business-logic/_backup/kyt-pipeline/00_index.js new file mode 100644 index 0000000..5951e65 --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/00_index.js @@ -0,0 +1,26 @@ +const { + runFullSourcePipeline, + convertAggregatedToShopifyReady, + upsertShopifyProductsFromConverted, +} = require("../import-pipeline/runSourcePipeline"); + +function runFullKytPipeline(options = {}) { + return runFullSourcePipeline({ + ...options, + sourceKey: "kyt", + }); +} + +module.exports = { + runFullKytPipeline, + runFullSourcePipeline, + convertAggregatedToShopifyReady, + upsertShopifyProductsFromConverted, +}; + +if (require.main === module) { + runFullKytPipeline().catch((error) => { + console.error("Full pipeline failed:", error.message); + process.exitCode = 1; + }); +} diff --git a/src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js b/src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js new file mode 100644 index 0000000..2c42565 --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/01_get_kytindia_website_data.js @@ -0,0 +1,7 @@ +module.exports = require("../import-pipeline/sources/kyt/scraper"); + +if (require.main === module) { + module.exports.getKytIndiaWebsiteData().catch(() => { + process.exitCode = 1; + }); +} diff --git a/src/business-logic/_backup/kyt-pipeline/02_download_product_images.js b/src/business-logic/_backup/kyt-pipeline/02_download_product_images.js new file mode 100644 index 0000000..10f73c5 --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/02_download_product_images.js @@ -0,0 +1,13 @@ +module.exports = require("../import-pipeline/shared/downloadProductImages"); + +if (require.main === module) { + module.exports.downloadProductImagesFromAggregatedJson() + .then((summary) => { + console.log("\nDownload summary:"); + console.log(JSON.stringify(summary, null, 2)); + }) + .catch((error) => { + console.error("Image download failed:", error.message); + process.exitCode = 1; + }); +} diff --git a/src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js b/src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js new file mode 100644 index 0000000..6d64994 --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/03_watermark_downloaded_images.js @@ -0,0 +1,13 @@ +module.exports = require("../import-pipeline/shared/watermarkImages"); + +if (require.main === module) { + module.exports.applyWatermarkToDownloadedImages() + .then((summary) => { + console.log("\nWatermark summary:"); + console.log(JSON.stringify(summary, null, 2)); + }) + .catch((error) => { + console.error("Watermark run failed:", error.message); + process.exitCode = 1; + }); +} diff --git a/src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js b/src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js new file mode 100644 index 0000000..6ba6902 --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/04_shopify_image_file_uploader.js @@ -0,0 +1,8 @@ +module.exports = require("../import-pipeline/shared/shopifyImageUploader"); + +if (require.main === module) { + module.exports.runStandaloneImageUpload().catch((error) => { + console.error("Image upload pipeline failed:", error.message); + process.exitCode = 1; + }); +} diff --git a/src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js b/src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js new file mode 100644 index 0000000..3edb67a --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/05_kyt_to_shopify_converter.js @@ -0,0 +1 @@ +module.exports = require("../import-pipeline/sources/kyt/converter"); diff --git a/src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js b/src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js new file mode 100644 index 0000000..4569b07 --- /dev/null +++ b/src/business-logic/_backup/kyt-pipeline/06_shopify_product_upsert.js @@ -0,0 +1,8 @@ +module.exports = require("../import-pipeline/shared/shopifyProductUpsert"); + +if (require.main === module) { + module.exports.runStandaloneSelfTest().catch((err) => { + console.error("Self-test failed:", err.message); + process.exit(1); + }); +} diff --git a/src/business-logic/import-pipeline/runSourcePipeline.js b/src/business-logic/import-pipeline/runSourcePipeline.js new file mode 100644 index 0000000..2317a11 --- /dev/null +++ b/src/business-logic/import-pipeline/runSourcePipeline.js @@ -0,0 +1,533 @@ +const fs = require("node:fs/promises"); +const path = require("node:path"); +const fsSync = require("node:fs"); +const { downloadProductImagesFromAggregatedJson } = require("./shared/downloadProductImages"); +const { applyWatermarkToDownloadedImages } = require("./shared/watermarkImages"); +const { uploadKytWatermarkedImagesToShopifyFiles } = require("./shared/shopifyImageUploader"); +const { upsertShopifyProductFull } = require("./shared/shopifyProductUpsert"); +const { getSource } = require("./sources"); + +function nowIsoLocal() { + return new Date().toISOString(); +} + +function initRunLogger(logsDir) { + const absLogsDir = path.resolve(process.cwd(), logsDir); + fsSync.mkdirSync(absLogsDir, { recursive: true }); + + const stamp = new Date().toISOString().replace(/[:.]/g, "-"); + const logPath = path.join(absLogsDir, `${stamp}.log`); + const stream = fsSync.createWriteStream(logPath, { flags: "a" }); + + const original = { + log: console.log.bind(console), + info: console.info.bind(console), + warn: console.warn.bind(console), + error: console.error.bind(console), + }; + + function writeLine(level, args) { + const text = args.map((a) => { + if (typeof a === "string") return a; + try { + return JSON.stringify(a); + } catch { + return String(a); + } + }).join(" "); + stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`); + } + + console.log = (...args) => { + writeLine("LOG", args); + original.log(...args); + }; + console.info = (...args) => { + writeLine("INFO", args); + original.info(...args); + }; + console.warn = (...args) => { + writeLine("WARN", args); + original.warn(...args); + }; + console.error = (...args) => { + writeLine("ERROR", args); + original.error(...args); + }; + + console.log(`[RUN-LOG] Writing logs to ${logPath}`); + return { logPath, stream, original }; +} + +function restoreRunLogger(logger) { + if (!logger) return; + console.log = logger.original.log; + console.info = logger.original.info; + console.warn = logger.original.warn; + console.error = logger.original.error; + logger.stream.end(); +} + +function loadDotEnvFile(filePath = ".env") { + const abs = path.resolve(process.cwd(), filePath); + if (!fsSync.existsSync(abs)) return; + + const raw = fsSync.readFileSync(abs, "utf8"); + const lines = raw.split(/\r?\n/); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + + const eq = trimmed.indexOf("="); + if (eq <= 0) continue; + + const key = trimmed.slice(0, eq).trim(); + const value = trimmed.slice(eq + 1).trim(); + if (key && process.env[key] == null) { + process.env[key] = value; + } + } +} + +function parseCliArgs(argv = process.argv.slice(2)) { + const out = { + limit: null, + source: null, + }; + + for (let i = 0; i < argv.length; i += 1) { + const a = argv[i]; + if ((a === "--limit" || a === "-n") && argv[i + 1]) { + const n = Number.parseInt(argv[i + 1], 10); + if (Number.isFinite(n) && n > 0) out.limit = n; + } + if ((a === "--source" || a === "-s") && argv[i + 1]) { + out.source = argv[i + 1]; + } + } + + return out; +} + +async function buildLimitedAggregatedJson(inputPath, limit, outputDir) { + const absInputPath = path.resolve(process.cwd(), inputPath); + const raw = await fs.readFile(absInputPath, "utf8"); + const parsed = JSON.parse(raw); + const products = Array.isArray(parsed?.products) ? parsed.products : []; + const limited = products.slice(0, limit); + + const outPath = path.resolve(process.cwd(), outputDir, `01_products_aggregated.limit_${limit}.json`); + const payload = { + ...parsed, + generatedAt: new Date().toISOString(), + products: limited, + analysis: { + ...(parsed.analysis || {}), + totalProductsUnique: limited.length, + limitedFrom: products.length, + limitApplied: limit, + }, + }; + + await fs.mkdir(path.dirname(outPath), { recursive: true }); + await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8"); + return { + inputPath: absInputPath, + outputPath: outPath, + totalBefore: products.length, + totalAfter: limited.length, + }; +} + +function readBooleanEnv(name, fallback = false) { + const val = process.env[name]; + if (val == null) return fallback; + return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase()); +} + +function logStageSummary(stepKey, summary = {}) { + const parts = Object.entries(summary) + .filter(([, value]) => value !== undefined && value !== null && value !== "") + .map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`); + + console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`); +} + +async function convertAggregatedToShopifyReady({ + source, + inputPath, + outputPath, + uploadedImageMapPath, + imageBaseUrl, + brand, +}) { + const absInputPath = path.resolve(process.cwd(), inputPath); + const absOutputPath = path.resolve(process.cwd(), outputPath); + const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath); + + const raw = await fs.readFile(absInputPath, "utf8"); + const parsed = JSON.parse(raw); + let uploadedImageMap = null; + try { + const mapRaw = await fs.readFile(absUploadedMapPath, "utf8"); + uploadedImageMap = JSON.parse(mapRaw); + } catch { + uploadedImageMap = null; + } + + const convertedProducts = source.convertToShopifyProducts(parsed, { + imageBaseUrl, + brand, + uploadedImageMap, + }); + + const payload = { + generatedAt: new Date().toISOString(), + sourceKey: source.sourceKey, + sourceLabel: source.label, + sourceFile: absInputPath, + totalProducts: convertedProducts.length, + products: convertedProducts, + }; + + await fs.mkdir(path.dirname(absOutputPath), { recursive: true }); + await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8"); + + return { + inputPath: absInputPath, + outputPath: absOutputPath, + uploadedImageMapPath: absUploadedMapPath, + totalProducts: convertedProducts.length, + }; +} + +async function upsertShopifyProductsFromConverted({ + convertedPath, + shop = process.env.SHOPIFY_SHOP, + accessToken = process.env.SHOPIFY_ACCESS_TOKEN, + locationId = process.env.SHOPIFY_LOCATION_ID || null, + enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false), + apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10", +} = {}) { + if (!shop) { + throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage."); + } + if (!accessToken) { + throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage."); + } + + const absConvertedPath = path.resolve(process.cwd(), convertedPath); + const raw = await fs.readFile(absConvertedPath, "utf8"); + const parsed = JSON.parse(raw); + const products = Array.isArray(parsed?.products) ? parsed.products : []; + + const startedAtMs = Date.now(); + const metrics = { + sourcePath: absConvertedPath, + total: products.length, + processed: 0, + created: 0, + updated: 0, + failed: 0, + errors: [], + startedAt: new Date(startedAtMs).toISOString(), + finishedAt: null, + durationSeconds: 0, + successRate: 0, + }; + + for (let i = 0; i < products.length; i += 1) { + const product = products[i]; + const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`; + + try { + const result = await upsertShopifyProductFull({ + shop, + accessToken, + product, + locationId, + enableSeo, + apiVersion, + }); + + metrics.processed += 1; + if (result.action === "created") metrics.created += 1; + if (result.action === "updated") metrics.updated += 1; + + if ((i + 1) % 10 === 0 || i === products.length - 1) { + console.log( + `[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}` + ); + } + } catch (error) { + metrics.processed += 1; + metrics.failed += 1; + metrics.errors.push({ + index: i + 1, + product: label, + error: error.message, + }); + console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`); + } + } + + const endedAtMs = Date.now(); + metrics.finishedAt = new Date(endedAtMs).toISOString(); + metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2)); + metrics.successRate = metrics.total > 0 + ? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2)) + : 0; + + return metrics; +} + +async function runFullSourcePipeline(options = {}) { + loadDotEnvFile(".env"); + + const cli = parseCliArgs(options.argv || process.argv.slice(2)); + const source = getSource(options.sourceKey || cli.source || "kyt"); + const runPaths = source.paths(); + const runLogger = initRunLogger(runPaths.logsDir); + const limit = options.limit || cli.limit || null; + const onProgress = typeof options.onProgress === "function" ? options.onProgress : null; + const emitProgress = (stepIndex, stepKey, message) => { + if (onProgress) { + onProgress({ + stepIndex, + totalSteps: 6, + stepKey, + message, + }); + } + }; + + try { + const imageBaseUrl = process.env[source.envImageBaseUrl] || process.env.SOURCE_IMAGE_BASE_URL || source.defaultImageBaseUrl || ""; + const brand = process.env.SOURCE_BRAND || process.env.SHOPIFY_BRAND || source.defaultBrand || source.label; + + emitProgress(1, "fetchWebsiteData", `Fetching ${source.label} website data`); + console.log(`[PIPELINE 1/6] Fetching ${source.label} website data...`); + const dataSummary = await source.fetchWebsiteData({ paths: runPaths }); + + let aggregatedPathForRun = runPaths.aggregatedJson; + let imagesDirForRun = runPaths.downloadedImagesDir; + let limitSummary = null; + + if (limit) { + limitSummary = await buildLimitedAggregatedJson(runPaths.aggregatedJson, limit, path.dirname(runPaths.aggregatedJson)); + aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/"); + imagesDirForRun = `${runPaths.downloadedImagesDir}.limit_${limit}`; + console.log( + `[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}` + ); + } + + emitProgress(2, "downloadImages", "Downloading product images"); + console.log("\n[PIPELINE 2/6] Downloading product images..."); + const downloadSummary = source.downloadImages + ? await source.downloadImages({ + aggregatedJsonPath: aggregatedPathForRun, + imagesDir: imagesDirForRun, + imageBaseUrl, + concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8), + }) + : await downloadProductImagesFromAggregatedJson({ + jsonPath: aggregatedPathForRun, + outputDir: imagesDirForRun, + baseImageUrl: imageBaseUrl, + concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8), + }); + logStageSummary("downloadImages", { + total: downloadSummary?.totalImagesFound ?? 0, + downloaded: downloadSummary?.downloaded ?? 0, + skipped: downloadSummary?.skipped ?? 0, + failed: downloadSummary?.failed ?? 0, + products: downloadSummary?.productsCount ?? 0, + }); + + emitProgress(3, "watermarkImages", "Applying watermark to downloaded images"); + console.log("\n[PIPELINE 3/6] Applying watermark in-place..."); + const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png"; + let watermarkSummary; + const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun); + if (!fsSync.existsSync(absWatermarkPath)) { + watermarkSummary = { + skipped: true, + reason: `Watermark file not found: ${absWatermarkPath}`, + imagesDir: path.resolve(process.cwd(), imagesDirForRun), + watermarkPath: absWatermarkPath, + totalImagesFound: 0, + processed: 0, + skippedCount: 0, + failed: 0, + }; + console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`); + } else { + const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false); + try { + watermarkSummary = await applyWatermarkToDownloadedImages({ + imagesDir: imagesDirForRun, + watermarkPath: watermarkPathForRun, + statePath: runPaths.watermarkState, + concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4), + }); + } catch (error) { + if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) { + watermarkSummary = { + skipped: true, + reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`, + imagesDir: path.resolve(process.cwd(), imagesDirForRun), + watermarkPath: absWatermarkPath, + totalImagesFound: 0, + processed: 0, + skippedCount: 0, + failed: 0, + }; + console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`); + } else { + throw error; + } + } + } + logStageSummary("watermarkImages", { + total: watermarkSummary?.totalImagesFound ?? 0, + processed: watermarkSummary?.processed ?? 0, + skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0, + failed: watermarkSummary?.failed ?? 0, + concurrency: watermarkSummary?.concurrency ?? "", + }); + + emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files"); + console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files..."); + const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true); + const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false); + let imageUploadSummary; + + if (!imageUploadEnabled) { + imageUploadSummary = { + skipped: true, + reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false", + totalTasks: 0, + processed: 0, + uploaded: 0, + failed: 0, + }; + console.log("[PIPELINE 4/6] Image upload stage skipped by config."); + } else { + try { + const imageUploadInput = { + shop: process.env.SHOPIFY_SHOP, + accessToken: process.env.SHOPIFY_ACCESS_TOKEN, + apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10", + aggregatedJsonPath: aggregatedPathForRun, + imagesDir: imagesDirForRun, + statePath: runPaths.imageUploadState, + mapPath: runPaths.uploadedMapJson, + concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3), + }; + imageUploadSummary = source.uploadImagesToShopifyFiles + ? await source.uploadImagesToShopifyFiles(imageUploadInput) + : await uploadKytWatermarkedImagesToShopifyFiles(imageUploadInput); + } catch (error) { + const message = String(error?.message || "Unknown image upload error"); + imageUploadSummary = { + skipped: true, + reason: message, + totalTasks: 0, + processed: 0, + uploaded: 0, + failed: 0, + }; + + console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`); + if (imageUploadRequired) { + throw error; + } + console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false)."); + } + } + logStageSummary("uploadImagesToShopifyFiles", { + total: imageUploadSummary?.totalTasks ?? 0, + processed: imageUploadSummary?.processed ?? 0, + uploaded: imageUploadSummary?.uploaded ?? 0, + skipped: imageUploadSummary?.skipped ?? 0, + failed: imageUploadSummary?.failed ?? 0, + concurrency: imageUploadSummary?.concurrency ?? "", + }); + + emitProgress(5, "convertToShopifyReady", `Converting ${source.label} data to Shopify-ready products`); + console.log(`\n[PIPELINE 5/6] Converting ${source.label} data to Shopify-ready products...`); + const conversionSummary = await convertAggregatedToShopifyReady({ + source, + inputPath: aggregatedPathForRun, + outputPath: runPaths.shopifyReadyJson, + uploadedImageMapPath: runPaths.uploadedMapJson, + imageBaseUrl, + brand, + }); + logStageSummary("convertToShopifyReady", { + total: conversionSummary?.totalProducts ?? 0, + }); + + emitProgress(6, "upsertToShopify", "Upserting products to Shopify"); + console.log("\n[PIPELINE 6/6] Upserting products to Shopify..."); + const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({ + convertedPath: runPaths.shopifyReadyJson, + shop: process.env.SHOPIFY_SHOP, + accessToken: process.env.SHOPIFY_ACCESS_TOKEN, + locationId: process.env.SHOPIFY_LOCATION_ID || null, + enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false), + apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10", + }); + logStageSummary("upsertToShopify", { + total: shopifyUpsertSummary?.total ?? 0, + processed: shopifyUpsertSummary?.processed ?? 0, + created: shopifyUpsertSummary?.created ?? 0, + updated: shopifyUpsertSummary?.updated ?? 0, + failed: shopifyUpsertSummary?.failed ?? 0, + successRate: shopifyUpsertSummary?.successRate ?? 0, + }); + + const summary = { + source: { + key: source.sourceKey, + label: source.label, + }, + paths: runPaths, + completedAt: new Date().toISOString(), + runLogPath: runLogger.logPath, + limit: limit || null, + limitSummary, + steps: { + fetchWebsiteData: dataSummary, + downloadImages: downloadSummary, + watermarkImages: watermarkSummary, + uploadImagesToShopifyFiles: imageUploadSummary, + convertToShopifyReady: conversionSummary, + upsertToShopify: shopifyUpsertSummary, + }, + upcomingSteps: [], + }; + + emitProgress(6, "completed", `${source.label} pipeline completed`); + console.log("\n=== FULL PIPELINE SUMMARY ==="); + console.log(JSON.stringify(summary, null, 2)); + + return summary; + } finally { + restoreRunLogger(runLogger); + } +} + +module.exports = { + runFullSourcePipeline, + convertAggregatedToShopifyReady, + upsertShopifyProductsFromConverted, +}; + +if (require.main === module) { + runFullSourcePipeline().catch((error) => { + console.error("Full pipeline failed:", error.message); + process.exitCode = 1; + }); +} diff --git a/src/business-logic/kyt-pipeline/02_download_product_images.js b/src/business-logic/import-pipeline/shared/downloadProductImages.js similarity index 85% rename from src/business-logic/kyt-pipeline/02_download_product_images.js rename to src/business-logic/import-pipeline/shared/downloadProductImages.js index b55db9b..b77efe8 100644 --- a/src/business-logic/kyt-pipeline/02_download_product_images.js +++ b/src/business-logic/import-pipeline/shared/downloadProductImages.js @@ -21,7 +21,7 @@ function encodePathKeepingSlashes(relativePath) { } function getExtFromPathOrType(imagePath, contentType) { - const parsed = path.extname(imagePath || "").toLowerCase(); + const parsed = path.extname(getPathWithoutQuery(imagePath)).toLowerCase(); if (parsed) { return parsed; } @@ -38,6 +38,35 @@ function getExtFromPathOrType(imagePath, contentType) { return ".jpg"; } +function isAbsoluteUrl(value) { + return /^https?:\/\//i.test(String(value || "")); +} + +function getPathWithoutQuery(value) { + const raw = String(value || ""); + try { + return new URL(raw).pathname; + } catch { + return raw.split(/[?#]/)[0]; + } +} + +function buildImageUrl(imagePath, baseImageUrl) { + if (isAbsoluteUrl(imagePath)) { + return String(imagePath); + } + + const encoded = encodePathKeepingSlashes(imagePath); + return `${String(baseImageUrl || "").replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`; +} + +function getFilePartsFromImagePath(imagePath, fallbackIndex) { + const cleanPath = getPathWithoutQuery(imagePath); + const originalBase = path.basename(cleanPath, path.extname(cleanPath)) || `image_${fallbackIndex + 1}`; + const originalExt = path.extname(cleanPath) || ".png"; + return { originalBase, originalExt }; +} + async function downloadWithRetry(url, retries = 3) { let lastError; @@ -129,11 +158,8 @@ async function downloadProductImagesFromAggregatedJson(options = {}) { for (let idx = 0; idx < imagePaths.length; idx += 1) { const relPath = imagePaths[idx]; - const encoded = encodePathKeepingSlashes(relPath); - const imageUrl = `${baseImageUrl.replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`; - - const originalBase = path.basename(String(relPath || ""), path.extname(String(relPath || ""))) || `image_${idx + 1}`; - const originalExt = path.extname(String(relPath || "")) || ".png"; + const imageUrl = buildImageUrl(relPath, baseImageUrl); + const { originalBase, originalExt } = getFilePartsFromImagePath(relPath, idx); let localBase = sanitizeName(originalBase) || `image_${idx + 1}`; let fileName = `${localBase}${originalExt}`; diff --git a/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js b/src/business-logic/import-pipeline/shared/shopifyImageUploader.js similarity index 97% rename from src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js rename to src/business-logic/import-pipeline/shared/shopifyImageUploader.js index e05ba8b..af01af9 100644 --- a/src/business-logic/kyt-pipeline/04_shopify_image_file_uploader.js +++ b/src/business-logic/import-pipeline/shared/shopifyImageUploader.js @@ -39,6 +39,15 @@ function sanitizeName(value) { .slice(0, 150); } +function getPathWithoutQuery(value) { + const raw = String(value || ""); + try { + return new URL(raw).pathname; + } catch { + return raw.split(/[?#]/)[0]; + } +} + function getMimeType(filePath) { const ext = path.extname(filePath).toLowerCase(); if (ext === ".png") return "image/png"; @@ -258,8 +267,9 @@ function buildLocalImageTasks(aggregatedPayload, imagesDir) { for (let idx = 0; idx < imagePaths.length; idx += 1) { const sourcePath = imagePaths[idx]; - const originalBase = path.basename(String(sourcePath || ""), path.extname(String(sourcePath || ""))) || `image_${idx + 1}`; - const originalExt = path.extname(String(sourcePath || "")) || ".png"; + const cleanSourcePath = getPathWithoutQuery(sourcePath); + const originalBase = path.basename(cleanSourcePath, path.extname(cleanSourcePath)) || `image_${idx + 1}`; + const originalExt = path.extname(cleanSourcePath) || ".png"; const localBase = sanitizeName(originalBase) || `image_${idx + 1}`; let fileName = `${localBase}${originalExt}`; diff --git a/src/business-logic/kyt-pipeline/06_shopify_product_upsert.js b/src/business-logic/import-pipeline/shared/shopifyProductUpsert.js similarity index 61% rename from src/business-logic/kyt-pipeline/06_shopify_product_upsert.js rename to src/business-logic/import-pipeline/shared/shopifyProductUpsert.js index 3509297..f103bfd 100644 --- a/src/business-logic/kyt-pipeline/06_shopify_product_upsert.js +++ b/src/business-logic/import-pipeline/shared/shopifyProductUpsert.js @@ -98,6 +98,207 @@ function normalizeFitmentTags(input) { return { fitmentMap, fitmentFlat }; } +function normalizeOptionKey(optionValues = []) { + return optionValues + .map((item) => `${String(item.name || "").trim().toLowerCase()}=${String(item.value || "").trim().toLowerCase()}`) + .sort() + .join("|"); +} + +function getDesiredOptions(attrs) { + const options = Array.isArray(attrs.options) ? attrs.options : []; + return options + .map((option) => ({ + name: String(option.name || "").trim(), + values: Array.from(new Set((Array.isArray(option.values) ? option.values : []).map((value) => String(value || "").trim()).filter(Boolean))), + })) + .filter((option) => option.name && option.values.length); +} + +function getDesiredVariants(attrs, finalPrice, compareAtPrice) { + const variants = Array.isArray(attrs.variants) ? attrs.variants : []; + if (!variants.length) { + return [ + { + sku: attrs.part_number || "", + price: finalPrice, + compareAtPrice, + quantity: Number(attrs.total_quantity ?? attrs.quantity ?? 0), + optionValues: [], + }, + ]; + } + + return variants.map((variant, index) => ({ + sku: String(variant.sku || attrs.part_number || `${attrs.part_number || "variant"}-${index + 1}`).slice(0, 64), + price: Number(variant.price ?? finalPrice), + compareAtPrice: variant.compare_price != null ? Number(variant.compare_price) : compareAtPrice, + quantity: Number(variant.quantity ?? attrs.total_quantity ?? attrs.quantity ?? 0), + optionValues: Array.isArray(variant.optionValues) + ? variant.optionValues + .map((item) => ({ + name: String(item.name || "").trim(), + value: String(item.value || "").trim(), + })) + .filter((item) => item.name && item.value) + : [], + })); +} + +function buildVariantBulkInput(variant, attrs, weightValue, variantId = null) { + const input = { + ...(variantId ? { id: variantId } : {}), + price: Number(variant.price || 0).toFixed(2), + ...(variant.compareAtPrice !== null && variant.compareAtPrice !== undefined + ? { compareAtPrice: Number(variant.compareAtPrice).toFixed(2) } + : {}), + ...(attrs.barcode ? { barcode: attrs.barcode } : {}), + inventoryItem: { + sku: variant.sku || attrs.part_number || "", + measurement: { weight: { value: weightValue, unit: "POUNDS" } }, + }, + }; + + if (!variantId && variant.optionValues?.length) { + input.optionValues = variant.optionValues.map((optionValue) => ({ + name: optionValue.value, + optionName: optionValue.name, + })); + } + + return input; +} + +async function ensureProductOptions(client, productId, currentOptions, desiredOptions) { + if (!desiredOptions.length) { + return currentOptions || []; + } + + const existingNames = new Set((currentOptions || []).map((option) => String(option.name || "").toLowerCase())); + const missingOptions = desiredOptions.filter((option) => !existingNames.has(option.name.toLowerCase())); + if (!missingOptions.length) { + return currentOptions || []; + } + + const created = await gql( + client, + `mutation($productId: ID!, $options: [OptionCreateInput!]!, $variantStrategy: ProductOptionCreateVariantStrategy) { + productOptionsCreate(productId: $productId, options: $options, variantStrategy: $variantStrategy) { + product { + id + options { id name values position optionValues { id name hasVariants } } + } + userErrors { field message code } + } + }`, + { + productId, + options: missingOptions.map((option) => ({ + name: option.name, + values: option.values.map((value) => ({ name: value })), + })), + variantStrategy: "CREATE", + } + ); + + const errs = created.productOptionsCreate.userErrors || []; + if (errs.length) { + throw new Error(`productOptionsCreate failed: ${errs.map((e) => e.message).join(", ")}`); + } + + return created.productOptionsCreate.product?.options || currentOptions || []; +} + +async function fetchProductVariantState(client, productId) { + const data = await gql( + client, + `query($id: ID!) { + product(id: $id) { + id + options { id name values position optionValues { id name hasVariants } } + variants(first: 250) { + nodes { + id + sku + selectedOptions { name value } + inventoryItem { id } + } + } + } + }`, + { id: productId } + ); + + return data.product || { id: productId, options: [], variants: { nodes: [] } }; +} + +async function syncProductVariants({ client, productId, attrs, desiredVariants, desiredOptions, weightValue }) { + let productState = await fetchProductVariantState(client, productId); + await ensureProductOptions(client, productId, productState.options || [], desiredOptions); + productState = await fetchProductVariantState(client, productId); + + const existingVariants = productState.variants?.nodes || []; + const existingByOptions = new Map(); + for (const variant of existingVariants) { + existingByOptions.set(normalizeOptionKey(variant.selectedOptions || []), variant); + } + + const variantsToUpdate = []; + const variantsToCreate = []; + + desiredVariants.forEach((variant, index) => { + const key = normalizeOptionKey(variant.optionValues || []); + const existing = existingByOptions.get(key) || (index === 0 ? existingVariants[0] : null); + if (existing?.id) { + variantsToUpdate.push(buildVariantBulkInput(variant, attrs, weightValue, existing.id)); + } else { + variantsToCreate.push(buildVariantBulkInput(variant, attrs, weightValue)); + } + }); + + const syncedVariants = []; + + if (variantsToUpdate.length) { + const updated = await gql( + client, + `mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) { + productVariantsBulkUpdate(productId: $productId, variants: $variants) { + productVariants { id sku inventoryItem { id sku } } + userErrors { field message } + } + }`, + { productId, variants: variantsToUpdate } + ); + + if (updated.productVariantsBulkUpdate.userErrors?.length) { + throw new Error(`variant update failed: ${updated.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`); + } + + syncedVariants.push(...(updated.productVariantsBulkUpdate.productVariants || [])); + } + + if (variantsToCreate.length) { + const created = await gql( + client, + `mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!, $strategy: ProductVariantsBulkCreateStrategy) { + productVariantsBulkCreate(productId: $productId, variants: $variants, strategy: $strategy) { + productVariants { id sku inventoryItem { id sku } selectedOptions { name value } } + userErrors { field message } + } + }`, + { productId, variants: variantsToCreate, strategy: "REMOVE_STANDALONE_VARIANT" } + ); + + if (created.productVariantsBulkCreate.userErrors?.length) { + throw new Error(`variant create failed: ${created.productVariantsBulkCreate.userErrors.map((e) => e.message).join(", ")}`); + } + + syncedVariants.push(...(created.productVariantsBulkCreate.productVariants || [])); + } + + return syncedVariants; +} + async function generateSeo(attrs, seoClient) { const seoInput = { product_name: attrs.product_name, @@ -305,8 +506,6 @@ async function upsertShopifyProductFull({ const existing = existingSearch.products.nodes?.[0]; let productId; - let variantId; - let inventoryItemId; let action; if (existing) { @@ -336,8 +535,6 @@ async function upsertShopifyProductFull({ if (errs.length) throw new Error(`productUpdate failed: ${errs.map((e) => e.message).join(", ")}`); productId = existing.id; - variantId = existing.variants?.nodes?.[0]?.id; - inventoryItemId = existing.variants?.nodes?.[0]?.inventoryItem?.id; action = "updated"; } else { const crt = await gql( @@ -370,13 +567,9 @@ async function upsertShopifyProductFull({ if (errs.length) throw new Error(`productCreate failed: ${errs.map((e) => e.message).join(", ")}`); productId = crt.productCreate.product.id; - variantId = crt.productCreate.product.variants?.nodes?.[0]?.id; - inventoryItemId = crt.productCreate.product.variants?.nodes?.[0]?.inventoryItem?.id; action = "created"; } - if (!variantId) throw new Error("No variant ID found after upsert."); - const { priceType, percentage } = await getPricingConfig(client); const basePrice = Number(attrs.price || 0); let finalPrice = basePrice; @@ -386,37 +579,25 @@ async function upsertShopifyProductFull({ const compareAtPrice = attrs.compare_price != null ? Number(attrs.compare_price) : null; const weightValue = Number(attrs.dimensions?.[0]?.weight || 0); - - const bulk = await gql( + const desiredOptions = getDesiredOptions(attrs); + const desiredVariants = getDesiredVariants(attrs, finalPrice, compareAtPrice); + const syncedVariants = await syncProductVariants({ client, - `mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) { - productVariantsBulkUpdate(productId: $productId, variants: $variants) { - productVariants { id price barcode inventoryItem { id sku } } - userErrors { field message } - } - }`, - { - productId, - variants: [ - { - id: variantId, - price: Number(finalPrice).toFixed(2), - ...(compareAtPrice !== null ? { compareAtPrice: Number(compareAtPrice).toFixed(2) } : {}), - ...(attrs.barcode ? { barcode: attrs.barcode } : {}), - inventoryItem: { - sku: attrs.part_number || "", - measurement: { weight: { value: weightValue, unit: "POUNDS" } }, - }, - }, - ], - } - ); + productId, + attrs, + desiredVariants, + desiredOptions, + weightValue, + }); - if (bulk.productVariantsBulkUpdate.userErrors?.length) { - throw new Error(`variant update failed: ${bulk.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`); - } + const inventoryTargets = syncedVariants + .map((variant, index) => ({ + inventoryItemId: variant.inventoryItem?.id, + quantity: Number(desiredVariants[index]?.quantity ?? attrs.total_quantity ?? attrs.quantity ?? 0), + })) + .filter((item) => item.inventoryItemId); - if (inventoryItemId) { + for (const inventoryTarget of inventoryTargets) { const inv = await gql( client, `mutation($id: ID!, $input: InventoryItemInput!) { @@ -426,7 +607,7 @@ async function upsertShopifyProductFull({ } }`, { - id: inventoryItemId, + id: inventoryTarget.inventoryItemId, input: { cost: Number(attrs.purchase_cost || 0), tracked: true, @@ -437,9 +618,11 @@ async function upsertShopifyProductFull({ if (inv.inventoryItemUpdate.userErrors?.length) { throw new Error(`inventoryItemUpdate failed: ${inv.inventoryItemUpdate.userErrors.map((e) => e.message).join(", ")}`); } + } - if (locationId) { - try { + if (locationId && inventoryTargets.length) { + try { + for (const inventoryTarget of inventoryTargets) { const act = await gql( client, `mutation($inventoryItemId: ID!, $locationId: ID!) { @@ -448,49 +631,46 @@ async function upsertShopifyProductFull({ userErrors { field message } } }`, - { inventoryItemId, locationId } + { inventoryItemId: inventoryTarget.inventoryItemId, locationId } ); if (act.inventoryActivate.userErrors?.length) { throw new Error(`inventoryActivate failed: ${act.inventoryActivate.userErrors.map((e) => e.message).join(", ")}`); } - - const totalQty = Number(attrs.total_quantity ?? attrs.quantity ?? 0); - const setQty = await gql( - client, - `mutation($input: InventorySetQuantitiesInput!) { - inventorySetQuantities(input: $input) { - userErrors { field message } - } - }`, - { - input: { - name: "available", - reason: "correction", - ignoreCompareQuantity: true, - quantities: [ - { - inventoryItemId, - locationId, - quantity: totalQty, - compareQuantity: 0, - }, - ], - }, - } - ); - - if (setQty.inventorySetQuantities.userErrors?.length) { - throw new Error(`inventorySetQuantities failed: ${setQty.inventorySetQuantities.userErrors.map((e) => e.message).join(", ")}`); - } - } catch (inventoryError) { - console.log( - `[INVENTORY-WARN] Skipping inventory activation/set for product ${productId}. Reason: ${inventoryError.message}` - ); } - } else { - console.log(`[INVENTORY-WARN] No locationId provided. Skipping inventory activation/set for product ${productId}.`); + + const setQty = await gql( + client, + `mutation($input: InventorySetQuantitiesInput!) { + inventorySetQuantities(input: $input) { + userErrors { field message } + } + }`, + { + input: { + name: "available", + reason: "correction", + ignoreCompareQuantity: true, + quantities: inventoryTargets.map((inventoryTarget) => ({ + inventoryItemId: inventoryTarget.inventoryItemId, + locationId, + quantity: inventoryTarget.quantity, + compareQuantity: 0, + })), + }, + } + ); + + if (setQty.inventorySetQuantities.userErrors?.length) { + throw new Error(`inventorySetQuantities failed: ${setQty.inventorySetQuantities.userErrors.map((e) => e.message).join(", ")}`); + } + } catch (inventoryError) { + console.log( + `[INVENTORY-WARN] Skipping inventory activation/set for product ${productId}. Reason: ${inventoryError.message}` + ); } + } else if (!locationId) { + console.log(`[INVENTORY-WARN] No locationId provided. Skipping inventory activation/set for product ${productId}.`); } await publishToOnlineStore(client, productId); @@ -527,8 +707,9 @@ async function upsertShopifyProductFull({ ok: true, action, productId, - variantId, - inventoryItemId, + variantId: syncedVariants[0]?.id || null, + inventoryItemId: syncedVariants[0]?.inventoryItem?.id || null, + variantsSynced: syncedVariants.length, handle, }; } diff --git a/src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js b/src/business-logic/import-pipeline/shared/watermarkImages.js similarity index 100% rename from src/business-logic/kyt-pipeline/03_watermark_downloaded_images.js rename to src/business-logic/import-pipeline/shared/watermarkImages.js diff --git a/src/business-logic/import-pipeline/sources/README.md b/src/business-logic/import-pipeline/sources/README.md new file mode 100644 index 0000000..56ebc13 --- /dev/null +++ b/src/business-logic/import-pipeline/sources/README.md @@ -0,0 +1,52 @@ +# Import Source Modules + +Each client website gets one source folder here. A source owns the parts that change from website to website: + +- where scraped data is stored +- how website data is fetched +- how raw website records are converted to Shopify-ready products +- optionally, how image download/upload tasks are discovered if the raw data shape is different + +The shared pipeline still handles: + +- job progress +- image watermarking +- Shopify Files upload state +- Shopify product upsert +- logs and summaries + +## Folder Shape + +Every client should follow the same shape: + +```txt +sources/ + client-key/ + index.js source metadata, paths, fetchWebsiteData orchestration + scraper.js website/API scraping only + converter.js source-normalized JSON to Shopify-ready products +``` + +The source registry loads each folder through its `index.js`. + +## Source Contract + +`index.js` exports an object like this: + +```js +module.exports = { + sourceKey: "client-key", + label: "Client Name", + defaultBrand: "Client Brand", + defaultImageBaseUrl: "https://example.com/images", + envImageBaseUrl: "CLIENT_IMAGE_BASE_URL", + paths, + fetchWebsiteData, + convertToShopifyProducts, + // optional: + // downloadImages, + // uploadImagesToShopifyFiles, +}; +``` + +Register the new source in `sources/index.js`. diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/converter.js b/src/business-logic/import-pipeline/sources/brocks-performance/converter.js new file mode 100644 index 0000000..662bda3 --- /dev/null +++ b/src/business-logic/import-pipeline/sources/brocks-performance/converter.js @@ -0,0 +1,250 @@ +const path = require("node:path"); + +function slugify(str) { + return String(str || "") + .trim() + .toLowerCase() + .replace(/[^a-z0-9]+/g, "-") + .replace(/^-+|-+$/g, ""); +} + +function escapeHtml(input) { + return String(input || "") + .replace(/&/g, "&") + .replace(//g, ">") + .replace(/"/g, """) + .replace(/'/g, "'"); +} + +function cleanText(value) { + return String(value || "") + .replace(/\u2026/g, "...") + .replace(/\s+\n/g, "\n") + .replace(/\n\s+/g, "\n") + .replace(/[ \t]+/g, " ") + .replace(/\n{3,}/g, "\n\n") + .trim(); +} + +function getUploadedImageUrl(imgPath, uploadedImageMap) { + if (!uploadedImageMap || typeof uploadedImageMap !== "object") return null; + const bySourcePath = uploadedImageMap.bySourcePath && typeof uploadedImageMap.bySourcePath === "object" + ? uploadedImageMap.bySourcePath + : {}; + return bySourcePath[String(imgPath)]?.url || null; +} + +function getImageFileName(imagePath) { + try { + return path.basename(new URL(imagePath).pathname); + } catch { + return path.basename(String(imagePath || "").split(/[?#]/)[0]); + } +} + +function descriptionToHtml(description, sections = []) { + const blocks = []; + const clean = cleanText(description); + if (clean) { + blocks.push( + ...clean + .split(/\n{2,}/) + .map((block) => `

${escapeHtml(block).replace(/\n/g, "
")}

`) + ); + } + + for (const section of sections) { + const title = cleanText(section.title); + const paragraphs = Array.isArray(section.paragraphs) ? section.paragraphs.map(cleanText).filter(Boolean) : []; + const bullets = Array.isArray(section.bullets) ? section.bullets.map(cleanText).filter(Boolean) : []; + + if (title) { + blocks.push(`

${escapeHtml(title)}

`); + } + for (const paragraph of paragraphs) { + blocks.push(`

${escapeHtml(paragraph)}

`); + } + if (bullets.length) { + blocks.push(``); + } + } + + return blocks.join("\n"); +} + +function normalizeOptionName(name) { + const clean = cleanText(name).replace(/^select\s+/i, ""); + return clean || "Option"; +} + +function getSourceOptions(record, data) { + const options = Array.isArray(record?.sourceOptions) ? record.sourceOptions : Array.isArray(data?.options) ? data.options : []; + return options + .map((option) => { + const name = normalizeOptionName(option.name || option.label); + const values = Array.isArray(option.values) ? option.values : []; + return { + name, + values: values + .map((value) => ({ + label: cleanText(value.label || value.name), + sourceValue: value.value || null, + selected: Boolean(value.selected), + })) + .filter((value) => value.label && !/^choose options$/i.test(value.label)), + }; + }) + .filter((option) => option.values.length); +} + +function buildOptionCombinations(options) { + if (!options.length) { + return []; + } + + return options.reduce((combinations, option) => { + const next = []; + for (const combo of combinations.length ? combinations : [[]]) { + for (const value of option.values) { + next.push([...combo, { name: option.name, value: value.label, sourceValue: value.sourceValue }]); + } + } + return next; + }, []); +} + +function buildVariants({ options, baseSku, price, quantity, compareAtPrice = null }) { + const combinations = buildOptionCombinations(options); + if (!combinations.length) { + return [ + { + sku: baseSku, + price, + compare_price: compareAtPrice, + quantity, + optionValues: [], + }, + ]; + } + + return combinations.map((combo, index) => { + const suffix = combo + .map((item) => slugify(item.value).slice(0, 28)) + .filter(Boolean) + .join("-"); + + return { + sku: index === 0 ? baseSku : `${baseSku}-${suffix || index + 1}`.slice(0, 64), + price, + compare_price: compareAtPrice, + quantity, + optionValues: combo, + }; + }); +} + +function convertToShopifyProducts(input, options = {}) { + const records = Array.isArray(input?.products) ? input.products : []; + const brand = options.brand || "Brock's Performance"; + + return records.map((record, index) => { + const data = record?.productDetails?.data || {}; + const sourceRecord = record?.sourceRecord || {}; + const productId = record?.productId || data.id || slugify(sourceRecord.url || data.name || `brocks-${index + 1}`); + const title = cleanText(data.name || sourceRecord.title || `Brock's Performance Product ${index + 1}`); + const sku = cleanText(data.code || sourceRecord.sku || productId); + const imagePaths = Array.isArray(data.img) ? data.img : []; + const sourceOptions = getSourceOptions(record, data); + const quantity = Number(data?.stocks?.total_units ?? sourceRecord?.stock?.currentStock ?? 0) || 0; + const descriptionHtml = descriptionToHtml( + data?.details?.description || sourceRecord.description, + data?.details?.descriptionSections || record?.descriptionSections || sourceRecord.descriptionSections || [] + ); + const price = Number(data?.cost?.mrp ?? sourceRecord.price ?? 0) || 0; + const files = imagePaths + .map((imagePath) => ({ + type: "Image", + url: getUploadedImageUrl(imagePath, options.uploadedImageMap) || imagePath, + media_content: getImageFileName(imagePath), + source_path: imagePath, + })) + .filter((file) => file.url); + const variants = buildVariants({ + options: sourceOptions, + baseSku: sku, + price, + quantity, + }); + + const category = "Exhaust"; + const subcategory = "Titanium Full Systems, Suzuki"; + const tags = [ + brand, + category, + "Titanium Full Systems", + "Suzuki", + "Motorcycle Exhaust", + sku, + ].filter(Boolean); + + return { + id: productId, + source: { + productId, + sourceKey: "brocks-performance", + url: sourceRecord.url || data.url || null, + image_paths: imagePaths, + }, + attributes: { + product_name: title, + brand, + category, + subcategory, + part_number: sku, + mfr_part_number: sku, + price, + compare_price: null, + purchase_cost: null, + barcode: "", + price_group: null, + units_per_sku: null, + part_description: descriptionHtml, + descriptions: descriptionHtml + ? [{ type: "Market Description", description: descriptionHtml }] + : [], + files, + image_paths: imagePaths, + dimensions: [{ weight: 0 }], + total_quantity: quantity, + inventorydata: { + inventory: { + main: quantity, + }, + }, + options: sourceOptions.map((option) => ({ + name: option.name, + values: option.values.map((value) => value.label), + })), + variants, + fitmentTags: { + make: ["Suzuki"], + model: [], + year: [], + drive: [], + baseModel: [], + }, + tags, + handle: slugify(`${brand}-${title}-${productId}`), + source_url: sourceRecord.url || data.url || null, + page_meta: sourceRecord.pageMeta || record.pageMeta || {}, + }, + }; + }); +} + +module.exports = { + cleanText, + convertToShopifyProducts, + slugify, +}; diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/index.js b/src/business-logic/import-pipeline/sources/brocks-performance/index.js new file mode 100644 index 0000000..68a8dc5 --- /dev/null +++ b/src/business-logic/import-pipeline/sources/brocks-performance/index.js @@ -0,0 +1,215 @@ +const fs = require("node:fs/promises"); +const path = require("node:path"); +const { cleanText, scrapeBrocksPerformanceProducts, TARGET_URL } = require("./scraper"); +const { convertToShopifyProducts, slugify } = require("./converter"); + +const sourceKey = "brocks-performance"; +const label = "Brock's Performance"; +const dataDir = path.join("data", "sources", sourceKey); + +function paths() { + return { + aggregatedJson: path.join(dataDir, "01_products_aggregated.json"), + historyJson: path.join(dataDir, "01_products_run_history.json"), + rawJson: path.join(dataDir, "01_raw_scrape.json"), + downloadedImagesDir: path.join(dataDir, "02_downloaded_product_images"), + watermarkState: path.join(dataDir, "03_watermark_state.json"), + imageUploadState: path.join(dataDir, "04_shopify_image_upload_state.json"), + uploadedMapJson: path.join(dataDir, "04_shopify_uploaded_images_map.json"), + shopifyReadyJson: path.join(dataDir, "05_shopify_products_ready.json"), + logsDir: path.join(dataDir, "99_run_logs"), + }; +} + +function uniqueBy(values, keyFn) { + const seen = new Set(); + const out = []; + + for (const value of values) { + const key = keyFn(value); + if (!key || seen.has(key)) { + continue; + } + seen.add(key); + out.push(value); + } + + return out; +} + +function bestImageUrl(image) { + if (!image) { + return null; + } + + if (typeof image === "string") { + return image; + } + + const variants = Array.isArray(image.variants) ? image.variants.filter(Boolean) : []; + return variants[variants.length - 1] || image.src || null; +} + +function getImageList(record) { + const candidates = [ + record.image, + ...(Array.isArray(record.images) ? record.images.map(bestImageUrl) : []), + ].filter(Boolean); + + return uniqueBy(candidates, (url) => String(url).split("?")[0]); +} + +function normalizeOptions(options = []) { + return options + .map((option) => { + const name = cleanText(option.label) || "Option"; + const values = Array.isArray(option.values) ? option.values : []; + const normalizedValues = values + .map((value) => ({ + label: cleanText(value.label), + value: value.value || null, + selected: Boolean(value.selected || value.checked), + disabled: Boolean(value.disabled), + })) + .filter((value) => value.label && !/^choose options$/i.test(value.label)); + + return { + name, + required: Boolean(option.required), + type: option.type || null, + values: uniqueBy(normalizedValues, (value) => value.label.toLowerCase()), + }; + }) + .filter((option) => option.values.length > 0); +} + +function getStockQuantity(record) { + const raw = record?.stock?.currentStock || record?.productInfo?.["Quantity Available"] || ""; + const numeric = Number.parseInt(String(raw).replace(/[^0-9-]/g, ""), 10); + return Number.isFinite(numeric) ? numeric : 0; +} + +function normalizeRecord(record, index) { + const title = cleanText(record.title) || `Brock's Performance Product ${index + 1}`; + const sku = cleanText(record.sku) || slugify(title); + const productId = record.productId || sku || slugify(record.url || title); + const imageList = getImageList(record); + const price = Number.isFinite(record.price) ? record.price : 0; + const description = cleanText(record.description); + const options = normalizeOptions(record.options); + const stockQuantity = getStockQuantity(record); + + return { + productId, + sourceKey, + sourceRecord: record, + sourceOptions: options, + sourceImages: Array.isArray(record.images) ? record.images : [], + descriptionSections: Array.isArray(record.descriptionSections) ? record.descriptionSections : [], + pageMeta: record.pageMeta || {}, + reviews: record.reviews || {}, + productSummary: { + id: productId, + name: title, + cost: { mrp: price }, + img: imageList, + sub_category_name: "Titanium Full Systems", + model_series_name: "Suzuki", + }, + productDetails: { + servercode: "OK", + data: { + id: productId, + code: sku, + name: title, + category_name: "Exhaust", + sub_category_name: "Titanium Full Systems", + model_series_name: "Suzuki", + cost: { mrp: price }, + stocks: { total_units: stockQuantity }, + img: imageList, + url: record.url, + mpn: record.mpn || null, + brand: record.brand || null, + availability: record.availability || null, + currency: record.currency || null, + canonicalUrl: record.canonicalUrl || null, + productInfo: record.productInfo || {}, + options, + details: { + description, + specifications: [ + { name: "SKU", text: sku }, + { name: "MPN", text: record.mpn }, + { name: "Brand", text: record.brand }, + { name: "Availability", text: record.availability }, + { name: "Quantity Available", text: record?.stock?.display }, + { name: "Source URL", text: record.url }, + ].filter((spec) => spec.text), + weight: {}, + descriptionSections: Array.isArray(record.descriptionSections) ? record.descriptionSections : [], + }, + }, + }, + }; +} + +async function fetchWebsiteData({ paths: runPaths }) { + const targetUrl = process.env.BROCKS_PERFORMANCE_TARGET_URL || TARGET_URL; + const rawProducts = await scrapeBrocksPerformanceProducts(targetUrl); + const products = rawProducts.map((record, index) => normalizeRecord(record, index)); + const now = new Date().toISOString(); + + const analysis = { + timestamp: now, + targetUrl, + rawProductsFound: rawProducts.length, + totalProductsUnique: products.length, + detailSuccess: products.length, + detailFailed: 0, + cachedDetailReused: 0, + detailFetchedNow: products.length, + }; + + const payload = { + generatedAt: now, + sourceKey, + sourceLabel: label, + analysis, + products, + }; + + await fs.mkdir(path.dirname(runPaths.aggregatedJson), { recursive: true }); + await fs.writeFile(runPaths.rawJson, `${JSON.stringify(rawProducts, null, 2)}\n`, "utf8"); + await fs.writeFile(runPaths.aggregatedJson, `${JSON.stringify(payload, null, 2)}\n`, "utf8"); + + let history = []; + try { + history = JSON.parse(await fs.readFile(runPaths.historyJson, "utf8")); + if (!Array.isArray(history)) history = []; + } catch { + history = []; + } + history.push(analysis); + await fs.writeFile(runPaths.historyJson, `${JSON.stringify(history, null, 2)}\n`, "utf8"); + + console.log(`[BROCKS] Saved ${products.length} products to ${runPaths.aggregatedJson}`); + + return { + analysis, + outputJsonPath: path.resolve(process.cwd(), runPaths.aggregatedJson), + rawJsonPath: path.resolve(process.cwd(), runPaths.rawJson), + historyPath: path.resolve(process.cwd(), runPaths.historyJson), + }; +} + +module.exports = { + sourceKey, + label, + defaultBrand: "Brock's Performance", + defaultImageBaseUrl: "", + envImageBaseUrl: "BROCKS_PERFORMANCE_IMAGE_BASE_URL", + paths, + fetchWebsiteData, + convertToShopifyProducts, +}; diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js b/src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js new file mode 100644 index 0000000..0c58abc --- /dev/null +++ b/src/business-logic/import-pipeline/sources/brocks-performance/scrape_brocks_products.js @@ -0,0 +1,411 @@ +const { chromium } = require('playwright'); +const fs = require('fs/promises'); +const path = require('path'); +const { URL } = require('url'); + +const TARGET_URL = 'https://brocksperformance.com/exhausts/titanium-full-systems/suzuki/'; +const OUTPUT_DIR = path.join(__dirname, '..', 'outputs'); +const OUTPUT_FILE = path.join(OUTPUT_DIR, 'brocksperformance-suzuki-exhausts-full-details.json'); + +function cleanText(value) { + return String(value || '') + .replace(/\u2026/g, '...') + .replace(/\s+\n/g, '\n') + .replace(/\n\s+/g, '\n') + .replace(/[ \t]+/g, ' ') + .replace(/\n{3,}/g, '\n\n') + .trim(); +} + +function toNumber(value) { + if (!value) return null; + const cleaned = String(value).replace(/[^0-9.\-]/g, ''); + const n = Number.parseFloat(cleaned); + return Number.isNaN(n) ? null : n; +} + +async function expandLoadMore(page) { + // Click any obvious "load more" buttons until they disappear + const loadMoreSelectors = ['button.load-more', '.load-more', 'button.show-more', '.show-more']; + for (const sel of loadMoreSelectors) { + // Try repeatedly because some sites require multiple clicks + while (true) { + const btn = await page.$(sel); + if (!btn) break; + try { + await Promise.all([page.waitForResponse(r => r.status() === 200, { timeout: 5000 }).catch(() => {}), btn.click()]); + await page.waitForTimeout(800); + } catch (e) { + break; + } + } + } +} + +async function autoScroll(page, timeout = 2000) { + const start = Date.now(); + let lastHeight = await page.evaluate(() => document.body.scrollHeight); + while (Date.now() - start < timeout) { + await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); + await page.waitForTimeout(300); + const newHeight = await page.evaluate(() => document.body.scrollHeight); + if (newHeight === lastHeight) break; + lastHeight = newHeight; + } +} + +async function scrape(targetUrl = TARGET_URL, options = {}) { + const outputFile = options.outputFile || OUTPUT_FILE; + const writeOutput = options.writeOutput !== false; + const onDetailProgress = typeof options.onDetailProgress === 'function' ? options.onDetailProgress : null; + const browser = await chromium.launch({ headless: true }); + const page = await browser.newPage(); + await page.setExtraHTTPHeaders({ 'accept-language': 'en-US,en;q=0.9' }); + + try { + await page.goto(targetUrl, { waitUntil: 'networkidle' }); + + // Try to load more items (click buttons, then scroll) + await expandLoadMore(page); + await autoScroll(page, 3000); + + // BigCommerce category pages render product cards as li.product inside .productGrid. + const candidateSelector = '.productGrid .product'; + + const products = await page.$$eval(candidateSelector, (nodes, baseUrl) => { + const siteOrigin = new URL(baseUrl).origin; + const results = []; + for (const node of nodes) { + const anchor = node.querySelector('.card-title a, .card-figure__link[href], a[href]'); + if (!anchor) continue; + const href = anchor.href || (anchor.getAttribute && anchor.getAttribute('href')); + if (!href || href.includes('#') || !href.startsWith(siteOrigin)) continue; + + const titleEl = node.querySelector('.card-title a, .card-title, .product-title, .title, h2, h3, .prod-title') || anchor; + const title = titleEl ? (titleEl.textContent || '').trim() : ''; + + const priceEl = node.querySelector('[data-product-price-without-tax], [data-product-price-with-tax], .price--withoutTax, .price--withTax, .price-final, .product-price'); + const priceText = priceEl ? (priceEl.textContent || '').trim() : ''; + + const imgEl = node.querySelector('img'); + const image = imgEl ? (imgEl.src || imgEl.getAttribute('data-src') || '') : ''; + + const skuEl = node.querySelector('.sku, .product-sku, .prod-sku'); + const skuText = skuEl ? (skuEl.textContent || '').trim() : (node.textContent || '').match(/SKU:\s*([A-Z0-9-]+)/i)?.[1] || ''; + const sku = skuText.replace(/^SKU:\s*/i, '').trim(); + + const descEl = node.querySelector('.description, .product-description, p'); + const description = descEl ? (descEl.textContent || '').trim() : ''; + + results.push({ + title: title || null, + url: href ? new URL(href, baseUrl).href : null, + priceRaw: priceText || null, + image: image || null, + sku: sku || null, + description: description || null, + }); + } + return results; + }, page.url()); + + + // Deduplicate by URL + const map = new Map(); + for (const p of products) { + if (!p.url) continue; + if (!map.has(p.url)) map.set(p.url, p); + } + console.log(`[BROCKS] Found ${map.size} unique product URLs from ${targetUrl}`); + + // Helper to fetch product page details + async function fetchProductDetails(productPage, url) { + try { + await productPage.goto(url, { waitUntil: 'networkidle', timeout: 30000 }); + return await productPage.evaluate(() => { + const clean = (value) => (value || '').replace(/\s+/g, ' ').trim(); + const toAbsolute = (value) => { + if (!value) return null; + try { + return new URL(value, document.location.href).href; + } catch (e) { + return value; + } + }; + const pick = (selectors) => { + for (const s of selectors) { + const el = document.querySelector(s); + if (el) return el; + } + return null; + }; + const unique = (values) => [...new Set(values.filter(Boolean))]; + + const textFrom = (el) => { + if (!el) return null; + return clean(el.textContent || el.getAttribute('content') || el.getAttribute('value') || ''); + }; + const metaContent = (selector) => { + const el = document.querySelector(selector); + return el ? el.getAttribute('content') : null; + }; + const extractSrcset = (value) => { + if (!value) return []; + return value.split(',') + .map((part) => part.trim().split(/\s+/)[0]) + .map(toAbsolute) + .filter(Boolean); + }; + + const priceEl = pick([ + '[data-product-price-without-tax]', + '[data-product-price-with-tax]', + '.productView-price .price--withoutTax', + '.productView-price .price--withTax', + '.productView-price [itemprop="price"]', + '[itemprop="price"]', + '.price--main', + '.price-final', + '.product-price' + ]); + const priceText = textFrom(priceEl); + const productId = document.querySelector('input[name="product_id"]')?.value || null; + const title = textFrom(pick(['.productView-title', 'h1', '[itemprop="name"]'])); + + const skuEl = pick(['[itemprop="sku"]', '.sku', '.productView-sku', '.product-sku', '.prod-sku']); + const sku = textFrom(skuEl); + + const brandEl = pick(['[itemprop="brand"] [itemprop="name"]', '.brand-value span', '.brand-value']); + const brand = textFrom(brandEl); + + const descEl = pick(['.productView-description', '.product-description', '#product-description', '.description', '.prod-desc']); + const description = descEl ? clean(descEl.textContent) : null; + + const imgEl = pick(['.productView-image img', '.product-hero img', '.productImage img', '.primary-image img', 'img']); + const image = imgEl ? toAbsolute(imgEl.getAttribute('src') || imgEl.getAttribute('data-src')) : null; + + const productInfo = {}; + document.querySelectorAll('.productView-info .pdp__info-wrapper > div').forEach((row) => { + const key = clean(row.querySelector('dt')?.textContent || '').replace(/:$/, ''); + const value = clean(row.querySelector('dd')?.textContent || ''); + if (key && value && !productInfo[key]) productInfo[key] = value; + }); + + const options = [...document.querySelectorAll('[data-product-option-change] .form-field')].map((field) => { + const label = clean(field.querySelector('.ob-option-name, .form-label')?.textContent || '').replace(/\s*\*$/, '').replace(/:$/, ''); + const select = field.querySelector('select'); + const inputs = [...field.querySelectorAll('input[type="radio"], input[type="checkbox"]')]; + return { + label: label || null, + required: Boolean(field.querySelector('[required]') || clean(field.textContent).includes('*')), + type: select ? 'select' : (inputs[0]?.type || field.getAttribute('data-product-attribute') || null), + name: select?.name || inputs[0]?.name || null, + values: select + ? [...select.options].map((option) => ({ + label: clean(option.textContent), + value: option.value || null, + selected: option.selected, + disabled: option.disabled, + })).filter((option) => option.label) + : inputs.map((input) => ({ + label: clean(field.querySelector(`label[for="${input.id}"]`)?.textContent || input.closest('label')?.textContent || input.value), + value: input.value || null, + checked: input.checked, + disabled: input.disabled, + })), + }; + }); + + const descriptionSections = [...document.querySelectorAll('.productView-description .tabs .tab-title')].map((tab) => { + const id = tab.getAttribute('href'); + const panel = id ? document.querySelector(id) : null; + return { + title: clean(tab.textContent) || null, + text: panel ? clean(panel.textContent) : null, + bullets: panel ? [...panel.querySelectorAll('li')].map((li) => clean(li.textContent)).filter(Boolean) : [], + paragraphs: panel ? [...panel.querySelectorAll('p')].map((p) => clean(p.textContent)).filter(Boolean) : [], + links: panel ? [...panel.querySelectorAll('a[href]')].map((link) => ({ + text: clean(link.textContent), + url: toAbsolute(link.getAttribute('href')), + })).filter((link) => link.text || link.url) : [], + images: panel ? [...panel.querySelectorAll('img')].map((img) => ({ + src: toAbsolute(img.getAttribute('src') || img.getAttribute('data-src')), + alt: img.getAttribute('alt') || null, + title: img.getAttribute('title') || null, + })).filter((item) => item.src) : [], + tables: panel ? [...panel.querySelectorAll('table')].map((table) => + [...table.querySelectorAll('tr')].map((row) => + [...row.children].map((cell) => clean(cell.textContent)) + ).filter((row) => row.length) + ) : [], + }; + }); + + const images = [...document.querySelectorAll('.productView-images img, .productCarousel img')] + .flatMap((img) => { + const variants = unique([ + toAbsolute(img.getAttribute('src')), + toAbsolute(img.getAttribute('data-src')), + ...extractSrcset(img.getAttribute('srcset')), + ...extractSrcset(img.getAttribute('data-srcset')), + ]); + if (!variants.length) return []; + return [{ + src: variants[0], + variants, + alt: img.getAttribute('alt') || null, + title: img.getAttribute('title') || null, + }]; + }); + + const reviews = [...document.querySelectorAll('.productReviews-list .productReview')].map((review) => ({ + rating: Number.parseFloat(review.querySelector('[itemprop="ratingValue"]')?.getAttribute('content') || '') || null, + title: clean(review.querySelector('.productReview-title, h5, [itemprop="name"]')?.textContent || ''), + author: clean(review.querySelector('[itemprop="author"], .productReview-author')?.textContent || ''), + date: review.querySelector('time, [itemprop="datePublished"]')?.getAttribute('datetime') || clean(review.querySelector('time, [itemprop="datePublished"]')?.textContent || ''), + body: clean(review.querySelector('[itemprop="reviewBody"], .productReview-body')?.textContent || review.textContent || ''), + })); + + return { + productId, + title, + sku, + mpn: metaContent('[itemprop="mpn"]'), + brand, + url: document.location.href, + canonicalUrl: document.querySelector('link[rel="canonical"]')?.href || null, + priceText, + currency: metaContent('meta[property="product:price:currency"]') || metaContent('[itemprop="priceCurrency"]'), + availability: clean(document.querySelector('.pdp__info--red')?.textContent || metaContent('meta[property="og:availability"]') || ''), + stock: { + display: productInfo['Quantity Available'] || null, + currentStock: textFrom(document.querySelector('[data-product-stock]')), + }, + brandUrl: document.querySelector('.brand-value a')?.href || null, + image, + images, + description, + descriptionSections, + productInfo, + options, + reviews: { + ratingValue: Number.parseFloat(metaContent('[itemprop="ratingValue"]') || '') || null, + ratingCount: Number.parseInt(metaContent('[itemprop="ratingCount"]') || '', 10) || null, + reviewCount: Number.parseInt(metaContent('[itemprop="reviewCount"]') || '', 10) || reviews.length, + items: reviews, + }, + pageMeta: { + title: document.title, + description: metaContent('meta[name="description"]'), + keywords: metaContent('meta[name="keywords"]'), + ogTitle: metaContent('meta[property="og:title"]'), + ogDescription: metaContent('meta[property="og:description"]'), + ogImage: metaContent('meta[property="og:image"]'), + }, + }; + }); + } catch (e) { + return { error: e.message, priceText: null, sku: null, description: null, image: null }; + } + } + + // For detail extraction we'll open a secondary page to navigate product pages + const productPage = await browser.newPage(); + const origin = new URL(targetUrl).origin; + + const final = []; + const productItems = Array.from(map.values()); + for (let index = 0; index < productItems.length; index += 1) { + const item = productItems[index]; + const out = { + recordType: 'product', + source: 'page-scrape', + title: item.title, + url: item.url, + price: toNumber(item.priceRaw), + priceRaw: item.priceRaw, + sku: item.sku, + image: item.image, + description: item.description, + }; + + // Only attempt to fetch details for same-origin product pages + try { + const u = new URL(item.url); + if (u.origin === origin) { + const details = await fetchProductDetails(productPage, item.url); + if ((!out.priceRaw || out.priceRaw.trim() === '' || out.price === null) && details.priceText) { + out.priceRaw = details.priceText; + out.price = toNumber(details.priceText); + } + if (details.title) out.title = details.title; + if (details.sku) out.sku = details.sku; + if (details.description) out.description = details.description; + if (details.image) out.image = details.image; + Object.assign(out, { + productId: details.productId || null, + mpn: details.mpn || null, + brand: details.brand || null, + brandUrl: details.brandUrl || null, + canonicalUrl: details.canonicalUrl || null, + currency: details.currency || null, + availability: details.availability || null, + stock: details.stock || {}, + productInfo: details.productInfo || {}, + options: details.options || [], + images: details.images || [], + descriptionSections: details.descriptionSections || [], + reviews: details.reviews || {}, + pageMeta: details.pageMeta || {}, + scrapeError: details.error || null, + }); + } + } catch (e) { + // ignore malformed URLs + } + + final.push(out); + if ((index + 1) % 5 === 0 || index === productItems.length - 1) { + console.log(`[DETAIL] ${index + 1}/${productItems.length} completed`); + if (onDetailProgress) { + onDetailProgress({ + done: index + 1, + total: productItems.length, + product: out.title || out.sku || out.url || null, + }); + } + } + } + + await productPage.close(); + + if (writeOutput) { + await fs.mkdir(path.dirname(outputFile), { recursive: true }); + await fs.writeFile(outputFile, `${JSON.stringify(final, null, 2)}\n`, 'utf8'); + console.log(`Saved ${final.length} products to ${outputFile}`); + } + + return final; + } catch (err) { + console.error('Scrape failed:', err); + throw err; + } finally { + await browser.close(); + } +} + +if (require.main === module) { + scrape().catch(() => { + process.exitCode = 1; + }); +} + +module.exports = { + TARGET_URL, + cleanText, + scrape, + scrapeBrocksPerformanceProducts(targetUrl = TARGET_URL, options = {}) { + return scrape(targetUrl, { ...options, writeOutput: false }); + }, + toNumber, +}; diff --git a/src/business-logic/import-pipeline/sources/brocks-performance/scraper.js b/src/business-logic/import-pipeline/sources/brocks-performance/scraper.js new file mode 100644 index 0000000..0ac7850 --- /dev/null +++ b/src/business-logic/import-pipeline/sources/brocks-performance/scraper.js @@ -0,0 +1 @@ +module.exports = require("./scrape_brocks_products"); diff --git a/src/business-logic/import-pipeline/sources/index.js b/src/business-logic/import-pipeline/sources/index.js new file mode 100644 index 0000000..691b0d1 --- /dev/null +++ b/src/business-logic/import-pipeline/sources/index.js @@ -0,0 +1,33 @@ +const kyt = require("./kyt"); +const brocksPerformance = require("./brocks-performance"); + +const sources = { + [kyt.sourceKey]: kyt, + [brocksPerformance.sourceKey]: brocksPerformance, +}; + +function normalizeSourceKey(sourceKey) { + return String(sourceKey || "kyt").trim().toLowerCase(); +} + +function getSource(sourceKey = "kyt") { + const key = normalizeSourceKey(sourceKey); + const source = sources[key]; + if (!source) { + const available = Object.keys(sources).join(", "); + throw new Error(`Unknown import source "${sourceKey}". Available sources: ${available}`); + } + return source; +} + +function listSources() { + return Object.values(sources).map((source) => ({ + sourceKey: source.sourceKey, + label: source.label, + })); +} + +module.exports = { + getSource, + listSources, +}; diff --git a/src/business-logic/kyt-pipeline/05_kyt_to_shopify_converter.js b/src/business-logic/import-pipeline/sources/kyt/converter.js similarity index 100% rename from src/business-logic/kyt-pipeline/05_kyt_to_shopify_converter.js rename to src/business-logic/import-pipeline/sources/kyt/converter.js diff --git a/src/business-logic/import-pipeline/sources/kyt/index.js b/src/business-logic/import-pipeline/sources/kyt/index.js new file mode 100644 index 0000000..555fcc3 --- /dev/null +++ b/src/business-logic/import-pipeline/sources/kyt/index.js @@ -0,0 +1,48 @@ +const path = require("node:path"); +const { getKytIndiaWebsiteData } = require("./scraper"); +const { convertKytJsonToShopifyProducts } = require("./converter"); + +const sourceKey = "kyt"; +const label = "KYT India"; +const dataDir = path.join("data", "sources", sourceKey); + +function paths() { + return { + aggregatedJson: path.join(dataDir, "01_products_aggregated.json"), + historyJson: path.join(dataDir, "01_products_run_history.json"), + aggregatedXlsx: path.join(dataDir, "01_products_aggregated.xlsx"), + downloadedImagesDir: path.join(dataDir, "02_downloaded_product_images"), + watermarkState: path.join(dataDir, "03_watermark_state.json"), + imageUploadState: path.join(dataDir, "04_shopify_image_upload_state.json"), + uploadedMapJson: path.join(dataDir, "04_shopify_uploaded_images_map.json"), + shopifyReadyJson: path.join(dataDir, "05_shopify_products_ready.json"), + logsDir: path.join(dataDir, "99_run_logs"), + }; +} + +async function fetchWebsiteData({ paths: runPaths }) { + return getKytIndiaWebsiteData({ + outputJson: runPaths.aggregatedJson, + outputHistoryJson: runPaths.historyJson, + outputXlsx: runPaths.aggregatedXlsx, + }); +} + +function convertToShopifyProducts(input, options = {}) { + return convertKytJsonToShopifyProducts(input, { + imageBaseUrl: options.imageBaseUrl || "", + brand: options.brand || "KYT", + uploadedImageMap: options.uploadedImageMap, + }); +} + +module.exports = { + sourceKey, + label, + defaultBrand: "KYT", + defaultImageBaseUrl: "https://kytindia.com/server/public/uploads", + envImageBaseUrl: "KYT_IMAGE_BASE_URL", + paths, + fetchWebsiteData, + convertToShopifyProducts, +}; diff --git a/src/business-logic/kyt-pipeline/01_get_kytindia_website_data.js b/src/business-logic/import-pipeline/sources/kyt/scraper.js similarity index 96% rename from src/business-logic/kyt-pipeline/01_get_kytindia_website_data.js rename to src/business-logic/import-pipeline/sources/kyt/scraper.js index 1ca784d..2aca0be 100644 --- a/src/business-logic/kyt-pipeline/01_get_kytindia_website_data.js +++ b/src/business-logic/import-pipeline/sources/kyt/scraper.js @@ -253,14 +253,18 @@ function writeExcel(filePath, rows, analysis) { XLSX.writeFile(wb, filePath); } -async function getKytIndiaWebsiteData() { +async function getKytIndiaWebsiteData(options = {}) { try { - const outputJsonPath = path.resolve(process.cwd(), OUT_JSON); + const outJson = options.outputJson || OUT_JSON; + const outHistoryJson = options.outputHistoryJson || OUT_HISTORY_JSON; + const outXlsx = options.outputXlsx || OUT_XLSX; + + const outputJsonPath = path.resolve(process.cwd(), outJson); const outputDir = path.dirname(outputJsonPath); await fs.mkdir(outputDir, { recursive: true }); const existingAggregated = await readExistingAggregated(outputJsonPath); console.log( - `[CACHE] Loaded ${existingAggregated.products.length} existing products from ${OUT_JSON}` + + `[CACHE] Loaded ${existingAggregated.products.length} existing products from ${outJson}` + (existingAggregated.generatedAt ? ` (generatedAt: ${existingAggregated.generatedAt})` : "") ); @@ -438,14 +442,16 @@ async function getKytIndiaWebsiteData() { console.log("[4/5] Writing JSON outputs..."); await fs.writeFile(outputJsonPath, JSON.stringify(finalPayload, null, 2), "utf8"); - const historyPath = path.resolve(process.cwd(), OUT_HISTORY_JSON); + const historyPath = path.resolve(process.cwd(), outHistoryJson); + await fs.mkdir(path.dirname(historyPath), { recursive: true }); const history = await readHistory(historyPath); history.push(analysis); await fs.writeFile(historyPath, JSON.stringify(history, null, 2), "utf8"); console.log("[5/5] Writing Excel output..."); const excelRows = buildExcelRows(detailedProducts); - const excelPath = path.resolve(process.cwd(), OUT_XLSX); + const excelPath = path.resolve(process.cwd(), outXlsx); + await fs.mkdir(path.dirname(excelPath), { recursive: true }); writeExcel(excelPath, excelRows, analysis); console.log("\n=== FINAL ANALYSIS ==="); diff --git a/src/business-logic/kyt-pipeline/00_index.js b/src/business-logic/kyt-pipeline/00_index.js deleted file mode 100644 index 0571755..0000000 --- a/src/business-logic/kyt-pipeline/00_index.js +++ /dev/null @@ -1,540 +0,0 @@ -const { getKytIndiaWebsiteData } = require("./01_get_kytindia_website_data"); -const { downloadProductImagesFromAggregatedJson } = require("./02_download_product_images"); -const { applyWatermarkToDownloadedImages } = require("./03_watermark_downloaded_images"); -const fs = require("node:fs/promises"); -const path = require("node:path"); -const fsSync = require("node:fs"); -const { convertKytJsonToShopifyProducts } = require("./05_kyt_to_shopify_converter"); -const { upsertShopifyProductFull } = require("./06_shopify_product_upsert"); -const { uploadKytWatermarkedImagesToShopifyFiles } = require("./04_shopify_image_file_uploader"); - -const DEFAULT_AGGREGATED_JSON = "data/01_products_aggregated.json"; -const DEFAULT_SHOPIFY_READY_JSON = "data/05_shopify_products_ready.json"; -const DEFAULT_UPLOADED_MAP_JSON = "data/04_shopify_uploaded_images_map.json"; -const DEFAULT_LOGS_DIR = "data/99_run_logs"; - -function nowIsoLocal() { - const d = new Date(); - return d.toISOString(); -} - -function initRunLogger(logsDir = DEFAULT_LOGS_DIR) { - const absLogsDir = path.resolve(process.cwd(), logsDir); - fsSync.mkdirSync(absLogsDir, { recursive: true }); - - const stamp = new Date().toISOString().replace(/[:.]/g, "-"); - const logPath = path.join(absLogsDir, `${stamp}.log`); - const stream = fsSync.createWriteStream(logPath, { flags: "a" }); - - const original = { - log: console.log.bind(console), - info: console.info.bind(console), - warn: console.warn.bind(console), - error: console.error.bind(console) - }; - - function writeLine(level, args) { - const text = args.map((a) => { - if (typeof a === "string") return a; - try { - return JSON.stringify(a); - } catch { - return String(a); - } - }).join(" "); - stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`); - } - - console.log = (...args) => { - writeLine("LOG", args); - original.log(...args); - }; - console.info = (...args) => { - writeLine("INFO", args); - original.info(...args); - }; - console.warn = (...args) => { - writeLine("WARN", args); - original.warn(...args); - }; - console.error = (...args) => { - writeLine("ERROR", args); - original.error(...args); - }; - - console.log(`[RUN-LOG] Writing logs to ${logPath}`); - return { logPath }; -} - -function loadDotEnvFile(filePath = ".env") { - const abs = path.resolve(process.cwd(), filePath); - if (!fsSync.existsSync(abs)) return; - - const raw = fsSync.readFileSync(abs, "utf8"); - const lines = raw.split(/\r?\n/); - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed || trimmed.startsWith("#")) continue; - - const eq = trimmed.indexOf("="); - if (eq <= 0) continue; - - const key = trimmed.slice(0, eq).trim(); - const value = trimmed.slice(eq + 1).trim(); - if (key && process.env[key] == null) { - process.env[key] = value; - } - } -} - -loadDotEnvFile(".env"); -const RUN_LOGGER = initRunLogger(DEFAULT_LOGS_DIR); - -function parseCliArgs(argv = process.argv.slice(2)) { - const out = { - limit: null - }; - - for (let i = 0; i < argv.length; i += 1) { - const a = argv[i]; - if ((a === "--limit" || a === "-n") && argv[i + 1]) { - const n = Number.parseInt(argv[i + 1], 10); - if (Number.isFinite(n) && n > 0) out.limit = n; - } - } - - return out; -} - -async function buildLimitedAggregatedJson(inputPath, limit) { - const absInputPath = path.resolve(process.cwd(), inputPath); - const raw = await fs.readFile(absInputPath, "utf8"); - const parsed = JSON.parse(raw); - const products = Array.isArray(parsed?.products) ? parsed.products : []; - const limited = products.slice(0, limit); - - const outPath = path.resolve(process.cwd(), `data/01_products_aggregated.limit_${limit}.json`); - const payload = { - ...parsed, - generatedAt: new Date().toISOString(), - products: limited, - analysis: { - ...(parsed.analysis || {}), - totalProductsUnique: limited.length, - limitedFrom: products.length, - limitApplied: limit - } - }; - - await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8"); - return { - inputPath: absInputPath, - outputPath: outPath, - totalBefore: products.length, - totalAfter: limited.length - }; -} - -function readBooleanEnv(name, fallback = false) { - const val = process.env[name]; - if (val == null) return fallback; - return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase()); -} - -function logStageSummary(stepKey, summary = {}) { - const parts = Object.entries(summary) - .filter(([, value]) => value !== undefined && value !== null && value !== "") - .map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`); - - console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`); -} - -async function convertAggregatedToShopifyReady({ - inputPath = DEFAULT_AGGREGATED_JSON, - outputPath = DEFAULT_SHOPIFY_READY_JSON, - uploadedImageMapPath = DEFAULT_UPLOADED_MAP_JSON, - imageBaseUrl = process.env.KYT_IMAGE_BASE_URL || "", - brand = process.env.SHOPIFY_BRAND || "KYT" -} = {}) { - const absInputPath = path.resolve(process.cwd(), inputPath); - const absOutputPath = path.resolve(process.cwd(), outputPath); - const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath); - - const raw = await fs.readFile(absInputPath, "utf8"); - const parsed = JSON.parse(raw); - let uploadedImageMap = null; - try { - const mapRaw = await fs.readFile(absUploadedMapPath, "utf8"); - uploadedImageMap = JSON.parse(mapRaw); - } catch { - uploadedImageMap = null; - } - - const convertedProducts = convertKytJsonToShopifyProducts(parsed, { - imageBaseUrl, - brand, - uploadedImageMap - }); - - const payload = { - generatedAt: new Date().toISOString(), - sourceFile: absInputPath, - totalProducts: convertedProducts.length, - products: convertedProducts - }; - - await fs.mkdir(path.dirname(absOutputPath), { recursive: true }); - await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8"); - - return { - inputPath: absInputPath, - outputPath: absOutputPath, - uploadedImageMapPath: absUploadedMapPath, - totalProducts: convertedProducts.length - }; -} - -async function upsertShopifyProductsFromConverted({ - convertedPath = DEFAULT_SHOPIFY_READY_JSON, - shop = process.env.SHOPIFY_SHOP, - accessToken = process.env.SHOPIFY_ACCESS_TOKEN, - locationId = process.env.SHOPIFY_LOCATION_ID || null, - enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false), - apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10" -} = {}) { - if (!shop) { - throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage."); - } - if (!accessToken) { - throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage."); - } - - const absConvertedPath = path.resolve(process.cwd(), convertedPath); - const raw = await fs.readFile(absConvertedPath, "utf8"); - const parsed = JSON.parse(raw); - const products = Array.isArray(parsed?.products) ? parsed.products : []; - - const startedAtMs = Date.now(); - const metrics = { - sourcePath: absConvertedPath, - total: products.length, - processed: 0, - created: 0, - updated: 0, - failed: 0, - errors: [], - startedAt: new Date(startedAtMs).toISOString(), - finishedAt: null, - durationSeconds: 0, - successRate: 0 - }; - - for (let i = 0; i < products.length; i += 1) { - const product = products[i]; - const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`; - - try { - const result = await upsertShopifyProductFull({ - shop, - accessToken, - product, - locationId, - enableSeo, - apiVersion - }); - - metrics.processed += 1; - if (result.action === "created") metrics.created += 1; - if (result.action === "updated") metrics.updated += 1; - - if ((i + 1) % 10 === 0 || i === products.length - 1) { - console.log( - `[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}` - ); - } - } catch (error) { - metrics.processed += 1; - metrics.failed += 1; - metrics.errors.push({ - index: i + 1, - product: label, - error: error.message - }); - console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`); - } - } - - const endedAtMs = Date.now(); - metrics.finishedAt = new Date(endedAtMs).toISOString(); - metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2)); - metrics.successRate = metrics.total > 0 - ? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2)) - : 0; - - return metrics; -} - -async function runFullKytPipeline(options = {}) { - const cli = parseCliArgs(); - const onProgress = typeof options.onProgress === "function" ? options.onProgress : null; - const emitProgress = (stepIndex, stepKey, message) => { - if (onProgress) { - onProgress({ - stepIndex, - totalSteps: 6, - stepKey, - message - }); - } - }; - - emitProgress(1, "fetchWebsiteData", "Fetching KYT website data"); - console.log("[PIPELINE 1/6] Fetching KYT website data..."); - const dataSummary = await getKytIndiaWebsiteData(); - - let aggregatedPathForRun = DEFAULT_AGGREGATED_JSON; - let imagesDirForRun = "data/02_downloaded_product_images"; - let limitSummary = null; - - if (cli.limit) { - limitSummary = await buildLimitedAggregatedJson(DEFAULT_AGGREGATED_JSON, cli.limit); - aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/"); - imagesDirForRun = `data/02_downloaded_product_images.limit_${cli.limit}`; - console.log( - `[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}` - ); - } - - emitProgress(2, "downloadImages", "Downloading product images"); - console.log("\n[PIPELINE 2/6] Downloading product images..."); - const downloadSummary = await downloadProductImagesFromAggregatedJson({ - jsonPath: aggregatedPathForRun, - outputDir: imagesDirForRun, - concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8) - }); - logStageSummary("downloadImages", { - total: downloadSummary?.totalImagesFound ?? 0, - downloaded: downloadSummary?.downloaded ?? 0, - skipped: downloadSummary?.skipped ?? 0, - failed: downloadSummary?.failed ?? 0, - products: downloadSummary?.productsCount ?? 0 - }); - - emitProgress(3, "watermarkImages", "Applying watermark to downloaded images"); - console.log("\n[PIPELINE 3/6] Applying watermark in-place..."); - const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png"; - let watermarkSummary; - const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun); - if (!fsSync.existsSync(absWatermarkPath)) { - watermarkSummary = { - skipped: true, - reason: `Watermark file not found: ${absWatermarkPath}`, - imagesDir: path.resolve(process.cwd(), imagesDirForRun), - watermarkPath: absWatermarkPath, - totalImagesFound: 0, - processed: 0, - skippedCount: 0, - failed: 0 - }; - console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`); - } else { - const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false); - try { - watermarkSummary = await applyWatermarkToDownloadedImages({ - imagesDir: imagesDirForRun, - watermarkPath: watermarkPathForRun, - concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4) - }); - } catch (error) { - if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) { - watermarkSummary = { - skipped: true, - reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`, - imagesDir: path.resolve(process.cwd(), imagesDirForRun), - watermarkPath: absWatermarkPath, - totalImagesFound: 0, - processed: 0, - skippedCount: 0, - failed: 0 - }; - console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`); - } else { - throw error; - } - } - } - logStageSummary("watermarkImages", { - total: watermarkSummary?.totalImagesFound ?? 0, - processed: watermarkSummary?.processed ?? 0, - skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0, - failed: watermarkSummary?.failed ?? 0, - concurrency: watermarkSummary?.concurrency ?? "" - }); - - emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files"); - console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files..."); - const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true); - const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false); - let imageUploadSummary; - - if (!imageUploadEnabled) { - imageUploadSummary = { - skipped: true, - reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false", - totalTasks: 0, - processed: 0, - uploaded: 0, - failed: 0 - }; - console.log("[PIPELINE 4/6] Image upload stage skipped by config."); - } else { - try { - imageUploadSummary = await uploadKytWatermarkedImagesToShopifyFiles({ - shop: process.env.SHOPIFY_SHOP, - accessToken: process.env.SHOPIFY_ACCESS_TOKEN, - apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10", - aggregatedJsonPath: aggregatedPathForRun, - imagesDir: imagesDirForRun, - statePath: "data/04_shopify_image_upload_state.json", - mapPath: DEFAULT_UPLOADED_MAP_JSON, - concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3) - }); - } catch (error) { - const message = String(error?.message || "Unknown image upload error"); - imageUploadSummary = { - skipped: true, - reason: message, - totalTasks: 0, - processed: 0, - uploaded: 0, - failed: 0 - }; - - console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`); - if (imageUploadRequired) { - throw error; - } - console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false)."); - } - } - logStageSummary("uploadImagesToShopifyFiles", { - total: imageUploadSummary?.totalTasks ?? 0, - processed: imageUploadSummary?.processed ?? 0, - uploaded: imageUploadSummary?.uploaded ?? 0, - skipped: imageUploadSummary?.skipped ?? 0, - failed: imageUploadSummary?.failed ?? 0, - concurrency: imageUploadSummary?.concurrency ?? "" - }); - - emitProgress(5, "convertToShopifyReady", "Converting KYT data to Shopify-ready products"); - console.log("\n[PIPELINE 5/6] Converting KYT data to Shopify-ready products..."); - const conversionSummary = await convertAggregatedToShopifyReady({ - inputPath: aggregatedPathForRun, - outputPath: DEFAULT_SHOPIFY_READY_JSON, - uploadedImageMapPath: DEFAULT_UPLOADED_MAP_JSON, - imageBaseUrl: process.env.KYT_IMAGE_BASE_URL || "", - brand: process.env.SHOPIFY_BRAND || "KYT" - }); - logStageSummary("convertToShopifyReady", { - total: conversionSummary?.totalProducts ?? 0 - }); - - emitProgress(6, "upsertToShopify", "Upserting products to Shopify"); - console.log("\n[PIPELINE 6/6] Upserting products to Shopify..."); - const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({ - convertedPath: DEFAULT_SHOPIFY_READY_JSON, - shop: process.env.SHOPIFY_SHOP, - accessToken: process.env.SHOPIFY_ACCESS_TOKEN, - locationId: process.env.SHOPIFY_LOCATION_ID || null, - enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false), - apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10" - }); - logStageSummary("upsertToShopify", { - total: shopifyUpsertSummary?.total ?? 0, - processed: shopifyUpsertSummary?.processed ?? 0, - created: shopifyUpsertSummary?.created ?? 0, - updated: shopifyUpsertSummary?.updated ?? 0, - failed: shopifyUpsertSummary?.failed ?? 0, - successRate: shopifyUpsertSummary?.successRate ?? 0 - }); - - const summary = { - completedAt: new Date().toISOString(), - runLogPath: RUN_LOGGER.logPath, - limit: cli.limit || null, - limitSummary, - steps: { - fetchWebsiteData: dataSummary, - downloadImages: downloadSummary, - watermarkImages: watermarkSummary, - uploadImagesToShopifyFiles: imageUploadSummary, - convertToShopifyReady: conversionSummary, - upsertToShopify: shopifyUpsertSummary - }, - upcomingSteps: [] - }; - - emitProgress(6, "completed", "KYT pipeline completed"); - console.log("\n=== FULL PIPELINE SUMMARY ==="); - console.log(JSON.stringify(summary, null, 2)); - console.log("\n=== SUMMARY TABLE ==="); - console.table([ - { - step: "fetchWebsiteData", - total: dataSummary?.analysis?.totalProductsUnique ?? "", - processed: dataSummary?.analysis?.detailFetchedNow ?? "", - skipped: dataSummary?.analysis?.cachedDetailReused ?? "", - failed: dataSummary?.analysis?.detailFailed ?? "" - }, - { - step: "downloadImages", - total: downloadSummary?.totalImagesFound ?? "", - processed: downloadSummary?.downloaded ?? "", - skipped: downloadSummary?.skipped ?? "", - failed: downloadSummary?.failed ?? "" - }, - { - step: "watermarkImages", - total: watermarkSummary?.totalImagesFound ?? "", - processed: watermarkSummary?.processed ?? "", - skipped: watermarkSummary?.skipped ?? "", - failed: watermarkSummary?.failed ?? "" - }, - { - step: "uploadImagesToShopifyFiles", - total: imageUploadSummary?.totalTasks ?? "", - processed: imageUploadSummary?.processed ?? "", - skipped: imageUploadSummary?.skipped ?? "", - failed: imageUploadSummary?.failed ?? "" - }, - { - step: "convertToShopifyReady", - total: conversionSummary?.totalProducts ?? "", - processed: conversionSummary?.totalProducts ?? "", - skipped: "", - failed: "" - }, - { - step: "upsertToShopify", - total: shopifyUpsertSummary?.total ?? "", - processed: shopifyUpsertSummary?.processed ?? "", - skipped: "", - failed: shopifyUpsertSummary?.failed ?? "" - } - ]); - - return summary; -} - -module.exports = { - runFullKytPipeline, - convertAggregatedToShopifyReady, - upsertShopifyProductsFromConverted -}; - -if (require.main === module) { - runFullKytPipeline().catch((error) => { - console.error("Full pipeline failed:", error.message); - process.exitCode = 1; - }); -} diff --git a/src/pipelineJobs.js b/src/pipelineJobs.js index a6e6bb8..2b1a1fb 100644 --- a/src/pipelineJobs.js +++ b/src/pipelineJobs.js @@ -10,17 +10,26 @@ function getJob(jobId) { return jobs[jobId] || null; } -function canStartJob(shop) { +function getJobIdForPayload(payload = {}) { + const shop = String(payload.shop || "").trim(); + const source = String(payload.source || "kyt").trim().toLowerCase(); + if (!shop) { + return ""; + } + return source === "kyt" ? shop : `${shop}::${source}`; +} + +function canStartJob(shop, source = "kyt") { if (!shop) { return false; } - const existing = jobs[shop]; + const existing = jobs[getJobIdForPayload({ shop, source })]; return !existing || existing.status === "done" || existing.status === "error"; } function createJob(payload = {}) { - const id = String(payload.shop || "").trim(); + const id = getJobIdForPayload(payload); if (!id) { throw new Error("Shop is required to create a job."); } @@ -94,4 +103,5 @@ module.exports = { createJob, updateJob, appendJobLog, + getJobIdForPayload, }; diff --git a/src/runKytPipelineJob.js b/src/runKytPipelineJob.js index bc34969..ff6b460 100644 --- a/src/runKytPipelineJob.js +++ b/src/runKytPipelineJob.js @@ -1,7 +1,6 @@ -const path = require("node:path"); const { getToken } = require("../tokenStore"); const { log } = require("../logger"); -const { runFullKytPipeline } = require("./business-logic/kyt-pipeline/00_index"); +const { runFullSourcePipeline } = require("./business-logic/import-pipeline/runSourcePipeline"); const { updateJob, appendJobLog } = require("./pipelineJobs"); function stringifyLogArgs(args) { @@ -189,7 +188,7 @@ function deriveLiveStats(job, line) { } async function runKytPipelineJob(job) { - const { shop, limit } = job.payload || {}; + const { shop, limit, source = "kyt" } = job.payload || {}; const tokenRecord = getToken(shop); if (!tokenRecord) { @@ -223,7 +222,7 @@ async function runKytPipelineJob(job) { updateJob(job.id, { status: "running", step: "starting", - detail: `Starting KYT pipeline for ${shop}`, + detail: `Starting ${source} import pipeline for ${shop}`, }); const originalConsole = { @@ -247,7 +246,10 @@ async function runKytPipelineJob(job) { console.error = capture("error"); try { - const summary = await runFullKytPipeline({ + const summary = await runFullSourcePipeline({ + sourceKey: source, + limit, + argv: [], onProgress(progress) { updateJob(job.id, { status: "running", @@ -267,7 +269,7 @@ async function runKytPipelineJob(job) { detail: "Pipeline completed successfully", summary, }); - log(shop, `KYT pipeline completed for job ${job.id}`); + log(shop, `${source} import pipeline completed for job ${job.id}`); } finally { console.log = originalConsole.log; console.info = originalConsole.info; @@ -280,7 +282,7 @@ async function runKytPipelineJob(job) { error: error.message, detail: `Pipeline failed: ${error.message}`, }); - log(shop, `KYT pipeline failed for job ${job.id}: ${error.message}`); + log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`); } finally { process.argv = originalArgv; process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP;