Add multi-source import pipeline

This commit is contained in:
MOHAN 2026-05-14 23:57:27 +05:30
parent bef07eff10
commit 68949f124e
28 changed files with 2025 additions and 647 deletions

9
.gitignore vendored
View File

@ -1,4 +1,3 @@
@"
node_modules/
.env
.env.*
@ -6,6 +5,13 @@ dist/
build/
.next/
coverage/
# Runtime/generated data
data/
logs/
*.log
src/business-logic/import-pipeline/sources/outputs/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
@ -13,4 +19,3 @@ yarn-error.log*
Thumbs.db
.vscode/
.idea/
"@ | Out-File -Encoding utf8 .gitignore

45
package-lock.json generated
View File

@ -12,6 +12,7 @@
"cors": "^2.8.5",
"dotenv": "^17.2.0",
"express": "^5.1.0",
"playwright": "^1.59.1",
"sharp": "^0.33.5",
"uuid": "^11.1.0",
"xlsx": "^0.18.5"
@ -926,6 +927,20 @@
"node": ">= 0.8"
}
},
"node_modules/fsevents": {
"version": "2.3.2",
"resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.2.tgz",
"integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==",
"hasInstallScript": true,
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": "^8.16.0 || ^10.6.0 || >=11.0.0"
}
},
"node_modules/function-bind": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.2.tgz",
@ -1217,6 +1232,36 @@
"url": "https://opencollective.com/express"
}
},
"node_modules/playwright": {
"version": "1.59.1",
"resolved": "https://registry.npmjs.org/playwright/-/playwright-1.59.1.tgz",
"integrity": "sha512-C8oWjPR3F81yljW9o5OxcWzfh6avkVwDD2VYdwIGqTkl+OGFISgypqzfu7dOe4QNLL2aqcWBmI3PMtLIK233lw==",
"license": "Apache-2.0",
"dependencies": {
"playwright-core": "1.59.1"
},
"bin": {
"playwright": "cli.js"
},
"engines": {
"node": ">=18"
},
"optionalDependencies": {
"fsevents": "2.3.2"
}
},
"node_modules/playwright-core": {
"version": "1.59.1",
"resolved": "https://registry.npmjs.org/playwright-core/-/playwright-core-1.59.1.tgz",
"integrity": "sha512-HBV/RJg81z5BiiZ9yPzIiClYV/QMsDCKUyogwH9p3MCP6IYjUFu/MActgYAvK0oWyV9NlwM3GLBjADyWgydVyg==",
"license": "Apache-2.0",
"bin": {
"playwright-core": "cli.js"
},
"engines": {
"node": ">=18"
}
},
"node_modules/proxy-addr": {
"version": "2.0.7",
"resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz",

View File

@ -12,6 +12,7 @@
"cors": "^2.8.5",
"dotenv": "^17.2.0",
"express": "^5.1.0",
"playwright": "^1.59.1",
"sharp": "^0.33.5",
"uuid": "^11.1.0",
"xlsx": "^0.18.5"

View File

@ -2,29 +2,37 @@ const express = require("express");
const { createJob, updateJob, getJob, listJobs, canStartJob } = require("../src/pipelineJobs");
const { runKytPipelineJob } = require("../src/runKytPipelineJob");
const { getToken } = require("../tokenStore");
const { getSource, listSources } = require("../src/business-logic/import-pipeline/sources");
const router = express.Router();
router.post("/run", async (req, res) => {
const { shop, limit = null } = req.body || {};
const { shop, limit = null, source = "kyt" } = req.body || {};
if (!shop) {
return res.status(400).json({ error: "Missing shop." });
}
let sourceConfig;
try {
sourceConfig = getSource(source);
} catch (error) {
return res.status(400).json({ error: error.message });
}
if (!getToken(shop)) {
return res.status(400).json({ error: "No stored Shopify token found for this shop. Complete app auth first." });
}
if (!canStartJob(shop)) {
return res.status(409).json({ error: `A KYT pipeline job is already running for ${shop}.` });
if (!canStartJob(shop, sourceConfig.sourceKey)) {
return res.status(409).json({ error: `An import job is already running for ${shop} and source ${sourceConfig.sourceKey}.` });
}
const job = createJob({ shop, limit });
const job = createJob({ shop, limit, source: sourceConfig.sourceKey });
updateJob(job.id, {
status: "queued",
step: "queued",
detail: "Job queued",
detail: `${sourceConfig.label} import job queued`,
});
setImmediate(() => {
@ -35,10 +43,15 @@ router.post("/run", async (req, res) => {
jobId: job.id,
status: "queued",
shop,
source: sourceConfig.sourceKey,
limit,
});
});
router.get("/sources", (req, res) => {
return res.json({ sources: listSources() });
});
router.get("/status/:jobId", (req, res) => {
const job = getJob(req.params.jobId);
if (!job) {

View File

@ -0,0 +1,26 @@
const {
runFullSourcePipeline,
convertAggregatedToShopifyReady,
upsertShopifyProductsFromConverted,
} = require("../import-pipeline/runSourcePipeline");
function runFullKytPipeline(options = {}) {
return runFullSourcePipeline({
...options,
sourceKey: "kyt",
});
}
module.exports = {
runFullKytPipeline,
runFullSourcePipeline,
convertAggregatedToShopifyReady,
upsertShopifyProductsFromConverted,
};
if (require.main === module) {
runFullKytPipeline().catch((error) => {
console.error("Full pipeline failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1,7 @@
module.exports = require("../import-pipeline/sources/kyt/scraper");
if (require.main === module) {
module.exports.getKytIndiaWebsiteData().catch(() => {
process.exitCode = 1;
});
}

View File

@ -0,0 +1,13 @@
module.exports = require("../import-pipeline/shared/downloadProductImages");
if (require.main === module) {
module.exports.downloadProductImagesFromAggregatedJson()
.then((summary) => {
console.log("\nDownload summary:");
console.log(JSON.stringify(summary, null, 2));
})
.catch((error) => {
console.error("Image download failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1,13 @@
module.exports = require("../import-pipeline/shared/watermarkImages");
if (require.main === module) {
module.exports.applyWatermarkToDownloadedImages()
.then((summary) => {
console.log("\nWatermark summary:");
console.log(JSON.stringify(summary, null, 2));
})
.catch((error) => {
console.error("Watermark run failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1,8 @@
module.exports = require("../import-pipeline/shared/shopifyImageUploader");
if (require.main === module) {
module.exports.runStandaloneImageUpload().catch((error) => {
console.error("Image upload pipeline failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1 @@
module.exports = require("../import-pipeline/sources/kyt/converter");

View File

@ -0,0 +1,8 @@
module.exports = require("../import-pipeline/shared/shopifyProductUpsert");
if (require.main === module) {
module.exports.runStandaloneSelfTest().catch((err) => {
console.error("Self-test failed:", err.message);
process.exit(1);
});
}

View File

@ -0,0 +1,533 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const fsSync = require("node:fs");
const { downloadProductImagesFromAggregatedJson } = require("./shared/downloadProductImages");
const { applyWatermarkToDownloadedImages } = require("./shared/watermarkImages");
const { uploadKytWatermarkedImagesToShopifyFiles } = require("./shared/shopifyImageUploader");
const { upsertShopifyProductFull } = require("./shared/shopifyProductUpsert");
const { getSource } = require("./sources");
function nowIsoLocal() {
return new Date().toISOString();
}
function initRunLogger(logsDir) {
const absLogsDir = path.resolve(process.cwd(), logsDir);
fsSync.mkdirSync(absLogsDir, { recursive: true });
const stamp = new Date().toISOString().replace(/[:.]/g, "-");
const logPath = path.join(absLogsDir, `${stamp}.log`);
const stream = fsSync.createWriteStream(logPath, { flags: "a" });
const original = {
log: console.log.bind(console),
info: console.info.bind(console),
warn: console.warn.bind(console),
error: console.error.bind(console),
};
function writeLine(level, args) {
const text = args.map((a) => {
if (typeof a === "string") return a;
try {
return JSON.stringify(a);
} catch {
return String(a);
}
}).join(" ");
stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`);
}
console.log = (...args) => {
writeLine("LOG", args);
original.log(...args);
};
console.info = (...args) => {
writeLine("INFO", args);
original.info(...args);
};
console.warn = (...args) => {
writeLine("WARN", args);
original.warn(...args);
};
console.error = (...args) => {
writeLine("ERROR", args);
original.error(...args);
};
console.log(`[RUN-LOG] Writing logs to ${logPath}`);
return { logPath, stream, original };
}
function restoreRunLogger(logger) {
if (!logger) return;
console.log = logger.original.log;
console.info = logger.original.info;
console.warn = logger.original.warn;
console.error = logger.original.error;
logger.stream.end();
}
function loadDotEnvFile(filePath = ".env") {
const abs = path.resolve(process.cwd(), filePath);
if (!fsSync.existsSync(abs)) return;
const raw = fsSync.readFileSync(abs, "utf8");
const lines = raw.split(/\r?\n/);
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eq = trimmed.indexOf("=");
if (eq <= 0) continue;
const key = trimmed.slice(0, eq).trim();
const value = trimmed.slice(eq + 1).trim();
if (key && process.env[key] == null) {
process.env[key] = value;
}
}
}
function parseCliArgs(argv = process.argv.slice(2)) {
const out = {
limit: null,
source: null,
};
for (let i = 0; i < argv.length; i += 1) {
const a = argv[i];
if ((a === "--limit" || a === "-n") && argv[i + 1]) {
const n = Number.parseInt(argv[i + 1], 10);
if (Number.isFinite(n) && n > 0) out.limit = n;
}
if ((a === "--source" || a === "-s") && argv[i + 1]) {
out.source = argv[i + 1];
}
}
return out;
}
async function buildLimitedAggregatedJson(inputPath, limit, outputDir) {
const absInputPath = path.resolve(process.cwd(), inputPath);
const raw = await fs.readFile(absInputPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
const limited = products.slice(0, limit);
const outPath = path.resolve(process.cwd(), outputDir, `01_products_aggregated.limit_${limit}.json`);
const payload = {
...parsed,
generatedAt: new Date().toISOString(),
products: limited,
analysis: {
...(parsed.analysis || {}),
totalProductsUnique: limited.length,
limitedFrom: products.length,
limitApplied: limit,
},
};
await fs.mkdir(path.dirname(outPath), { recursive: true });
await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8");
return {
inputPath: absInputPath,
outputPath: outPath,
totalBefore: products.length,
totalAfter: limited.length,
};
}
function readBooleanEnv(name, fallback = false) {
const val = process.env[name];
if (val == null) return fallback;
return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase());
}
function logStageSummary(stepKey, summary = {}) {
const parts = Object.entries(summary)
.filter(([, value]) => value !== undefined && value !== null && value !== "")
.map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`);
console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`);
}
async function convertAggregatedToShopifyReady({
source,
inputPath,
outputPath,
uploadedImageMapPath,
imageBaseUrl,
brand,
}) {
const absInputPath = path.resolve(process.cwd(), inputPath);
const absOutputPath = path.resolve(process.cwd(), outputPath);
const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath);
const raw = await fs.readFile(absInputPath, "utf8");
const parsed = JSON.parse(raw);
let uploadedImageMap = null;
try {
const mapRaw = await fs.readFile(absUploadedMapPath, "utf8");
uploadedImageMap = JSON.parse(mapRaw);
} catch {
uploadedImageMap = null;
}
const convertedProducts = source.convertToShopifyProducts(parsed, {
imageBaseUrl,
brand,
uploadedImageMap,
});
const payload = {
generatedAt: new Date().toISOString(),
sourceKey: source.sourceKey,
sourceLabel: source.label,
sourceFile: absInputPath,
totalProducts: convertedProducts.length,
products: convertedProducts,
};
await fs.mkdir(path.dirname(absOutputPath), { recursive: true });
await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8");
return {
inputPath: absInputPath,
outputPath: absOutputPath,
uploadedImageMapPath: absUploadedMapPath,
totalProducts: convertedProducts.length,
};
}
async function upsertShopifyProductsFromConverted({
convertedPath,
shop = process.env.SHOPIFY_SHOP,
accessToken = process.env.SHOPIFY_ACCESS_TOKEN,
locationId = process.env.SHOPIFY_LOCATION_ID || null,
enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10",
} = {}) {
if (!shop) {
throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage.");
}
if (!accessToken) {
throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage.");
}
const absConvertedPath = path.resolve(process.cwd(), convertedPath);
const raw = await fs.readFile(absConvertedPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
const startedAtMs = Date.now();
const metrics = {
sourcePath: absConvertedPath,
total: products.length,
processed: 0,
created: 0,
updated: 0,
failed: 0,
errors: [],
startedAt: new Date(startedAtMs).toISOString(),
finishedAt: null,
durationSeconds: 0,
successRate: 0,
};
for (let i = 0; i < products.length; i += 1) {
const product = products[i];
const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`;
try {
const result = await upsertShopifyProductFull({
shop,
accessToken,
product,
locationId,
enableSeo,
apiVersion,
});
metrics.processed += 1;
if (result.action === "created") metrics.created += 1;
if (result.action === "updated") metrics.updated += 1;
if ((i + 1) % 10 === 0 || i === products.length - 1) {
console.log(
`[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}`
);
}
} catch (error) {
metrics.processed += 1;
metrics.failed += 1;
metrics.errors.push({
index: i + 1,
product: label,
error: error.message,
});
console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`);
}
}
const endedAtMs = Date.now();
metrics.finishedAt = new Date(endedAtMs).toISOString();
metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2));
metrics.successRate = metrics.total > 0
? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2))
: 0;
return metrics;
}
async function runFullSourcePipeline(options = {}) {
loadDotEnvFile(".env");
const cli = parseCliArgs(options.argv || process.argv.slice(2));
const source = getSource(options.sourceKey || cli.source || "kyt");
const runPaths = source.paths();
const runLogger = initRunLogger(runPaths.logsDir);
const limit = options.limit || cli.limit || null;
const onProgress = typeof options.onProgress === "function" ? options.onProgress : null;
const emitProgress = (stepIndex, stepKey, message) => {
if (onProgress) {
onProgress({
stepIndex,
totalSteps: 6,
stepKey,
message,
});
}
};
try {
const imageBaseUrl = process.env[source.envImageBaseUrl] || process.env.SOURCE_IMAGE_BASE_URL || source.defaultImageBaseUrl || "";
const brand = process.env.SOURCE_BRAND || process.env.SHOPIFY_BRAND || source.defaultBrand || source.label;
emitProgress(1, "fetchWebsiteData", `Fetching ${source.label} website data`);
console.log(`[PIPELINE 1/6] Fetching ${source.label} website data...`);
const dataSummary = await source.fetchWebsiteData({ paths: runPaths });
let aggregatedPathForRun = runPaths.aggregatedJson;
let imagesDirForRun = runPaths.downloadedImagesDir;
let limitSummary = null;
if (limit) {
limitSummary = await buildLimitedAggregatedJson(runPaths.aggregatedJson, limit, path.dirname(runPaths.aggregatedJson));
aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/");
imagesDirForRun = `${runPaths.downloadedImagesDir}.limit_${limit}`;
console.log(
`[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}`
);
}
emitProgress(2, "downloadImages", "Downloading product images");
console.log("\n[PIPELINE 2/6] Downloading product images...");
const downloadSummary = source.downloadImages
? await source.downloadImages({
aggregatedJsonPath: aggregatedPathForRun,
imagesDir: imagesDirForRun,
imageBaseUrl,
concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8),
})
: await downloadProductImagesFromAggregatedJson({
jsonPath: aggregatedPathForRun,
outputDir: imagesDirForRun,
baseImageUrl: imageBaseUrl,
concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8),
});
logStageSummary("downloadImages", {
total: downloadSummary?.totalImagesFound ?? 0,
downloaded: downloadSummary?.downloaded ?? 0,
skipped: downloadSummary?.skipped ?? 0,
failed: downloadSummary?.failed ?? 0,
products: downloadSummary?.productsCount ?? 0,
});
emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
console.log("\n[PIPELINE 3/6] Applying watermark in-place...");
const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png";
let watermarkSummary;
const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun);
if (!fsSync.existsSync(absWatermarkPath)) {
watermarkSummary = {
skipped: true,
reason: `Watermark file not found: ${absWatermarkPath}`,
imagesDir: path.resolve(process.cwd(), imagesDirForRun),
watermarkPath: absWatermarkPath,
totalImagesFound: 0,
processed: 0,
skippedCount: 0,
failed: 0,
};
console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`);
} else {
const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false);
try {
watermarkSummary = await applyWatermarkToDownloadedImages({
imagesDir: imagesDirForRun,
watermarkPath: watermarkPathForRun,
statePath: runPaths.watermarkState,
concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4),
});
} catch (error) {
if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) {
watermarkSummary = {
skipped: true,
reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`,
imagesDir: path.resolve(process.cwd(), imagesDirForRun),
watermarkPath: absWatermarkPath,
totalImagesFound: 0,
processed: 0,
skippedCount: 0,
failed: 0,
};
console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`);
} else {
throw error;
}
}
}
logStageSummary("watermarkImages", {
total: watermarkSummary?.totalImagesFound ?? 0,
processed: watermarkSummary?.processed ?? 0,
skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0,
failed: watermarkSummary?.failed ?? 0,
concurrency: watermarkSummary?.concurrency ?? "",
});
emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true);
const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false);
let imageUploadSummary;
if (!imageUploadEnabled) {
imageUploadSummary = {
skipped: true,
reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false",
totalTasks: 0,
processed: 0,
uploaded: 0,
failed: 0,
};
console.log("[PIPELINE 4/6] Image upload stage skipped by config.");
} else {
try {
const imageUploadInput = {
shop: process.env.SHOPIFY_SHOP,
accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
aggregatedJsonPath: aggregatedPathForRun,
imagesDir: imagesDirForRun,
statePath: runPaths.imageUploadState,
mapPath: runPaths.uploadedMapJson,
concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3),
};
imageUploadSummary = source.uploadImagesToShopifyFiles
? await source.uploadImagesToShopifyFiles(imageUploadInput)
: await uploadKytWatermarkedImagesToShopifyFiles(imageUploadInput);
} catch (error) {
const message = String(error?.message || "Unknown image upload error");
imageUploadSummary = {
skipped: true,
reason: message,
totalTasks: 0,
processed: 0,
uploaded: 0,
failed: 0,
};
console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`);
if (imageUploadRequired) {
throw error;
}
console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false).");
}
}
logStageSummary("uploadImagesToShopifyFiles", {
total: imageUploadSummary?.totalTasks ?? 0,
processed: imageUploadSummary?.processed ?? 0,
uploaded: imageUploadSummary?.uploaded ?? 0,
skipped: imageUploadSummary?.skipped ?? 0,
failed: imageUploadSummary?.failed ?? 0,
concurrency: imageUploadSummary?.concurrency ?? "",
});
emitProgress(5, "convertToShopifyReady", `Converting ${source.label} data to Shopify-ready products`);
console.log(`\n[PIPELINE 5/6] Converting ${source.label} data to Shopify-ready products...`);
const conversionSummary = await convertAggregatedToShopifyReady({
source,
inputPath: aggregatedPathForRun,
outputPath: runPaths.shopifyReadyJson,
uploadedImageMapPath: runPaths.uploadedMapJson,
imageBaseUrl,
brand,
});
logStageSummary("convertToShopifyReady", {
total: conversionSummary?.totalProducts ?? 0,
});
emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({
convertedPath: runPaths.shopifyReadyJson,
shop: process.env.SHOPIFY_SHOP,
accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
locationId: process.env.SHOPIFY_LOCATION_ID || null,
enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
});
logStageSummary("upsertToShopify", {
total: shopifyUpsertSummary?.total ?? 0,
processed: shopifyUpsertSummary?.processed ?? 0,
created: shopifyUpsertSummary?.created ?? 0,
updated: shopifyUpsertSummary?.updated ?? 0,
failed: shopifyUpsertSummary?.failed ?? 0,
successRate: shopifyUpsertSummary?.successRate ?? 0,
});
const summary = {
source: {
key: source.sourceKey,
label: source.label,
},
paths: runPaths,
completedAt: new Date().toISOString(),
runLogPath: runLogger.logPath,
limit: limit || null,
limitSummary,
steps: {
fetchWebsiteData: dataSummary,
downloadImages: downloadSummary,
watermarkImages: watermarkSummary,
uploadImagesToShopifyFiles: imageUploadSummary,
convertToShopifyReady: conversionSummary,
upsertToShopify: shopifyUpsertSummary,
},
upcomingSteps: [],
};
emitProgress(6, "completed", `${source.label} pipeline completed`);
console.log("\n=== FULL PIPELINE SUMMARY ===");
console.log(JSON.stringify(summary, null, 2));
return summary;
} finally {
restoreRunLogger(runLogger);
}
}
module.exports = {
runFullSourcePipeline,
convertAggregatedToShopifyReady,
upsertShopifyProductsFromConverted,
};
if (require.main === module) {
runFullSourcePipeline().catch((error) => {
console.error("Full pipeline failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -21,7 +21,7 @@ function encodePathKeepingSlashes(relativePath) {
}
function getExtFromPathOrType(imagePath, contentType) {
const parsed = path.extname(imagePath || "").toLowerCase();
const parsed = path.extname(getPathWithoutQuery(imagePath)).toLowerCase();
if (parsed) {
return parsed;
}
@ -38,6 +38,35 @@ function getExtFromPathOrType(imagePath, contentType) {
return ".jpg";
}
function isAbsoluteUrl(value) {
return /^https?:\/\//i.test(String(value || ""));
}
function getPathWithoutQuery(value) {
const raw = String(value || "");
try {
return new URL(raw).pathname;
} catch {
return raw.split(/[?#]/)[0];
}
}
function buildImageUrl(imagePath, baseImageUrl) {
if (isAbsoluteUrl(imagePath)) {
return String(imagePath);
}
const encoded = encodePathKeepingSlashes(imagePath);
return `${String(baseImageUrl || "").replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`;
}
function getFilePartsFromImagePath(imagePath, fallbackIndex) {
const cleanPath = getPathWithoutQuery(imagePath);
const originalBase = path.basename(cleanPath, path.extname(cleanPath)) || `image_${fallbackIndex + 1}`;
const originalExt = path.extname(cleanPath) || ".png";
return { originalBase, originalExt };
}
async function downloadWithRetry(url, retries = 3) {
let lastError;
@ -129,11 +158,8 @@ async function downloadProductImagesFromAggregatedJson(options = {}) {
for (let idx = 0; idx < imagePaths.length; idx += 1) {
const relPath = imagePaths[idx];
const encoded = encodePathKeepingSlashes(relPath);
const imageUrl = `${baseImageUrl.replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`;
const originalBase = path.basename(String(relPath || ""), path.extname(String(relPath || ""))) || `image_${idx + 1}`;
const originalExt = path.extname(String(relPath || "")) || ".png";
const imageUrl = buildImageUrl(relPath, baseImageUrl);
const { originalBase, originalExt } = getFilePartsFromImagePath(relPath, idx);
let localBase = sanitizeName(originalBase) || `image_${idx + 1}`;
let fileName = `${localBase}${originalExt}`;

View File

@ -39,6 +39,15 @@ function sanitizeName(value) {
.slice(0, 150);
}
function getPathWithoutQuery(value) {
const raw = String(value || "");
try {
return new URL(raw).pathname;
} catch {
return raw.split(/[?#]/)[0];
}
}
function getMimeType(filePath) {
const ext = path.extname(filePath).toLowerCase();
if (ext === ".png") return "image/png";
@ -258,8 +267,9 @@ function buildLocalImageTasks(aggregatedPayload, imagesDir) {
for (let idx = 0; idx < imagePaths.length; idx += 1) {
const sourcePath = imagePaths[idx];
const originalBase = path.basename(String(sourcePath || ""), path.extname(String(sourcePath || ""))) || `image_${idx + 1}`;
const originalExt = path.extname(String(sourcePath || "")) || ".png";
const cleanSourcePath = getPathWithoutQuery(sourcePath);
const originalBase = path.basename(cleanSourcePath, path.extname(cleanSourcePath)) || `image_${idx + 1}`;
const originalExt = path.extname(cleanSourcePath) || ".png";
const localBase = sanitizeName(originalBase) || `image_${idx + 1}`;
let fileName = `${localBase}${originalExt}`;

View File

@ -98,6 +98,207 @@ function normalizeFitmentTags(input) {
return { fitmentMap, fitmentFlat };
}
function normalizeOptionKey(optionValues = []) {
return optionValues
.map((item) => `${String(item.name || "").trim().toLowerCase()}=${String(item.value || "").trim().toLowerCase()}`)
.sort()
.join("|");
}
function getDesiredOptions(attrs) {
const options = Array.isArray(attrs.options) ? attrs.options : [];
return options
.map((option) => ({
name: String(option.name || "").trim(),
values: Array.from(new Set((Array.isArray(option.values) ? option.values : []).map((value) => String(value || "").trim()).filter(Boolean))),
}))
.filter((option) => option.name && option.values.length);
}
function getDesiredVariants(attrs, finalPrice, compareAtPrice) {
const variants = Array.isArray(attrs.variants) ? attrs.variants : [];
if (!variants.length) {
return [
{
sku: attrs.part_number || "",
price: finalPrice,
compareAtPrice,
quantity: Number(attrs.total_quantity ?? attrs.quantity ?? 0),
optionValues: [],
},
];
}
return variants.map((variant, index) => ({
sku: String(variant.sku || attrs.part_number || `${attrs.part_number || "variant"}-${index + 1}`).slice(0, 64),
price: Number(variant.price ?? finalPrice),
compareAtPrice: variant.compare_price != null ? Number(variant.compare_price) : compareAtPrice,
quantity: Number(variant.quantity ?? attrs.total_quantity ?? attrs.quantity ?? 0),
optionValues: Array.isArray(variant.optionValues)
? variant.optionValues
.map((item) => ({
name: String(item.name || "").trim(),
value: String(item.value || "").trim(),
}))
.filter((item) => item.name && item.value)
: [],
}));
}
function buildVariantBulkInput(variant, attrs, weightValue, variantId = null) {
const input = {
...(variantId ? { id: variantId } : {}),
price: Number(variant.price || 0).toFixed(2),
...(variant.compareAtPrice !== null && variant.compareAtPrice !== undefined
? { compareAtPrice: Number(variant.compareAtPrice).toFixed(2) }
: {}),
...(attrs.barcode ? { barcode: attrs.barcode } : {}),
inventoryItem: {
sku: variant.sku || attrs.part_number || "",
measurement: { weight: { value: weightValue, unit: "POUNDS" } },
},
};
if (!variantId && variant.optionValues?.length) {
input.optionValues = variant.optionValues.map((optionValue) => ({
name: optionValue.value,
optionName: optionValue.name,
}));
}
return input;
}
async function ensureProductOptions(client, productId, currentOptions, desiredOptions) {
if (!desiredOptions.length) {
return currentOptions || [];
}
const existingNames = new Set((currentOptions || []).map((option) => String(option.name || "").toLowerCase()));
const missingOptions = desiredOptions.filter((option) => !existingNames.has(option.name.toLowerCase()));
if (!missingOptions.length) {
return currentOptions || [];
}
const created = await gql(
client,
`mutation($productId: ID!, $options: [OptionCreateInput!]!, $variantStrategy: ProductOptionCreateVariantStrategy) {
productOptionsCreate(productId: $productId, options: $options, variantStrategy: $variantStrategy) {
product {
id
options { id name values position optionValues { id name hasVariants } }
}
userErrors { field message code }
}
}`,
{
productId,
options: missingOptions.map((option) => ({
name: option.name,
values: option.values.map((value) => ({ name: value })),
})),
variantStrategy: "CREATE",
}
);
const errs = created.productOptionsCreate.userErrors || [];
if (errs.length) {
throw new Error(`productOptionsCreate failed: ${errs.map((e) => e.message).join(", ")}`);
}
return created.productOptionsCreate.product?.options || currentOptions || [];
}
async function fetchProductVariantState(client, productId) {
const data = await gql(
client,
`query($id: ID!) {
product(id: $id) {
id
options { id name values position optionValues { id name hasVariants } }
variants(first: 250) {
nodes {
id
sku
selectedOptions { name value }
inventoryItem { id }
}
}
}
}`,
{ id: productId }
);
return data.product || { id: productId, options: [], variants: { nodes: [] } };
}
async function syncProductVariants({ client, productId, attrs, desiredVariants, desiredOptions, weightValue }) {
let productState = await fetchProductVariantState(client, productId);
await ensureProductOptions(client, productId, productState.options || [], desiredOptions);
productState = await fetchProductVariantState(client, productId);
const existingVariants = productState.variants?.nodes || [];
const existingByOptions = new Map();
for (const variant of existingVariants) {
existingByOptions.set(normalizeOptionKey(variant.selectedOptions || []), variant);
}
const variantsToUpdate = [];
const variantsToCreate = [];
desiredVariants.forEach((variant, index) => {
const key = normalizeOptionKey(variant.optionValues || []);
const existing = existingByOptions.get(key) || (index === 0 ? existingVariants[0] : null);
if (existing?.id) {
variantsToUpdate.push(buildVariantBulkInput(variant, attrs, weightValue, existing.id));
} else {
variantsToCreate.push(buildVariantBulkInput(variant, attrs, weightValue));
}
});
const syncedVariants = [];
if (variantsToUpdate.length) {
const updated = await gql(
client,
`mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) {
productVariantsBulkUpdate(productId: $productId, variants: $variants) {
productVariants { id sku inventoryItem { id sku } }
userErrors { field message }
}
}`,
{ productId, variants: variantsToUpdate }
);
if (updated.productVariantsBulkUpdate.userErrors?.length) {
throw new Error(`variant update failed: ${updated.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
syncedVariants.push(...(updated.productVariantsBulkUpdate.productVariants || []));
}
if (variantsToCreate.length) {
const created = await gql(
client,
`mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!, $strategy: ProductVariantsBulkCreateStrategy) {
productVariantsBulkCreate(productId: $productId, variants: $variants, strategy: $strategy) {
productVariants { id sku inventoryItem { id sku } selectedOptions { name value } }
userErrors { field message }
}
}`,
{ productId, variants: variantsToCreate, strategy: "REMOVE_STANDALONE_VARIANT" }
);
if (created.productVariantsBulkCreate.userErrors?.length) {
throw new Error(`variant create failed: ${created.productVariantsBulkCreate.userErrors.map((e) => e.message).join(", ")}`);
}
syncedVariants.push(...(created.productVariantsBulkCreate.productVariants || []));
}
return syncedVariants;
}
async function generateSeo(attrs, seoClient) {
const seoInput = {
product_name: attrs.product_name,
@ -305,8 +506,6 @@ async function upsertShopifyProductFull({
const existing = existingSearch.products.nodes?.[0];
let productId;
let variantId;
let inventoryItemId;
let action;
if (existing) {
@ -336,8 +535,6 @@ async function upsertShopifyProductFull({
if (errs.length) throw new Error(`productUpdate failed: ${errs.map((e) => e.message).join(", ")}`);
productId = existing.id;
variantId = existing.variants?.nodes?.[0]?.id;
inventoryItemId = existing.variants?.nodes?.[0]?.inventoryItem?.id;
action = "updated";
} else {
const crt = await gql(
@ -370,13 +567,9 @@ async function upsertShopifyProductFull({
if (errs.length) throw new Error(`productCreate failed: ${errs.map((e) => e.message).join(", ")}`);
productId = crt.productCreate.product.id;
variantId = crt.productCreate.product.variants?.nodes?.[0]?.id;
inventoryItemId = crt.productCreate.product.variants?.nodes?.[0]?.inventoryItem?.id;
action = "created";
}
if (!variantId) throw new Error("No variant ID found after upsert.");
const { priceType, percentage } = await getPricingConfig(client);
const basePrice = Number(attrs.price || 0);
let finalPrice = basePrice;
@ -386,37 +579,25 @@ async function upsertShopifyProductFull({
const compareAtPrice = attrs.compare_price != null ? Number(attrs.compare_price) : null;
const weightValue = Number(attrs.dimensions?.[0]?.weight || 0);
const bulk = await gql(
const desiredOptions = getDesiredOptions(attrs);
const desiredVariants = getDesiredVariants(attrs, finalPrice, compareAtPrice);
const syncedVariants = await syncProductVariants({
client,
`mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) {
productVariantsBulkUpdate(productId: $productId, variants: $variants) {
productVariants { id price barcode inventoryItem { id sku } }
userErrors { field message }
}
}`,
{
productId,
variants: [
{
id: variantId,
price: Number(finalPrice).toFixed(2),
...(compareAtPrice !== null ? { compareAtPrice: Number(compareAtPrice).toFixed(2) } : {}),
...(attrs.barcode ? { barcode: attrs.barcode } : {}),
inventoryItem: {
sku: attrs.part_number || "",
measurement: { weight: { value: weightValue, unit: "POUNDS" } },
},
},
],
}
);
attrs,
desiredVariants,
desiredOptions,
weightValue,
});
if (bulk.productVariantsBulkUpdate.userErrors?.length) {
throw new Error(`variant update failed: ${bulk.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
const inventoryTargets = syncedVariants
.map((variant, index) => ({
inventoryItemId: variant.inventoryItem?.id,
quantity: Number(desiredVariants[index]?.quantity ?? attrs.total_quantity ?? attrs.quantity ?? 0),
}))
.filter((item) => item.inventoryItemId);
if (inventoryItemId) {
for (const inventoryTarget of inventoryTargets) {
const inv = await gql(
client,
`mutation($id: ID!, $input: InventoryItemInput!) {
@ -426,7 +607,7 @@ async function upsertShopifyProductFull({
}
}`,
{
id: inventoryItemId,
id: inventoryTarget.inventoryItemId,
input: {
cost: Number(attrs.purchase_cost || 0),
tracked: true,
@ -437,9 +618,11 @@ async function upsertShopifyProductFull({
if (inv.inventoryItemUpdate.userErrors?.length) {
throw new Error(`inventoryItemUpdate failed: ${inv.inventoryItemUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
}
if (locationId) {
if (locationId && inventoryTargets.length) {
try {
for (const inventoryTarget of inventoryTargets) {
const act = await gql(
client,
`mutation($inventoryItemId: ID!, $locationId: ID!) {
@ -448,14 +631,14 @@ async function upsertShopifyProductFull({
userErrors { field message }
}
}`,
{ inventoryItemId, locationId }
{ inventoryItemId: inventoryTarget.inventoryItemId, locationId }
);
if (act.inventoryActivate.userErrors?.length) {
throw new Error(`inventoryActivate failed: ${act.inventoryActivate.userErrors.map((e) => e.message).join(", ")}`);
}
}
const totalQty = Number(attrs.total_quantity ?? attrs.quantity ?? 0);
const setQty = await gql(
client,
`mutation($input: InventorySetQuantitiesInput!) {
@ -468,14 +651,12 @@ async function upsertShopifyProductFull({
name: "available",
reason: "correction",
ignoreCompareQuantity: true,
quantities: [
{
inventoryItemId,
quantities: inventoryTargets.map((inventoryTarget) => ({
inventoryItemId: inventoryTarget.inventoryItemId,
locationId,
quantity: totalQty,
quantity: inventoryTarget.quantity,
compareQuantity: 0,
},
],
})),
},
}
);
@ -488,10 +669,9 @@ async function upsertShopifyProductFull({
`[INVENTORY-WARN] Skipping inventory activation/set for product ${productId}. Reason: ${inventoryError.message}`
);
}
} else {
} else if (!locationId) {
console.log(`[INVENTORY-WARN] No locationId provided. Skipping inventory activation/set for product ${productId}.`);
}
}
await publishToOnlineStore(client, productId);
@ -527,8 +707,9 @@ async function upsertShopifyProductFull({
ok: true,
action,
productId,
variantId,
inventoryItemId,
variantId: syncedVariants[0]?.id || null,
inventoryItemId: syncedVariants[0]?.inventoryItem?.id || null,
variantsSynced: syncedVariants.length,
handle,
};
}

View File

@ -0,0 +1,52 @@
# Import Source Modules
Each client website gets one source folder here. A source owns the parts that change from website to website:
- where scraped data is stored
- how website data is fetched
- how raw website records are converted to Shopify-ready products
- optionally, how image download/upload tasks are discovered if the raw data shape is different
The shared pipeline still handles:
- job progress
- image watermarking
- Shopify Files upload state
- Shopify product upsert
- logs and summaries
## Folder Shape
Every client should follow the same shape:
```txt
sources/
client-key/
index.js source metadata, paths, fetchWebsiteData orchestration
scraper.js website/API scraping only
converter.js source-normalized JSON to Shopify-ready products
```
The source registry loads each folder through its `index.js`.
## Source Contract
`index.js` exports an object like this:
```js
module.exports = {
sourceKey: "client-key",
label: "Client Name",
defaultBrand: "Client Brand",
defaultImageBaseUrl: "https://example.com/images",
envImageBaseUrl: "CLIENT_IMAGE_BASE_URL",
paths,
fetchWebsiteData,
convertToShopifyProducts,
// optional:
// downloadImages,
// uploadImagesToShopifyFiles,
};
```
Register the new source in `sources/index.js`.

View File

@ -0,0 +1,250 @@
const path = require("node:path");
function slugify(str) {
return String(str || "")
.trim()
.toLowerCase()
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-+|-+$/g, "");
}
function escapeHtml(input) {
return String(input || "")
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#39;");
}
function cleanText(value) {
return String(value || "")
.replace(/\u2026/g, "...")
.replace(/\s+\n/g, "\n")
.replace(/\n\s+/g, "\n")
.replace(/[ \t]+/g, " ")
.replace(/\n{3,}/g, "\n\n")
.trim();
}
function getUploadedImageUrl(imgPath, uploadedImageMap) {
if (!uploadedImageMap || typeof uploadedImageMap !== "object") return null;
const bySourcePath = uploadedImageMap.bySourcePath && typeof uploadedImageMap.bySourcePath === "object"
? uploadedImageMap.bySourcePath
: {};
return bySourcePath[String(imgPath)]?.url || null;
}
function getImageFileName(imagePath) {
try {
return path.basename(new URL(imagePath).pathname);
} catch {
return path.basename(String(imagePath || "").split(/[?#]/)[0]);
}
}
function descriptionToHtml(description, sections = []) {
const blocks = [];
const clean = cleanText(description);
if (clean) {
blocks.push(
...clean
.split(/\n{2,}/)
.map((block) => `<p>${escapeHtml(block).replace(/\n/g, "<br/>")}</p>`)
);
}
for (const section of sections) {
const title = cleanText(section.title);
const paragraphs = Array.isArray(section.paragraphs) ? section.paragraphs.map(cleanText).filter(Boolean) : [];
const bullets = Array.isArray(section.bullets) ? section.bullets.map(cleanText).filter(Boolean) : [];
if (title) {
blocks.push(`<h3>${escapeHtml(title)}</h3>`);
}
for (const paragraph of paragraphs) {
blocks.push(`<p>${escapeHtml(paragraph)}</p>`);
}
if (bullets.length) {
blocks.push(`<ul>${bullets.map((bullet) => `<li>${escapeHtml(bullet)}</li>`).join("")}</ul>`);
}
}
return blocks.join("\n");
}
function normalizeOptionName(name) {
const clean = cleanText(name).replace(/^select\s+/i, "");
return clean || "Option";
}
function getSourceOptions(record, data) {
const options = Array.isArray(record?.sourceOptions) ? record.sourceOptions : Array.isArray(data?.options) ? data.options : [];
return options
.map((option) => {
const name = normalizeOptionName(option.name || option.label);
const values = Array.isArray(option.values) ? option.values : [];
return {
name,
values: values
.map((value) => ({
label: cleanText(value.label || value.name),
sourceValue: value.value || null,
selected: Boolean(value.selected),
}))
.filter((value) => value.label && !/^choose options$/i.test(value.label)),
};
})
.filter((option) => option.values.length);
}
function buildOptionCombinations(options) {
if (!options.length) {
return [];
}
return options.reduce((combinations, option) => {
const next = [];
for (const combo of combinations.length ? combinations : [[]]) {
for (const value of option.values) {
next.push([...combo, { name: option.name, value: value.label, sourceValue: value.sourceValue }]);
}
}
return next;
}, []);
}
function buildVariants({ options, baseSku, price, quantity, compareAtPrice = null }) {
const combinations = buildOptionCombinations(options);
if (!combinations.length) {
return [
{
sku: baseSku,
price,
compare_price: compareAtPrice,
quantity,
optionValues: [],
},
];
}
return combinations.map((combo, index) => {
const suffix = combo
.map((item) => slugify(item.value).slice(0, 28))
.filter(Boolean)
.join("-");
return {
sku: index === 0 ? baseSku : `${baseSku}-${suffix || index + 1}`.slice(0, 64),
price,
compare_price: compareAtPrice,
quantity,
optionValues: combo,
};
});
}
function convertToShopifyProducts(input, options = {}) {
const records = Array.isArray(input?.products) ? input.products : [];
const brand = options.brand || "Brock's Performance";
return records.map((record, index) => {
const data = record?.productDetails?.data || {};
const sourceRecord = record?.sourceRecord || {};
const productId = record?.productId || data.id || slugify(sourceRecord.url || data.name || `brocks-${index + 1}`);
const title = cleanText(data.name || sourceRecord.title || `Brock's Performance Product ${index + 1}`);
const sku = cleanText(data.code || sourceRecord.sku || productId);
const imagePaths = Array.isArray(data.img) ? data.img : [];
const sourceOptions = getSourceOptions(record, data);
const quantity = Number(data?.stocks?.total_units ?? sourceRecord?.stock?.currentStock ?? 0) || 0;
const descriptionHtml = descriptionToHtml(
data?.details?.description || sourceRecord.description,
data?.details?.descriptionSections || record?.descriptionSections || sourceRecord.descriptionSections || []
);
const price = Number(data?.cost?.mrp ?? sourceRecord.price ?? 0) || 0;
const files = imagePaths
.map((imagePath) => ({
type: "Image",
url: getUploadedImageUrl(imagePath, options.uploadedImageMap) || imagePath,
media_content: getImageFileName(imagePath),
source_path: imagePath,
}))
.filter((file) => file.url);
const variants = buildVariants({
options: sourceOptions,
baseSku: sku,
price,
quantity,
});
const category = "Exhaust";
const subcategory = "Titanium Full Systems, Suzuki";
const tags = [
brand,
category,
"Titanium Full Systems",
"Suzuki",
"Motorcycle Exhaust",
sku,
].filter(Boolean);
return {
id: productId,
source: {
productId,
sourceKey: "brocks-performance",
url: sourceRecord.url || data.url || null,
image_paths: imagePaths,
},
attributes: {
product_name: title,
brand,
category,
subcategory,
part_number: sku,
mfr_part_number: sku,
price,
compare_price: null,
purchase_cost: null,
barcode: "",
price_group: null,
units_per_sku: null,
part_description: descriptionHtml,
descriptions: descriptionHtml
? [{ type: "Market Description", description: descriptionHtml }]
: [],
files,
image_paths: imagePaths,
dimensions: [{ weight: 0 }],
total_quantity: quantity,
inventorydata: {
inventory: {
main: quantity,
},
},
options: sourceOptions.map((option) => ({
name: option.name,
values: option.values.map((value) => value.label),
})),
variants,
fitmentTags: {
make: ["Suzuki"],
model: [],
year: [],
drive: [],
baseModel: [],
},
tags,
handle: slugify(`${brand}-${title}-${productId}`),
source_url: sourceRecord.url || data.url || null,
page_meta: sourceRecord.pageMeta || record.pageMeta || {},
},
};
});
}
module.exports = {
cleanText,
convertToShopifyProducts,
slugify,
};

View File

@ -0,0 +1,215 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const { cleanText, scrapeBrocksPerformanceProducts, TARGET_URL } = require("./scraper");
const { convertToShopifyProducts, slugify } = require("./converter");
const sourceKey = "brocks-performance";
const label = "Brock's Performance";
const dataDir = path.join("data", "sources", sourceKey);
function paths() {
return {
aggregatedJson: path.join(dataDir, "01_products_aggregated.json"),
historyJson: path.join(dataDir, "01_products_run_history.json"),
rawJson: path.join(dataDir, "01_raw_scrape.json"),
downloadedImagesDir: path.join(dataDir, "02_downloaded_product_images"),
watermarkState: path.join(dataDir, "03_watermark_state.json"),
imageUploadState: path.join(dataDir, "04_shopify_image_upload_state.json"),
uploadedMapJson: path.join(dataDir, "04_shopify_uploaded_images_map.json"),
shopifyReadyJson: path.join(dataDir, "05_shopify_products_ready.json"),
logsDir: path.join(dataDir, "99_run_logs"),
};
}
function uniqueBy(values, keyFn) {
const seen = new Set();
const out = [];
for (const value of values) {
const key = keyFn(value);
if (!key || seen.has(key)) {
continue;
}
seen.add(key);
out.push(value);
}
return out;
}
function bestImageUrl(image) {
if (!image) {
return null;
}
if (typeof image === "string") {
return image;
}
const variants = Array.isArray(image.variants) ? image.variants.filter(Boolean) : [];
return variants[variants.length - 1] || image.src || null;
}
function getImageList(record) {
const candidates = [
record.image,
...(Array.isArray(record.images) ? record.images.map(bestImageUrl) : []),
].filter(Boolean);
return uniqueBy(candidates, (url) => String(url).split("?")[0]);
}
function normalizeOptions(options = []) {
return options
.map((option) => {
const name = cleanText(option.label) || "Option";
const values = Array.isArray(option.values) ? option.values : [];
const normalizedValues = values
.map((value) => ({
label: cleanText(value.label),
value: value.value || null,
selected: Boolean(value.selected || value.checked),
disabled: Boolean(value.disabled),
}))
.filter((value) => value.label && !/^choose options$/i.test(value.label));
return {
name,
required: Boolean(option.required),
type: option.type || null,
values: uniqueBy(normalizedValues, (value) => value.label.toLowerCase()),
};
})
.filter((option) => option.values.length > 0);
}
function getStockQuantity(record) {
const raw = record?.stock?.currentStock || record?.productInfo?.["Quantity Available"] || "";
const numeric = Number.parseInt(String(raw).replace(/[^0-9-]/g, ""), 10);
return Number.isFinite(numeric) ? numeric : 0;
}
function normalizeRecord(record, index) {
const title = cleanText(record.title) || `Brock's Performance Product ${index + 1}`;
const sku = cleanText(record.sku) || slugify(title);
const productId = record.productId || sku || slugify(record.url || title);
const imageList = getImageList(record);
const price = Number.isFinite(record.price) ? record.price : 0;
const description = cleanText(record.description);
const options = normalizeOptions(record.options);
const stockQuantity = getStockQuantity(record);
return {
productId,
sourceKey,
sourceRecord: record,
sourceOptions: options,
sourceImages: Array.isArray(record.images) ? record.images : [],
descriptionSections: Array.isArray(record.descriptionSections) ? record.descriptionSections : [],
pageMeta: record.pageMeta || {},
reviews: record.reviews || {},
productSummary: {
id: productId,
name: title,
cost: { mrp: price },
img: imageList,
sub_category_name: "Titanium Full Systems",
model_series_name: "Suzuki",
},
productDetails: {
servercode: "OK",
data: {
id: productId,
code: sku,
name: title,
category_name: "Exhaust",
sub_category_name: "Titanium Full Systems",
model_series_name: "Suzuki",
cost: { mrp: price },
stocks: { total_units: stockQuantity },
img: imageList,
url: record.url,
mpn: record.mpn || null,
brand: record.brand || null,
availability: record.availability || null,
currency: record.currency || null,
canonicalUrl: record.canonicalUrl || null,
productInfo: record.productInfo || {},
options,
details: {
description,
specifications: [
{ name: "SKU", text: sku },
{ name: "MPN", text: record.mpn },
{ name: "Brand", text: record.brand },
{ name: "Availability", text: record.availability },
{ name: "Quantity Available", text: record?.stock?.display },
{ name: "Source URL", text: record.url },
].filter((spec) => spec.text),
weight: {},
descriptionSections: Array.isArray(record.descriptionSections) ? record.descriptionSections : [],
},
},
},
};
}
async function fetchWebsiteData({ paths: runPaths }) {
const targetUrl = process.env.BROCKS_PERFORMANCE_TARGET_URL || TARGET_URL;
const rawProducts = await scrapeBrocksPerformanceProducts(targetUrl);
const products = rawProducts.map((record, index) => normalizeRecord(record, index));
const now = new Date().toISOString();
const analysis = {
timestamp: now,
targetUrl,
rawProductsFound: rawProducts.length,
totalProductsUnique: products.length,
detailSuccess: products.length,
detailFailed: 0,
cachedDetailReused: 0,
detailFetchedNow: products.length,
};
const payload = {
generatedAt: now,
sourceKey,
sourceLabel: label,
analysis,
products,
};
await fs.mkdir(path.dirname(runPaths.aggregatedJson), { recursive: true });
await fs.writeFile(runPaths.rawJson, `${JSON.stringify(rawProducts, null, 2)}\n`, "utf8");
await fs.writeFile(runPaths.aggregatedJson, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
let history = [];
try {
history = JSON.parse(await fs.readFile(runPaths.historyJson, "utf8"));
if (!Array.isArray(history)) history = [];
} catch {
history = [];
}
history.push(analysis);
await fs.writeFile(runPaths.historyJson, `${JSON.stringify(history, null, 2)}\n`, "utf8");
console.log(`[BROCKS] Saved ${products.length} products to ${runPaths.aggregatedJson}`);
return {
analysis,
outputJsonPath: path.resolve(process.cwd(), runPaths.aggregatedJson),
rawJsonPath: path.resolve(process.cwd(), runPaths.rawJson),
historyPath: path.resolve(process.cwd(), runPaths.historyJson),
};
}
module.exports = {
sourceKey,
label,
defaultBrand: "Brock's Performance",
defaultImageBaseUrl: "",
envImageBaseUrl: "BROCKS_PERFORMANCE_IMAGE_BASE_URL",
paths,
fetchWebsiteData,
convertToShopifyProducts,
};

View File

@ -0,0 +1,411 @@
const { chromium } = require('playwright');
const fs = require('fs/promises');
const path = require('path');
const { URL } = require('url');
const TARGET_URL = 'https://brocksperformance.com/exhausts/titanium-full-systems/suzuki/';
const OUTPUT_DIR = path.join(__dirname, '..', 'outputs');
const OUTPUT_FILE = path.join(OUTPUT_DIR, 'brocksperformance-suzuki-exhausts-full-details.json');
function cleanText(value) {
return String(value || '')
.replace(/\u2026/g, '...')
.replace(/\s+\n/g, '\n')
.replace(/\n\s+/g, '\n')
.replace(/[ \t]+/g, ' ')
.replace(/\n{3,}/g, '\n\n')
.trim();
}
function toNumber(value) {
if (!value) return null;
const cleaned = String(value).replace(/[^0-9.\-]/g, '');
const n = Number.parseFloat(cleaned);
return Number.isNaN(n) ? null : n;
}
async function expandLoadMore(page) {
// Click any obvious "load more" buttons until they disappear
const loadMoreSelectors = ['button.load-more', '.load-more', 'button.show-more', '.show-more'];
for (const sel of loadMoreSelectors) {
// Try repeatedly because some sites require multiple clicks
while (true) {
const btn = await page.$(sel);
if (!btn) break;
try {
await Promise.all([page.waitForResponse(r => r.status() === 200, { timeout: 5000 }).catch(() => {}), btn.click()]);
await page.waitForTimeout(800);
} catch (e) {
break;
}
}
}
}
async function autoScroll(page, timeout = 2000) {
const start = Date.now();
let lastHeight = await page.evaluate(() => document.body.scrollHeight);
while (Date.now() - start < timeout) {
await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight));
await page.waitForTimeout(300);
const newHeight = await page.evaluate(() => document.body.scrollHeight);
if (newHeight === lastHeight) break;
lastHeight = newHeight;
}
}
async function scrape(targetUrl = TARGET_URL, options = {}) {
const outputFile = options.outputFile || OUTPUT_FILE;
const writeOutput = options.writeOutput !== false;
const onDetailProgress = typeof options.onDetailProgress === 'function' ? options.onDetailProgress : null;
const browser = await chromium.launch({ headless: true });
const page = await browser.newPage();
await page.setExtraHTTPHeaders({ 'accept-language': 'en-US,en;q=0.9' });
try {
await page.goto(targetUrl, { waitUntil: 'networkidle' });
// Try to load more items (click buttons, then scroll)
await expandLoadMore(page);
await autoScroll(page, 3000);
// BigCommerce category pages render product cards as li.product inside .productGrid.
const candidateSelector = '.productGrid .product';
const products = await page.$$eval(candidateSelector, (nodes, baseUrl) => {
const siteOrigin = new URL(baseUrl).origin;
const results = [];
for (const node of nodes) {
const anchor = node.querySelector('.card-title a, .card-figure__link[href], a[href]');
if (!anchor) continue;
const href = anchor.href || (anchor.getAttribute && anchor.getAttribute('href'));
if (!href || href.includes('#') || !href.startsWith(siteOrigin)) continue;
const titleEl = node.querySelector('.card-title a, .card-title, .product-title, .title, h2, h3, .prod-title') || anchor;
const title = titleEl ? (titleEl.textContent || '').trim() : '';
const priceEl = node.querySelector('[data-product-price-without-tax], [data-product-price-with-tax], .price--withoutTax, .price--withTax, .price-final, .product-price');
const priceText = priceEl ? (priceEl.textContent || '').trim() : '';
const imgEl = node.querySelector('img');
const image = imgEl ? (imgEl.src || imgEl.getAttribute('data-src') || '') : '';
const skuEl = node.querySelector('.sku, .product-sku, .prod-sku');
const skuText = skuEl ? (skuEl.textContent || '').trim() : (node.textContent || '').match(/SKU:\s*([A-Z0-9-]+)/i)?.[1] || '';
const sku = skuText.replace(/^SKU:\s*/i, '').trim();
const descEl = node.querySelector('.description, .product-description, p');
const description = descEl ? (descEl.textContent || '').trim() : '';
results.push({
title: title || null,
url: href ? new URL(href, baseUrl).href : null,
priceRaw: priceText || null,
image: image || null,
sku: sku || null,
description: description || null,
});
}
return results;
}, page.url());
// Deduplicate by URL
const map = new Map();
for (const p of products) {
if (!p.url) continue;
if (!map.has(p.url)) map.set(p.url, p);
}
console.log(`[BROCKS] Found ${map.size} unique product URLs from ${targetUrl}`);
// Helper to fetch product page details
async function fetchProductDetails(productPage, url) {
try {
await productPage.goto(url, { waitUntil: 'networkidle', timeout: 30000 });
return await productPage.evaluate(() => {
const clean = (value) => (value || '').replace(/\s+/g, ' ').trim();
const toAbsolute = (value) => {
if (!value) return null;
try {
return new URL(value, document.location.href).href;
} catch (e) {
return value;
}
};
const pick = (selectors) => {
for (const s of selectors) {
const el = document.querySelector(s);
if (el) return el;
}
return null;
};
const unique = (values) => [...new Set(values.filter(Boolean))];
const textFrom = (el) => {
if (!el) return null;
return clean(el.textContent || el.getAttribute('content') || el.getAttribute('value') || '');
};
const metaContent = (selector) => {
const el = document.querySelector(selector);
return el ? el.getAttribute('content') : null;
};
const extractSrcset = (value) => {
if (!value) return [];
return value.split(',')
.map((part) => part.trim().split(/\s+/)[0])
.map(toAbsolute)
.filter(Boolean);
};
const priceEl = pick([
'[data-product-price-without-tax]',
'[data-product-price-with-tax]',
'.productView-price .price--withoutTax',
'.productView-price .price--withTax',
'.productView-price [itemprop="price"]',
'[itemprop="price"]',
'.price--main',
'.price-final',
'.product-price'
]);
const priceText = textFrom(priceEl);
const productId = document.querySelector('input[name="product_id"]')?.value || null;
const title = textFrom(pick(['.productView-title', 'h1', '[itemprop="name"]']));
const skuEl = pick(['[itemprop="sku"]', '.sku', '.productView-sku', '.product-sku', '.prod-sku']);
const sku = textFrom(skuEl);
const brandEl = pick(['[itemprop="brand"] [itemprop="name"]', '.brand-value span', '.brand-value']);
const brand = textFrom(brandEl);
const descEl = pick(['.productView-description', '.product-description', '#product-description', '.description', '.prod-desc']);
const description = descEl ? clean(descEl.textContent) : null;
const imgEl = pick(['.productView-image img', '.product-hero img', '.productImage img', '.primary-image img', 'img']);
const image = imgEl ? toAbsolute(imgEl.getAttribute('src') || imgEl.getAttribute('data-src')) : null;
const productInfo = {};
document.querySelectorAll('.productView-info .pdp__info-wrapper > div').forEach((row) => {
const key = clean(row.querySelector('dt')?.textContent || '').replace(/:$/, '');
const value = clean(row.querySelector('dd')?.textContent || '');
if (key && value && !productInfo[key]) productInfo[key] = value;
});
const options = [...document.querySelectorAll('[data-product-option-change] .form-field')].map((field) => {
const label = clean(field.querySelector('.ob-option-name, .form-label')?.textContent || '').replace(/\s*\*$/, '').replace(/:$/, '');
const select = field.querySelector('select');
const inputs = [...field.querySelectorAll('input[type="radio"], input[type="checkbox"]')];
return {
label: label || null,
required: Boolean(field.querySelector('[required]') || clean(field.textContent).includes('*')),
type: select ? 'select' : (inputs[0]?.type || field.getAttribute('data-product-attribute') || null),
name: select?.name || inputs[0]?.name || null,
values: select
? [...select.options].map((option) => ({
label: clean(option.textContent),
value: option.value || null,
selected: option.selected,
disabled: option.disabled,
})).filter((option) => option.label)
: inputs.map((input) => ({
label: clean(field.querySelector(`label[for="${input.id}"]`)?.textContent || input.closest('label')?.textContent || input.value),
value: input.value || null,
checked: input.checked,
disabled: input.disabled,
})),
};
});
const descriptionSections = [...document.querySelectorAll('.productView-description .tabs .tab-title')].map((tab) => {
const id = tab.getAttribute('href');
const panel = id ? document.querySelector(id) : null;
return {
title: clean(tab.textContent) || null,
text: panel ? clean(panel.textContent) : null,
bullets: panel ? [...panel.querySelectorAll('li')].map((li) => clean(li.textContent)).filter(Boolean) : [],
paragraphs: panel ? [...panel.querySelectorAll('p')].map((p) => clean(p.textContent)).filter(Boolean) : [],
links: panel ? [...panel.querySelectorAll('a[href]')].map((link) => ({
text: clean(link.textContent),
url: toAbsolute(link.getAttribute('href')),
})).filter((link) => link.text || link.url) : [],
images: panel ? [...panel.querySelectorAll('img')].map((img) => ({
src: toAbsolute(img.getAttribute('src') || img.getAttribute('data-src')),
alt: img.getAttribute('alt') || null,
title: img.getAttribute('title') || null,
})).filter((item) => item.src) : [],
tables: panel ? [...panel.querySelectorAll('table')].map((table) =>
[...table.querySelectorAll('tr')].map((row) =>
[...row.children].map((cell) => clean(cell.textContent))
).filter((row) => row.length)
) : [],
};
});
const images = [...document.querySelectorAll('.productView-images img, .productCarousel img')]
.flatMap((img) => {
const variants = unique([
toAbsolute(img.getAttribute('src')),
toAbsolute(img.getAttribute('data-src')),
...extractSrcset(img.getAttribute('srcset')),
...extractSrcset(img.getAttribute('data-srcset')),
]);
if (!variants.length) return [];
return [{
src: variants[0],
variants,
alt: img.getAttribute('alt') || null,
title: img.getAttribute('title') || null,
}];
});
const reviews = [...document.querySelectorAll('.productReviews-list .productReview')].map((review) => ({
rating: Number.parseFloat(review.querySelector('[itemprop="ratingValue"]')?.getAttribute('content') || '') || null,
title: clean(review.querySelector('.productReview-title, h5, [itemprop="name"]')?.textContent || ''),
author: clean(review.querySelector('[itemprop="author"], .productReview-author')?.textContent || ''),
date: review.querySelector('time, [itemprop="datePublished"]')?.getAttribute('datetime') || clean(review.querySelector('time, [itemprop="datePublished"]')?.textContent || ''),
body: clean(review.querySelector('[itemprop="reviewBody"], .productReview-body')?.textContent || review.textContent || ''),
}));
return {
productId,
title,
sku,
mpn: metaContent('[itemprop="mpn"]'),
brand,
url: document.location.href,
canonicalUrl: document.querySelector('link[rel="canonical"]')?.href || null,
priceText,
currency: metaContent('meta[property="product:price:currency"]') || metaContent('[itemprop="priceCurrency"]'),
availability: clean(document.querySelector('.pdp__info--red')?.textContent || metaContent('meta[property="og:availability"]') || ''),
stock: {
display: productInfo['Quantity Available'] || null,
currentStock: textFrom(document.querySelector('[data-product-stock]')),
},
brandUrl: document.querySelector('.brand-value a')?.href || null,
image,
images,
description,
descriptionSections,
productInfo,
options,
reviews: {
ratingValue: Number.parseFloat(metaContent('[itemprop="ratingValue"]') || '') || null,
ratingCount: Number.parseInt(metaContent('[itemprop="ratingCount"]') || '', 10) || null,
reviewCount: Number.parseInt(metaContent('[itemprop="reviewCount"]') || '', 10) || reviews.length,
items: reviews,
},
pageMeta: {
title: document.title,
description: metaContent('meta[name="description"]'),
keywords: metaContent('meta[name="keywords"]'),
ogTitle: metaContent('meta[property="og:title"]'),
ogDescription: metaContent('meta[property="og:description"]'),
ogImage: metaContent('meta[property="og:image"]'),
},
};
});
} catch (e) {
return { error: e.message, priceText: null, sku: null, description: null, image: null };
}
}
// For detail extraction we'll open a secondary page to navigate product pages
const productPage = await browser.newPage();
const origin = new URL(targetUrl).origin;
const final = [];
const productItems = Array.from(map.values());
for (let index = 0; index < productItems.length; index += 1) {
const item = productItems[index];
const out = {
recordType: 'product',
source: 'page-scrape',
title: item.title,
url: item.url,
price: toNumber(item.priceRaw),
priceRaw: item.priceRaw,
sku: item.sku,
image: item.image,
description: item.description,
};
// Only attempt to fetch details for same-origin product pages
try {
const u = new URL(item.url);
if (u.origin === origin) {
const details = await fetchProductDetails(productPage, item.url);
if ((!out.priceRaw || out.priceRaw.trim() === '' || out.price === null) && details.priceText) {
out.priceRaw = details.priceText;
out.price = toNumber(details.priceText);
}
if (details.title) out.title = details.title;
if (details.sku) out.sku = details.sku;
if (details.description) out.description = details.description;
if (details.image) out.image = details.image;
Object.assign(out, {
productId: details.productId || null,
mpn: details.mpn || null,
brand: details.brand || null,
brandUrl: details.brandUrl || null,
canonicalUrl: details.canonicalUrl || null,
currency: details.currency || null,
availability: details.availability || null,
stock: details.stock || {},
productInfo: details.productInfo || {},
options: details.options || [],
images: details.images || [],
descriptionSections: details.descriptionSections || [],
reviews: details.reviews || {},
pageMeta: details.pageMeta || {},
scrapeError: details.error || null,
});
}
} catch (e) {
// ignore malformed URLs
}
final.push(out);
if ((index + 1) % 5 === 0 || index === productItems.length - 1) {
console.log(`[DETAIL] ${index + 1}/${productItems.length} completed`);
if (onDetailProgress) {
onDetailProgress({
done: index + 1,
total: productItems.length,
product: out.title || out.sku || out.url || null,
});
}
}
}
await productPage.close();
if (writeOutput) {
await fs.mkdir(path.dirname(outputFile), { recursive: true });
await fs.writeFile(outputFile, `${JSON.stringify(final, null, 2)}\n`, 'utf8');
console.log(`Saved ${final.length} products to ${outputFile}`);
}
return final;
} catch (err) {
console.error('Scrape failed:', err);
throw err;
} finally {
await browser.close();
}
}
if (require.main === module) {
scrape().catch(() => {
process.exitCode = 1;
});
}
module.exports = {
TARGET_URL,
cleanText,
scrape,
scrapeBrocksPerformanceProducts(targetUrl = TARGET_URL, options = {}) {
return scrape(targetUrl, { ...options, writeOutput: false });
},
toNumber,
};

View File

@ -0,0 +1 @@
module.exports = require("./scrape_brocks_products");

View File

@ -0,0 +1,33 @@
const kyt = require("./kyt");
const brocksPerformance = require("./brocks-performance");
const sources = {
[kyt.sourceKey]: kyt,
[brocksPerformance.sourceKey]: brocksPerformance,
};
function normalizeSourceKey(sourceKey) {
return String(sourceKey || "kyt").trim().toLowerCase();
}
function getSource(sourceKey = "kyt") {
const key = normalizeSourceKey(sourceKey);
const source = sources[key];
if (!source) {
const available = Object.keys(sources).join(", ");
throw new Error(`Unknown import source "${sourceKey}". Available sources: ${available}`);
}
return source;
}
function listSources() {
return Object.values(sources).map((source) => ({
sourceKey: source.sourceKey,
label: source.label,
}));
}
module.exports = {
getSource,
listSources,
};

View File

@ -0,0 +1,48 @@
const path = require("node:path");
const { getKytIndiaWebsiteData } = require("./scraper");
const { convertKytJsonToShopifyProducts } = require("./converter");
const sourceKey = "kyt";
const label = "KYT India";
const dataDir = path.join("data", "sources", sourceKey);
function paths() {
return {
aggregatedJson: path.join(dataDir, "01_products_aggregated.json"),
historyJson: path.join(dataDir, "01_products_run_history.json"),
aggregatedXlsx: path.join(dataDir, "01_products_aggregated.xlsx"),
downloadedImagesDir: path.join(dataDir, "02_downloaded_product_images"),
watermarkState: path.join(dataDir, "03_watermark_state.json"),
imageUploadState: path.join(dataDir, "04_shopify_image_upload_state.json"),
uploadedMapJson: path.join(dataDir, "04_shopify_uploaded_images_map.json"),
shopifyReadyJson: path.join(dataDir, "05_shopify_products_ready.json"),
logsDir: path.join(dataDir, "99_run_logs"),
};
}
async function fetchWebsiteData({ paths: runPaths }) {
return getKytIndiaWebsiteData({
outputJson: runPaths.aggregatedJson,
outputHistoryJson: runPaths.historyJson,
outputXlsx: runPaths.aggregatedXlsx,
});
}
function convertToShopifyProducts(input, options = {}) {
return convertKytJsonToShopifyProducts(input, {
imageBaseUrl: options.imageBaseUrl || "",
brand: options.brand || "KYT",
uploadedImageMap: options.uploadedImageMap,
});
}
module.exports = {
sourceKey,
label,
defaultBrand: "KYT",
defaultImageBaseUrl: "https://kytindia.com/server/public/uploads",
envImageBaseUrl: "KYT_IMAGE_BASE_URL",
paths,
fetchWebsiteData,
convertToShopifyProducts,
};

View File

@ -253,14 +253,18 @@ function writeExcel(filePath, rows, analysis) {
XLSX.writeFile(wb, filePath);
}
async function getKytIndiaWebsiteData() {
async function getKytIndiaWebsiteData(options = {}) {
try {
const outputJsonPath = path.resolve(process.cwd(), OUT_JSON);
const outJson = options.outputJson || OUT_JSON;
const outHistoryJson = options.outputHistoryJson || OUT_HISTORY_JSON;
const outXlsx = options.outputXlsx || OUT_XLSX;
const outputJsonPath = path.resolve(process.cwd(), outJson);
const outputDir = path.dirname(outputJsonPath);
await fs.mkdir(outputDir, { recursive: true });
const existingAggregated = await readExistingAggregated(outputJsonPath);
console.log(
`[CACHE] Loaded ${existingAggregated.products.length} existing products from ${OUT_JSON}` +
`[CACHE] Loaded ${existingAggregated.products.length} existing products from ${outJson}` +
(existingAggregated.generatedAt ? ` (generatedAt: ${existingAggregated.generatedAt})` : "")
);
@ -438,14 +442,16 @@ async function getKytIndiaWebsiteData() {
console.log("[4/5] Writing JSON outputs...");
await fs.writeFile(outputJsonPath, JSON.stringify(finalPayload, null, 2), "utf8");
const historyPath = path.resolve(process.cwd(), OUT_HISTORY_JSON);
const historyPath = path.resolve(process.cwd(), outHistoryJson);
await fs.mkdir(path.dirname(historyPath), { recursive: true });
const history = await readHistory(historyPath);
history.push(analysis);
await fs.writeFile(historyPath, JSON.stringify(history, null, 2), "utf8");
console.log("[5/5] Writing Excel output...");
const excelRows = buildExcelRows(detailedProducts);
const excelPath = path.resolve(process.cwd(), OUT_XLSX);
const excelPath = path.resolve(process.cwd(), outXlsx);
await fs.mkdir(path.dirname(excelPath), { recursive: true });
writeExcel(excelPath, excelRows, analysis);
console.log("\n=== FINAL ANALYSIS ===");

View File

@ -1,540 +0,0 @@
const { getKytIndiaWebsiteData } = require("./01_get_kytindia_website_data");
const { downloadProductImagesFromAggregatedJson } = require("./02_download_product_images");
const { applyWatermarkToDownloadedImages } = require("./03_watermark_downloaded_images");
const fs = require("node:fs/promises");
const path = require("node:path");
const fsSync = require("node:fs");
const { convertKytJsonToShopifyProducts } = require("./05_kyt_to_shopify_converter");
const { upsertShopifyProductFull } = require("./06_shopify_product_upsert");
const { uploadKytWatermarkedImagesToShopifyFiles } = require("./04_shopify_image_file_uploader");
const DEFAULT_AGGREGATED_JSON = "data/01_products_aggregated.json";
const DEFAULT_SHOPIFY_READY_JSON = "data/05_shopify_products_ready.json";
const DEFAULT_UPLOADED_MAP_JSON = "data/04_shopify_uploaded_images_map.json";
const DEFAULT_LOGS_DIR = "data/99_run_logs";
function nowIsoLocal() {
const d = new Date();
return d.toISOString();
}
function initRunLogger(logsDir = DEFAULT_LOGS_DIR) {
const absLogsDir = path.resolve(process.cwd(), logsDir);
fsSync.mkdirSync(absLogsDir, { recursive: true });
const stamp = new Date().toISOString().replace(/[:.]/g, "-");
const logPath = path.join(absLogsDir, `${stamp}.log`);
const stream = fsSync.createWriteStream(logPath, { flags: "a" });
const original = {
log: console.log.bind(console),
info: console.info.bind(console),
warn: console.warn.bind(console),
error: console.error.bind(console)
};
function writeLine(level, args) {
const text = args.map((a) => {
if (typeof a === "string") return a;
try {
return JSON.stringify(a);
} catch {
return String(a);
}
}).join(" ");
stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`);
}
console.log = (...args) => {
writeLine("LOG", args);
original.log(...args);
};
console.info = (...args) => {
writeLine("INFO", args);
original.info(...args);
};
console.warn = (...args) => {
writeLine("WARN", args);
original.warn(...args);
};
console.error = (...args) => {
writeLine("ERROR", args);
original.error(...args);
};
console.log(`[RUN-LOG] Writing logs to ${logPath}`);
return { logPath };
}
function loadDotEnvFile(filePath = ".env") {
const abs = path.resolve(process.cwd(), filePath);
if (!fsSync.existsSync(abs)) return;
const raw = fsSync.readFileSync(abs, "utf8");
const lines = raw.split(/\r?\n/);
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eq = trimmed.indexOf("=");
if (eq <= 0) continue;
const key = trimmed.slice(0, eq).trim();
const value = trimmed.slice(eq + 1).trim();
if (key && process.env[key] == null) {
process.env[key] = value;
}
}
}
loadDotEnvFile(".env");
const RUN_LOGGER = initRunLogger(DEFAULT_LOGS_DIR);
function parseCliArgs(argv = process.argv.slice(2)) {
const out = {
limit: null
};
for (let i = 0; i < argv.length; i += 1) {
const a = argv[i];
if ((a === "--limit" || a === "-n") && argv[i + 1]) {
const n = Number.parseInt(argv[i + 1], 10);
if (Number.isFinite(n) && n > 0) out.limit = n;
}
}
return out;
}
async function buildLimitedAggregatedJson(inputPath, limit) {
const absInputPath = path.resolve(process.cwd(), inputPath);
const raw = await fs.readFile(absInputPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
const limited = products.slice(0, limit);
const outPath = path.resolve(process.cwd(), `data/01_products_aggregated.limit_${limit}.json`);
const payload = {
...parsed,
generatedAt: new Date().toISOString(),
products: limited,
analysis: {
...(parsed.analysis || {}),
totalProductsUnique: limited.length,
limitedFrom: products.length,
limitApplied: limit
}
};
await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8");
return {
inputPath: absInputPath,
outputPath: outPath,
totalBefore: products.length,
totalAfter: limited.length
};
}
function readBooleanEnv(name, fallback = false) {
const val = process.env[name];
if (val == null) return fallback;
return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase());
}
function logStageSummary(stepKey, summary = {}) {
const parts = Object.entries(summary)
.filter(([, value]) => value !== undefined && value !== null && value !== "")
.map(([key, value]) => `${key}=${typeof value === "object" ? JSON.stringify(value) : value}`);
console.log(`[STAGE-SUMMARY] ${stepKey} | ${parts.join(" | ")}`);
}
async function convertAggregatedToShopifyReady({
inputPath = DEFAULT_AGGREGATED_JSON,
outputPath = DEFAULT_SHOPIFY_READY_JSON,
uploadedImageMapPath = DEFAULT_UPLOADED_MAP_JSON,
imageBaseUrl = process.env.KYT_IMAGE_BASE_URL || "",
brand = process.env.SHOPIFY_BRAND || "KYT"
} = {}) {
const absInputPath = path.resolve(process.cwd(), inputPath);
const absOutputPath = path.resolve(process.cwd(), outputPath);
const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath);
const raw = await fs.readFile(absInputPath, "utf8");
const parsed = JSON.parse(raw);
let uploadedImageMap = null;
try {
const mapRaw = await fs.readFile(absUploadedMapPath, "utf8");
uploadedImageMap = JSON.parse(mapRaw);
} catch {
uploadedImageMap = null;
}
const convertedProducts = convertKytJsonToShopifyProducts(parsed, {
imageBaseUrl,
brand,
uploadedImageMap
});
const payload = {
generatedAt: new Date().toISOString(),
sourceFile: absInputPath,
totalProducts: convertedProducts.length,
products: convertedProducts
};
await fs.mkdir(path.dirname(absOutputPath), { recursive: true });
await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8");
return {
inputPath: absInputPath,
outputPath: absOutputPath,
uploadedImageMapPath: absUploadedMapPath,
totalProducts: convertedProducts.length
};
}
async function upsertShopifyProductsFromConverted({
convertedPath = DEFAULT_SHOPIFY_READY_JSON,
shop = process.env.SHOPIFY_SHOP,
accessToken = process.env.SHOPIFY_ACCESS_TOKEN,
locationId = process.env.SHOPIFY_LOCATION_ID || null,
enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10"
} = {}) {
if (!shop) {
throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage.");
}
if (!accessToken) {
throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage.");
}
const absConvertedPath = path.resolve(process.cwd(), convertedPath);
const raw = await fs.readFile(absConvertedPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
const startedAtMs = Date.now();
const metrics = {
sourcePath: absConvertedPath,
total: products.length,
processed: 0,
created: 0,
updated: 0,
failed: 0,
errors: [],
startedAt: new Date(startedAtMs).toISOString(),
finishedAt: null,
durationSeconds: 0,
successRate: 0
};
for (let i = 0; i < products.length; i += 1) {
const product = products[i];
const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`;
try {
const result = await upsertShopifyProductFull({
shop,
accessToken,
product,
locationId,
enableSeo,
apiVersion
});
metrics.processed += 1;
if (result.action === "created") metrics.created += 1;
if (result.action === "updated") metrics.updated += 1;
if ((i + 1) % 10 === 0 || i === products.length - 1) {
console.log(
`[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}`
);
}
} catch (error) {
metrics.processed += 1;
metrics.failed += 1;
metrics.errors.push({
index: i + 1,
product: label,
error: error.message
});
console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`);
}
}
const endedAtMs = Date.now();
metrics.finishedAt = new Date(endedAtMs).toISOString();
metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2));
metrics.successRate = metrics.total > 0
? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2))
: 0;
return metrics;
}
async function runFullKytPipeline(options = {}) {
const cli = parseCliArgs();
const onProgress = typeof options.onProgress === "function" ? options.onProgress : null;
const emitProgress = (stepIndex, stepKey, message) => {
if (onProgress) {
onProgress({
stepIndex,
totalSteps: 6,
stepKey,
message
});
}
};
emitProgress(1, "fetchWebsiteData", "Fetching KYT website data");
console.log("[PIPELINE 1/6] Fetching KYT website data...");
const dataSummary = await getKytIndiaWebsiteData();
let aggregatedPathForRun = DEFAULT_AGGREGATED_JSON;
let imagesDirForRun = "data/02_downloaded_product_images";
let limitSummary = null;
if (cli.limit) {
limitSummary = await buildLimitedAggregatedJson(DEFAULT_AGGREGATED_JSON, cli.limit);
aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/");
imagesDirForRun = `data/02_downloaded_product_images.limit_${cli.limit}`;
console.log(
`[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}`
);
}
emitProgress(2, "downloadImages", "Downloading product images");
console.log("\n[PIPELINE 2/6] Downloading product images...");
const downloadSummary = await downloadProductImagesFromAggregatedJson({
jsonPath: aggregatedPathForRun,
outputDir: imagesDirForRun,
concurrency: Math.max(1, Number.parseInt(process.env.IMAGE_DOWNLOAD_CONCURRENCY || "8", 10) || 8)
});
logStageSummary("downloadImages", {
total: downloadSummary?.totalImagesFound ?? 0,
downloaded: downloadSummary?.downloaded ?? 0,
skipped: downloadSummary?.skipped ?? 0,
failed: downloadSummary?.failed ?? 0,
products: downloadSummary?.productsCount ?? 0
});
emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
console.log("\n[PIPELINE 3/6] Applying watermark in-place...");
const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png";
let watermarkSummary;
const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun);
if (!fsSync.existsSync(absWatermarkPath)) {
watermarkSummary = {
skipped: true,
reason: `Watermark file not found: ${absWatermarkPath}`,
imagesDir: path.resolve(process.cwd(), imagesDirForRun),
watermarkPath: absWatermarkPath,
totalImagesFound: 0,
processed: 0,
skippedCount: 0,
failed: 0
};
console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`);
} else {
const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false);
try {
watermarkSummary = await applyWatermarkToDownloadedImages({
imagesDir: imagesDirForRun,
watermarkPath: watermarkPathForRun,
concurrency: Math.max(1, Number.parseInt(process.env.WATERMARK_CONCURRENCY || "4", 10) || 4)
});
} catch (error) {
if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) {
watermarkSummary = {
skipped: true,
reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`,
imagesDir: path.resolve(process.cwd(), imagesDirForRun),
watermarkPath: absWatermarkPath,
totalImagesFound: 0,
processed: 0,
skippedCount: 0,
failed: 0
};
console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`);
} else {
throw error;
}
}
}
logStageSummary("watermarkImages", {
total: watermarkSummary?.totalImagesFound ?? 0,
processed: watermarkSummary?.processed ?? 0,
skipped: watermarkSummary?.skipped ?? watermarkSummary?.skippedCount ?? 0,
failed: watermarkSummary?.failed ?? 0,
concurrency: watermarkSummary?.concurrency ?? ""
});
emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true);
const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false);
let imageUploadSummary;
if (!imageUploadEnabled) {
imageUploadSummary = {
skipped: true,
reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false",
totalTasks: 0,
processed: 0,
uploaded: 0,
failed: 0
};
console.log("[PIPELINE 4/6] Image upload stage skipped by config.");
} else {
try {
imageUploadSummary = await uploadKytWatermarkedImagesToShopifyFiles({
shop: process.env.SHOPIFY_SHOP,
accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
aggregatedJsonPath: aggregatedPathForRun,
imagesDir: imagesDirForRun,
statePath: "data/04_shopify_image_upload_state.json",
mapPath: DEFAULT_UPLOADED_MAP_JSON,
concurrency: Math.max(1, Number.parseInt(process.env.SHOPIFY_IMAGE_UPLOAD_CONCURRENCY || "3", 10) || 3)
});
} catch (error) {
const message = String(error?.message || "Unknown image upload error");
imageUploadSummary = {
skipped: true,
reason: message,
totalTasks: 0,
processed: 0,
uploaded: 0,
failed: 0
};
console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`);
if (imageUploadRequired) {
throw error;
}
console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false).");
}
}
logStageSummary("uploadImagesToShopifyFiles", {
total: imageUploadSummary?.totalTasks ?? 0,
processed: imageUploadSummary?.processed ?? 0,
uploaded: imageUploadSummary?.uploaded ?? 0,
skipped: imageUploadSummary?.skipped ?? 0,
failed: imageUploadSummary?.failed ?? 0,
concurrency: imageUploadSummary?.concurrency ?? ""
});
emitProgress(5, "convertToShopifyReady", "Converting KYT data to Shopify-ready products");
console.log("\n[PIPELINE 5/6] Converting KYT data to Shopify-ready products...");
const conversionSummary = await convertAggregatedToShopifyReady({
inputPath: aggregatedPathForRun,
outputPath: DEFAULT_SHOPIFY_READY_JSON,
uploadedImageMapPath: DEFAULT_UPLOADED_MAP_JSON,
imageBaseUrl: process.env.KYT_IMAGE_BASE_URL || "",
brand: process.env.SHOPIFY_BRAND || "KYT"
});
logStageSummary("convertToShopifyReady", {
total: conversionSummary?.totalProducts ?? 0
});
emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({
convertedPath: DEFAULT_SHOPIFY_READY_JSON,
shop: process.env.SHOPIFY_SHOP,
accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
locationId: process.env.SHOPIFY_LOCATION_ID || null,
enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10"
});
logStageSummary("upsertToShopify", {
total: shopifyUpsertSummary?.total ?? 0,
processed: shopifyUpsertSummary?.processed ?? 0,
created: shopifyUpsertSummary?.created ?? 0,
updated: shopifyUpsertSummary?.updated ?? 0,
failed: shopifyUpsertSummary?.failed ?? 0,
successRate: shopifyUpsertSummary?.successRate ?? 0
});
const summary = {
completedAt: new Date().toISOString(),
runLogPath: RUN_LOGGER.logPath,
limit: cli.limit || null,
limitSummary,
steps: {
fetchWebsiteData: dataSummary,
downloadImages: downloadSummary,
watermarkImages: watermarkSummary,
uploadImagesToShopifyFiles: imageUploadSummary,
convertToShopifyReady: conversionSummary,
upsertToShopify: shopifyUpsertSummary
},
upcomingSteps: []
};
emitProgress(6, "completed", "KYT pipeline completed");
console.log("\n=== FULL PIPELINE SUMMARY ===");
console.log(JSON.stringify(summary, null, 2));
console.log("\n=== SUMMARY TABLE ===");
console.table([
{
step: "fetchWebsiteData",
total: dataSummary?.analysis?.totalProductsUnique ?? "",
processed: dataSummary?.analysis?.detailFetchedNow ?? "",
skipped: dataSummary?.analysis?.cachedDetailReused ?? "",
failed: dataSummary?.analysis?.detailFailed ?? ""
},
{
step: "downloadImages",
total: downloadSummary?.totalImagesFound ?? "",
processed: downloadSummary?.downloaded ?? "",
skipped: downloadSummary?.skipped ?? "",
failed: downloadSummary?.failed ?? ""
},
{
step: "watermarkImages",
total: watermarkSummary?.totalImagesFound ?? "",
processed: watermarkSummary?.processed ?? "",
skipped: watermarkSummary?.skipped ?? "",
failed: watermarkSummary?.failed ?? ""
},
{
step: "uploadImagesToShopifyFiles",
total: imageUploadSummary?.totalTasks ?? "",
processed: imageUploadSummary?.processed ?? "",
skipped: imageUploadSummary?.skipped ?? "",
failed: imageUploadSummary?.failed ?? ""
},
{
step: "convertToShopifyReady",
total: conversionSummary?.totalProducts ?? "",
processed: conversionSummary?.totalProducts ?? "",
skipped: "",
failed: ""
},
{
step: "upsertToShopify",
total: shopifyUpsertSummary?.total ?? "",
processed: shopifyUpsertSummary?.processed ?? "",
skipped: "",
failed: shopifyUpsertSummary?.failed ?? ""
}
]);
return summary;
}
module.exports = {
runFullKytPipeline,
convertAggregatedToShopifyReady,
upsertShopifyProductsFromConverted
};
if (require.main === module) {
runFullKytPipeline().catch((error) => {
console.error("Full pipeline failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -10,17 +10,26 @@ function getJob(jobId) {
return jobs[jobId] || null;
}
function canStartJob(shop) {
function getJobIdForPayload(payload = {}) {
const shop = String(payload.shop || "").trim();
const source = String(payload.source || "kyt").trim().toLowerCase();
if (!shop) {
return "";
}
return source === "kyt" ? shop : `${shop}::${source}`;
}
function canStartJob(shop, source = "kyt") {
if (!shop) {
return false;
}
const existing = jobs[shop];
const existing = jobs[getJobIdForPayload({ shop, source })];
return !existing || existing.status === "done" || existing.status === "error";
}
function createJob(payload = {}) {
const id = String(payload.shop || "").trim();
const id = getJobIdForPayload(payload);
if (!id) {
throw new Error("Shop is required to create a job.");
}
@ -94,4 +103,5 @@ module.exports = {
createJob,
updateJob,
appendJobLog,
getJobIdForPayload,
};

View File

@ -1,7 +1,6 @@
const path = require("node:path");
const { getToken } = require("../tokenStore");
const { log } = require("../logger");
const { runFullKytPipeline } = require("./business-logic/kyt-pipeline/00_index");
const { runFullSourcePipeline } = require("./business-logic/import-pipeline/runSourcePipeline");
const { updateJob, appendJobLog } = require("./pipelineJobs");
function stringifyLogArgs(args) {
@ -189,7 +188,7 @@ function deriveLiveStats(job, line) {
}
async function runKytPipelineJob(job) {
const { shop, limit } = job.payload || {};
const { shop, limit, source = "kyt" } = job.payload || {};
const tokenRecord = getToken(shop);
if (!tokenRecord) {
@ -223,7 +222,7 @@ async function runKytPipelineJob(job) {
updateJob(job.id, {
status: "running",
step: "starting",
detail: `Starting KYT pipeline for ${shop}`,
detail: `Starting ${source} import pipeline for ${shop}`,
});
const originalConsole = {
@ -247,7 +246,10 @@ async function runKytPipelineJob(job) {
console.error = capture("error");
try {
const summary = await runFullKytPipeline({
const summary = await runFullSourcePipeline({
sourceKey: source,
limit,
argv: [],
onProgress(progress) {
updateJob(job.id, {
status: "running",
@ -267,7 +269,7 @@ async function runKytPipelineJob(job) {
detail: "Pipeline completed successfully",
summary,
});
log(shop, `KYT pipeline completed for job ${job.id}`);
log(shop, `${source} import pipeline completed for job ${job.id}`);
} finally {
console.log = originalConsole.log;
console.info = originalConsole.info;
@ -280,7 +282,7 @@ async function runKytPipelineJob(job) {
error: error.message,
detail: `Pipeline failed: ${error.message}`,
});
log(shop, `KYT pipeline failed for job ${job.id}: ${error.message}`);
log(shop, `${source} import pipeline failed for job ${job.id}: ${error.message}`);
} finally {
process.argv = originalArgv;
process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP;