first commit

This commit is contained in:
MOHAN 2026-04-13 17:31:26 +05:30
commit e87bd907ea
26 changed files with 4975 additions and 0 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
@"
node_modules/
.env
.env.*
dist/
build/
.next/
coverage/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
.DS_Store
Thumbs.db
.vscode/
.idea/
"@ | Out-File -Encoding utf8 .gitignore

80
auth.js Normal file
View File

@ -0,0 +1,80 @@
const express = require("express");
const axios = require("axios");
const { log } = require("./logger");
const { saveToken, deleteToken } = require("./tokenStore");
const { createFulfillmentService } = require("./fulfillmentService");
const router = express.Router();
const CLIENT_ID = process.env.SHOPIFY_CLIENT_ID;
const CLIENT_SECRET = process.env.SHOPIFY_CLIENT_SECRET;
router.get("/auth/login", (req, res) => {
const { shop } = req.query;
if (!shop) {
return res.status(400).json({ error: "Missing shop query parameter." });
}
const redirectUri = process.env.SHOPIFY_REDIRECT_URI || `${process.env.APP_URL || "http://localhost:3002"}/auth/callback`;
const scopes = process.env.SHOPIFY_SCOPES || "write_products,write_files,write_inventory,write_publications";
const installUrl = `https://${shop}/admin/oauth/authorize?client_id=${CLIENT_ID}&scope=${encodeURIComponent(scopes)}&redirect_uri=${encodeURIComponent(redirectUri)}`;
return res.redirect(installUrl);
});
router.get("/auth/callback", async (req, res) => {
const { shop, code } = req.query;
if (!shop || !code) {
log("general", `Missing shop or code in callback: ${JSON.stringify(req.query)}`);
return res.status(400).send("Missing shop or code parameter.");
}
try {
log(shop, "Exchanging OAuth code for access token");
const resp = await axios.post(
`https://${shop}/admin/oauth/access_token`,
{
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET,
code,
},
{
headers: { "Content-Type": "application/json" },
}
);
const { access_token, scope } = resp.data;
saveToken(shop, access_token, scope);
log(shop, "Token saved to data/tokens.json");
const fulfillment = await createFulfillmentService(shop, access_token);
if (fulfillment?.success) {
saveToken(shop, access_token, scope, fulfillment.fulfillmentService, fulfillment.locationId);
log(shop, "Fulfillment service and location stored");
} else {
log(shop, `Fulfillment setup skipped/failed: ${JSON.stringify(fulfillment?.errors || fulfillment?.error || null)}`);
}
const redirectTarget = process.env.SHOPIFY_AFTER_AUTH_REDIRECT || "https://admin.shopify.com";
return res.redirect(redirectTarget);
} catch (err) {
const errMsg = err.response?.data || err.message;
log(shop, `OAuth error: ${JSON.stringify(errMsg)}`);
return res.status(500).send("Failed to get access token");
}
});
router.post("/auth/logout", (req, res) => {
const { shop } = req.body || {};
if (!shop) {
return res.status(400).json({ error: "Missing shop." });
}
deleteToken(shop);
log(shop, "Shop token removed");
return res.json({ ok: true, shop });
});
module.exports = router;

View File

@ -0,0 +1,2 @@
[2026-04-13T05:47:05.171Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T05-47-05-170Z.log
[2026-04-13T05:47:05.171Z] [LOG] function

View File

@ -0,0 +1,2 @@
[2026-04-13T05:47:05.267Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T05-47-05-267Z.log
[2026-04-13T05:47:05.272Z] [LOG] [2026-04-13T05:47:05.271Z] [general] Server listening on port 3002

View File

@ -0,0 +1,2 @@
[2026-04-13T11:54:39.523Z] [LOG] [RUN-LOG] Writing logs to D:\2026\Race-Nation-Shopify-Backend\Race-Nation-Shopify-App-Backend\data\99_run_logs\2026-04-13T11-54-39-523Z.log
[2026-04-13T11:54:39.527Z] [LOG] [2026-04-13T11:54:39.527Z] [general] Server listening on port 3002

1
data/tokens.json Normal file
View File

@ -0,0 +1 @@
{}

BIN
data/watermark.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

134
fulfillmentService.js Normal file
View File

@ -0,0 +1,134 @@
const axios = require("axios");
const { log } = require("./logger");
const getLocationQuery = `
query {
locations(first: 1, query: "name:'Shop location'") {
nodes {
id
name
address {
address1
address2
city
province
provinceCode
country
countryCode
zip
phone
}
}
}
}
`;
async function getStoreAddress(client) {
const response = await client.post("", { query: getLocationQuery });
const location = response.data?.data?.locations?.nodes?.[0];
return location?.address || null;
}
function createLocationMutation(address) {
return `
mutation {
locationAdd(input: {
name: "(App) Race Nation Distribution"
address: {
address1: "${address?.address1 || ""}"
address2: "${address?.address2 || ""}"
city: "${address?.city || ""}"
provinceCode: "${address?.provinceCode || "ON"}"
countryCode: ${JSON.stringify(address?.countryCode || "US")}
zip: "${address?.zip || ""}"
phone: "${address?.phone || ""}"
}
fulfillsOnlineOrders: true
}) {
location {
id
name
}
userErrors {
code
field
message
}
}
}
`;
}
async function createCustomLocation(address, client) {
const response = await client.post("", { query: createLocationMutation(address) });
return response.data?.data?.locationAdd || { location: null, userErrors: [] };
}
async function createFulfillmentService(shop, accessToken) {
const client = axios.create({
baseURL: `https://${shop}/admin/api/2025-10/graphql.json`,
headers: {
"X-Shopify-Access-Token": accessToken,
"Content-Type": "application/json",
},
});
const mutation = `
mutation {
fulfillmentServiceCreate(
name: "Race Nation Distribution"
callbackUrl: "https://backend.race-nation.com/fulfillment"
) {
fulfillmentService {
id
serviceName
callbackUrl
handle
location {
id
}
}
userErrors {
field
message
}
}
}
`;
try {
log(shop, "Creating fulfillment service...");
const response = await client.post("", { query: mutation });
const data = response.data?.data?.fulfillmentServiceCreate;
if (data?.userErrors?.length) {
return { success: false, errors: data.userErrors };
}
const address = await getStoreAddress(client);
let locationId = data?.fulfillmentService?.location?.id || null;
if (address) {
const customLocation = await createCustomLocation(address, client);
if (!customLocation?.userErrors?.length && customLocation?.location?.id) {
locationId = customLocation.location.id;
}
}
return {
success: true,
fulfillmentService: data?.fulfillmentService || null,
locationId,
};
} catch (error) {
log(shop, `Fulfillment service request failed: ${error.response ? JSON.stringify(error.response.data) : error.message}`);
return {
success: false,
error: error.response ? error.response.data : error.message,
};
}
}
module.exports = {
createFulfillmentService,
};

21
logger.js Normal file
View File

@ -0,0 +1,21 @@
const fs = require("node:fs");
const path = require("node:path");
const logsDir = path.resolve(__dirname, "logs");
if (!fs.existsSync(logsDir)) {
fs.mkdirSync(logsDir, { recursive: true });
}
const masterLogFile = path.join(logsDir, "master.log");
function log(scope, message) {
const line = `[${new Date().toISOString()}] [${scope}] ${message}\n`;
fs.appendFileSync(masterLogFile, line, "utf8");
const scopeFile = path.join(logsDir, `${String(scope || "general").replace(/\W+/g, "_")}.log`);
fs.appendFileSync(scopeFile, line, "utf8");
console.log(line.trim());
}
module.exports = {
log,
};

2
logs/general.log Normal file
View File

@ -0,0 +1,2 @@
[2026-04-13T05:47:05.271Z] [general] Server listening on port 3002
[2026-04-13T11:54:39.527Z] [general] Server listening on port 3002

2
logs/master.log Normal file
View File

@ -0,0 +1,2 @@
[2026-04-13T05:47:05.271Z] [general] Server listening on port 3002
[2026-04-13T11:54:39.527Z] [general] Server listening on port 3002

1614
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

19
package.json Normal file
View File

@ -0,0 +1,19 @@
{
"name": "race-nation-shopify-app-backend",
"version": "1.0.0",
"private": true,
"main": "server.js",
"scripts": {
"start": "node server.js",
"dev": "node server.js"
},
"dependencies": {
"axios": "^1.13.2",
"cors": "^2.8.5",
"dotenv": "^17.2.0",
"express": "^5.1.0",
"sharp": "^0.33.5",
"uuid": "^11.1.0",
"xlsx": "^0.18.5"
}
}

55
routes/pipeline.js Normal file
View File

@ -0,0 +1,55 @@
const express = require("express");
const { createJob, updateJob, getJob, listJobs, canStartJob } = require("../src/pipelineJobs");
const { runKytPipelineJob } = require("../src/runKytPipelineJob");
const { getToken } = require("../tokenStore");
const router = express.Router();
router.post("/run", async (req, res) => {
const { shop, limit = null } = req.body || {};
if (!shop) {
return res.status(400).json({ error: "Missing shop." });
}
if (!getToken(shop)) {
return res.status(400).json({ error: "No stored Shopify token found for this shop. Complete app auth first." });
}
if (!canStartJob(shop)) {
return res.status(409).json({ error: `A KYT pipeline job is already running for ${shop}.` });
}
const job = createJob({ shop, limit });
updateJob(job.id, {
status: "queued",
step: "queued",
detail: "Job queued",
});
setImmediate(() => {
runKytPipelineJob(job);
});
return res.json({
jobId: job.id,
status: "queued",
shop,
limit,
});
});
router.get("/status/:jobId", (req, res) => {
const job = getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found." });
}
return res.json(job);
});
router.get("/jobs", (req, res) => {
return res.json({ jobs: listJobs() });
});
module.exports = router;

View File

@ -0,0 +1,56 @@
require("dotenv").config();
const express = require("express");
const crypto = require("crypto");
const router = express.Router();
router.use(express.raw({ type: "*/*" }));
const SHOPIFY_API_SECRET = process.env.SHOPIFY_API_SECRET;
function verifyHmac(rawBody, hmacHeader) {
if (!SHOPIFY_API_SECRET || !hmacHeader || !rawBody) {
return false;
}
const digest = crypto
.createHmac("sha256", SHOPIFY_API_SECRET)
.update(rawBody)
.digest("base64");
const generated = Buffer.from(digest, "utf8");
const received = Buffer.from(hmacHeader, "utf8");
if (generated.length !== received.length) {
return false;
}
return crypto.timingSafeEqual(generated, received);
}
function parseJsonSafe(buf) {
try {
return JSON.parse(buf.toString("utf8"));
} catch {
return null;
}
}
function handleWebhook(req, res, topicName) {
const hmacHeader = req.header("x-shopify-hmac-sha256");
const shop = req.header("x-shopify-shop-domain");
const topic = req.header("x-shopify-topic") || topicName;
if (!verifyHmac(req.body, hmacHeader)) {
return res.status(401).send("Invalid HMAC");
}
const payload = parseJsonSafe(req.body) || {};
console.log(`[WEBHOOK:${topic}] shop=${shop}`, payload);
return res.status(200).json({ status: "ok", topic, shop, received: payload });
}
router.post("/customers/data_request", (req, res) => handleWebhook(req, res, "customers/data_request"));
router.post("/customers/redact", (req, res) => handleWebhook(req, res, "customers/redact"));
router.post("/shop/redact", (req, res) => handleWebhook(req, res, "shop/redact"));
module.exports = router;

69
server.js Normal file
View File

@ -0,0 +1,69 @@
require("dotenv").config();
const express = require("express");
const cors = require("cors");
const { log } = require("./logger");
const auth = require("./auth");
const privacyLawWebhooks = require("./routes/privacyLawWebhooks");
const pipelineRoutes = require("./routes/pipeline");
const { getToken, listTokens } = require("./tokenStore");
const app = express();
const PORT = process.env.PORT || 3002;
app.use(cors());
app.get("/health", (req, res) => {
res.json({ ok: true, service: "race-nation-shopify-app-backend" });
});
app.get("/shops/:shop", (req, res) => {
const shop = req.params.shop;
const tokenRecord = getToken(shop);
if (!tokenRecord) {
return res.status(404).json({ status: 0, message: "Shop not found" });
}
return res.json({
status: 1,
shop,
fields: {
accessToken: tokenRecord.accessToken ? "present" : "missing",
scope: tokenRecord.scope ? "present" : "missing",
savedAt: tokenRecord.savedAt ? "present" : "missing",
locationId: tokenRecord.locationId ? "present" : "missing",
fulfillmentService: tokenRecord.fulfillmentService ? "present" : "missing",
},
});
});
app.get("/shops", (req, res) => {
res.json({ shops: listTokens() });
});
app.use("/webhooks", privacyLawWebhooks);
app.use(express.json({ limit: "10mb" }));
app.use(express.urlencoded({ limit: "10mb", extended: true }));
app.use("/", auth);
app.post("/fulfillment", (req, res) => {
console.log("POST /fulfillment:", req.body);
res.sendStatus(200);
});
app.use("/pipeline", pipelineRoutes);
const server = app.listen(PORT, () => {
log("general", `Server listening on port ${PORT}`);
});
server.on("error", (err) => {
if (err.code === "EADDRINUSE") {
console.error(`Port ${PORT} is already in use.`);
process.exit(1);
}
console.error("Server error:", err);
process.exit(1);
});

View File

@ -0,0 +1,496 @@
const { getKytIndiaWebsiteData } = require("./01_get_kytindia_website_data");
const { downloadProductImagesFromAggregatedJson } = require("./02_download_product_images");
const { applyWatermarkToDownloadedImages } = require("./03_watermark_downloaded_images");
const fs = require("node:fs/promises");
const path = require("node:path");
const fsSync = require("node:fs");
const { convertKytJsonToShopifyProducts } = require("./05_kyt_to_shopify_converter");
const { upsertShopifyProductFull } = require("./06_shopify_product_upsert");
const { uploadKytWatermarkedImagesToShopifyFiles } = require("./04_shopify_image_file_uploader");
const DEFAULT_AGGREGATED_JSON = "data/01_products_aggregated.json";
const DEFAULT_SHOPIFY_READY_JSON = "data/05_shopify_products_ready.json";
const DEFAULT_UPLOADED_MAP_JSON = "data/04_shopify_uploaded_images_map.json";
const DEFAULT_LOGS_DIR = "data/99_run_logs";
function nowIsoLocal() {
const d = new Date();
return d.toISOString();
}
function initRunLogger(logsDir = DEFAULT_LOGS_DIR) {
const absLogsDir = path.resolve(process.cwd(), logsDir);
fsSync.mkdirSync(absLogsDir, { recursive: true });
const stamp = new Date().toISOString().replace(/[:.]/g, "-");
const logPath = path.join(absLogsDir, `${stamp}.log`);
const stream = fsSync.createWriteStream(logPath, { flags: "a" });
const original = {
log: console.log.bind(console),
info: console.info.bind(console),
warn: console.warn.bind(console),
error: console.error.bind(console)
};
function writeLine(level, args) {
const text = args.map((a) => {
if (typeof a === "string") return a;
try {
return JSON.stringify(a);
} catch {
return String(a);
}
}).join(" ");
stream.write(`[${nowIsoLocal()}] [${level}] ${text}\n`);
}
console.log = (...args) => {
writeLine("LOG", args);
original.log(...args);
};
console.info = (...args) => {
writeLine("INFO", args);
original.info(...args);
};
console.warn = (...args) => {
writeLine("WARN", args);
original.warn(...args);
};
console.error = (...args) => {
writeLine("ERROR", args);
original.error(...args);
};
console.log(`[RUN-LOG] Writing logs to ${logPath}`);
return { logPath };
}
function loadDotEnvFile(filePath = ".env") {
const abs = path.resolve(process.cwd(), filePath);
if (!fsSync.existsSync(abs)) return;
const raw = fsSync.readFileSync(abs, "utf8");
const lines = raw.split(/\r?\n/);
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eq = trimmed.indexOf("=");
if (eq <= 0) continue;
const key = trimmed.slice(0, eq).trim();
const value = trimmed.slice(eq + 1).trim();
if (key && process.env[key] == null) {
process.env[key] = value;
}
}
}
loadDotEnvFile(".env");
const RUN_LOGGER = initRunLogger(DEFAULT_LOGS_DIR);
function parseCliArgs(argv = process.argv.slice(2)) {
const out = {
limit: null
};
for (let i = 0; i < argv.length; i += 1) {
const a = argv[i];
if ((a === "--limit" || a === "-n") && argv[i + 1]) {
const n = Number.parseInt(argv[i + 1], 10);
if (Number.isFinite(n) && n > 0) out.limit = n;
}
}
return out;
}
async function buildLimitedAggregatedJson(inputPath, limit) {
const absInputPath = path.resolve(process.cwd(), inputPath);
const raw = await fs.readFile(absInputPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
const limited = products.slice(0, limit);
const outPath = path.resolve(process.cwd(), `data/01_products_aggregated.limit_${limit}.json`);
const payload = {
...parsed,
generatedAt: new Date().toISOString(),
products: limited,
analysis: {
...(parsed.analysis || {}),
totalProductsUnique: limited.length,
limitedFrom: products.length,
limitApplied: limit
}
};
await fs.writeFile(outPath, JSON.stringify(payload, null, 2), "utf8");
return {
inputPath: absInputPath,
outputPath: outPath,
totalBefore: products.length,
totalAfter: limited.length
};
}
function readBooleanEnv(name, fallback = false) {
const val = process.env[name];
if (val == null) return fallback;
return ["1", "true", "yes", "y", "on"].includes(String(val).toLowerCase());
}
async function convertAggregatedToShopifyReady({
inputPath = DEFAULT_AGGREGATED_JSON,
outputPath = DEFAULT_SHOPIFY_READY_JSON,
uploadedImageMapPath = DEFAULT_UPLOADED_MAP_JSON,
imageBaseUrl = process.env.KYT_IMAGE_BASE_URL || "",
brand = process.env.SHOPIFY_BRAND || "KYT"
} = {}) {
const absInputPath = path.resolve(process.cwd(), inputPath);
const absOutputPath = path.resolve(process.cwd(), outputPath);
const absUploadedMapPath = path.resolve(process.cwd(), uploadedImageMapPath);
const raw = await fs.readFile(absInputPath, "utf8");
const parsed = JSON.parse(raw);
let uploadedImageMap = null;
try {
const mapRaw = await fs.readFile(absUploadedMapPath, "utf8");
uploadedImageMap = JSON.parse(mapRaw);
} catch {
uploadedImageMap = null;
}
const convertedProducts = convertKytJsonToShopifyProducts(parsed, {
imageBaseUrl,
brand,
uploadedImageMap
});
const payload = {
generatedAt: new Date().toISOString(),
sourceFile: absInputPath,
totalProducts: convertedProducts.length,
products: convertedProducts
};
await fs.mkdir(path.dirname(absOutputPath), { recursive: true });
await fs.writeFile(absOutputPath, JSON.stringify(payload, null, 2), "utf8");
return {
inputPath: absInputPath,
outputPath: absOutputPath,
uploadedImageMapPath: absUploadedMapPath,
totalProducts: convertedProducts.length
};
}
async function upsertShopifyProductsFromConverted({
convertedPath = DEFAULT_SHOPIFY_READY_JSON,
shop = process.env.SHOPIFY_SHOP,
accessToken = process.env.SHOPIFY_ACCESS_TOKEN,
locationId = process.env.SHOPIFY_LOCATION_ID || null,
enableSeo = readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10"
} = {}) {
if (!shop) {
throw new Error("Missing SHOPIFY_SHOP for Shopify upsert stage.");
}
if (!accessToken) {
throw new Error("Missing SHOPIFY_ACCESS_TOKEN for Shopify upsert stage.");
}
const absConvertedPath = path.resolve(process.cwd(), convertedPath);
const raw = await fs.readFile(absConvertedPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
const startedAtMs = Date.now();
const metrics = {
sourcePath: absConvertedPath,
total: products.length,
processed: 0,
created: 0,
updated: 0,
failed: 0,
errors: [],
startedAt: new Date(startedAtMs).toISOString(),
finishedAt: null,
durationSeconds: 0,
successRate: 0
};
for (let i = 0; i < products.length; i += 1) {
const product = products[i];
const label = product?.attributes?.product_name || product?.id || `item-${i + 1}`;
try {
const result = await upsertShopifyProductFull({
shop,
accessToken,
product,
locationId,
enableSeo,
apiVersion
});
metrics.processed += 1;
if (result.action === "created") metrics.created += 1;
if (result.action === "updated") metrics.updated += 1;
if ((i + 1) % 10 === 0 || i === products.length - 1) {
console.log(
`[SHOPIFY] ${i + 1}/${products.length} processed | created=${metrics.created} updated=${metrics.updated} failed=${metrics.failed}`
);
}
} catch (error) {
metrics.processed += 1;
metrics.failed += 1;
metrics.errors.push({
index: i + 1,
product: label,
error: error.message
});
console.log(`[SHOPIFY-FAIL] ${label} -> ${error.message}`);
}
}
const endedAtMs = Date.now();
metrics.finishedAt = new Date(endedAtMs).toISOString();
metrics.durationSeconds = Number(((endedAtMs - startedAtMs) / 1000).toFixed(2));
metrics.successRate = metrics.total > 0
? Number((((metrics.total - metrics.failed) / metrics.total) * 100).toFixed(2))
: 0;
return metrics;
}
async function runFullKytPipeline(options = {}) {
const cli = parseCliArgs();
const onProgress = typeof options.onProgress === "function" ? options.onProgress : null;
const emitProgress = (stepIndex, stepKey, message) => {
if (onProgress) {
onProgress({
stepIndex,
totalSteps: 6,
stepKey,
message
});
}
};
emitProgress(1, "fetchWebsiteData", "Fetching KYT website data");
console.log("[PIPELINE 1/6] Fetching KYT website data...");
const dataSummary = await getKytIndiaWebsiteData();
let aggregatedPathForRun = DEFAULT_AGGREGATED_JSON;
let imagesDirForRun = "data/02_downloaded_product_images";
let limitSummary = null;
if (cli.limit) {
limitSummary = await buildLimitedAggregatedJson(DEFAULT_AGGREGATED_JSON, cli.limit);
aggregatedPathForRun = path.relative(process.cwd(), limitSummary.outputPath).replace(/\\/g, "/");
imagesDirForRun = `data/02_downloaded_product_images.limit_${cli.limit}`;
console.log(
`[PIPELINE] Limit applied: first ${limitSummary.totalAfter} of ${limitSummary.totalBefore} products -> ${aggregatedPathForRun}`
);
}
emitProgress(2, "downloadImages", "Downloading product images");
console.log("\n[PIPELINE 2/6] Downloading product images...");
const downloadSummary = await downloadProductImagesFromAggregatedJson({
jsonPath: aggregatedPathForRun,
outputDir: imagesDirForRun
});
emitProgress(3, "watermarkImages", "Applying watermark to downloaded images");
console.log("\n[PIPELINE 3/6] Applying watermark in-place...");
const watermarkPathForRun = process.env.WATERMARK_PATH || "data/watermark.png";
let watermarkSummary;
const absWatermarkPath = path.resolve(process.cwd(), watermarkPathForRun);
if (!fsSync.existsSync(absWatermarkPath)) {
watermarkSummary = {
skipped: true,
reason: `Watermark file not found: ${absWatermarkPath}`,
imagesDir: path.resolve(process.cwd(), imagesDirForRun),
watermarkPath: absWatermarkPath,
totalImagesFound: 0,
processed: 0,
skippedCount: 0,
failed: 0
};
console.log(`[PIPELINE 3/6] Watermark stage skipped. File missing: ${absWatermarkPath}`);
} else {
const watermarkRequired = readBooleanEnv("WATERMARK_REQUIRED", false);
try {
watermarkSummary = await applyWatermarkToDownloadedImages({
imagesDir: imagesDirForRun,
watermarkPath: watermarkPathForRun
});
} catch (error) {
if (String(error?.message || "").includes("ENOENT") && !watermarkRequired) {
watermarkSummary = {
skipped: true,
reason: `Watermark stage failed with ENOENT and was skipped: ${error.message}`,
imagesDir: path.resolve(process.cwd(), imagesDirForRun),
watermarkPath: absWatermarkPath,
totalImagesFound: 0,
processed: 0,
skippedCount: 0,
failed: 0
};
console.log(`[PIPELINE 3/6] Watermark stage skipped after ENOENT: ${error.message}`);
} else {
throw error;
}
}
}
emitProgress(4, "uploadImagesToShopifyFiles", "Uploading watermarked images to Shopify Files");
console.log("\n[PIPELINE 4/6] Uploading watermarked images to Shopify Files...");
const imageUploadEnabled = readBooleanEnv("SHOPIFY_ENABLE_IMAGE_UPLOAD", true);
const imageUploadRequired = readBooleanEnv("SHOPIFY_IMAGE_UPLOAD_REQUIRED", false);
let imageUploadSummary;
if (!imageUploadEnabled) {
imageUploadSummary = {
skipped: true,
reason: "SHOPIFY_ENABLE_IMAGE_UPLOAD=false",
totalTasks: 0,
processed: 0,
uploaded: 0,
failed: 0
};
console.log("[PIPELINE 4/6] Image upload stage skipped by config.");
} else {
try {
imageUploadSummary = await uploadKytWatermarkedImagesToShopifyFiles({
shop: process.env.SHOPIFY_SHOP,
accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10",
aggregatedJsonPath: aggregatedPathForRun,
imagesDir: imagesDirForRun,
statePath: "data/04_shopify_image_upload_state.json",
mapPath: DEFAULT_UPLOADED_MAP_JSON
});
} catch (error) {
const message = String(error?.message || "Unknown image upload error");
imageUploadSummary = {
skipped: true,
reason: message,
totalTasks: 0,
processed: 0,
uploaded: 0,
failed: 0
};
console.log(`[PIPELINE 4/6] Image upload stage failed: ${message}`);
if (imageUploadRequired) {
throw error;
}
console.log("[PIPELINE 4/6] Continuing pipeline without image upload (SHOPIFY_IMAGE_UPLOAD_REQUIRED=false).");
}
}
emitProgress(5, "convertToShopifyReady", "Converting KYT data to Shopify-ready products");
console.log("\n[PIPELINE 5/6] Converting KYT data to Shopify-ready products...");
const conversionSummary = await convertAggregatedToShopifyReady({
inputPath: aggregatedPathForRun,
outputPath: DEFAULT_SHOPIFY_READY_JSON,
uploadedImageMapPath: DEFAULT_UPLOADED_MAP_JSON,
imageBaseUrl: process.env.KYT_IMAGE_BASE_URL || "",
brand: process.env.SHOPIFY_BRAND || "KYT"
});
emitProgress(6, "upsertToShopify", "Upserting products to Shopify");
console.log("\n[PIPELINE 6/6] Upserting products to Shopify...");
const shopifyUpsertSummary = await upsertShopifyProductsFromConverted({
convertedPath: DEFAULT_SHOPIFY_READY_JSON,
shop: process.env.SHOPIFY_SHOP,
accessToken: process.env.SHOPIFY_ACCESS_TOKEN,
locationId: process.env.SHOPIFY_LOCATION_ID || null,
enableSeo: readBooleanEnv("SHOPIFY_ENABLE_SEO", false),
apiVersion: process.env.SHOPIFY_API_VERSION || "2025-10"
});
const summary = {
completedAt: new Date().toISOString(),
runLogPath: RUN_LOGGER.logPath,
limit: cli.limit || null,
limitSummary,
steps: {
fetchWebsiteData: dataSummary,
downloadImages: downloadSummary,
watermarkImages: watermarkSummary,
uploadImagesToShopifyFiles: imageUploadSummary,
convertToShopifyReady: conversionSummary,
upsertToShopify: shopifyUpsertSummary
},
upcomingSteps: []
};
emitProgress(6, "completed", "KYT pipeline completed");
console.log("\n=== FULL PIPELINE SUMMARY ===");
console.log(JSON.stringify(summary, null, 2));
console.log("\n=== SUMMARY TABLE ===");
console.table([
{
step: "fetchWebsiteData",
total: dataSummary?.analysis?.totalProductsUnique ?? "",
processed: dataSummary?.analysis?.detailFetchedNow ?? "",
skipped: dataSummary?.analysis?.cachedDetailReused ?? "",
failed: dataSummary?.analysis?.detailFailed ?? ""
},
{
step: "downloadImages",
total: downloadSummary?.totalImagesFound ?? "",
processed: downloadSummary?.downloaded ?? "",
skipped: downloadSummary?.skipped ?? "",
failed: downloadSummary?.failed ?? ""
},
{
step: "watermarkImages",
total: watermarkSummary?.totalImagesFound ?? "",
processed: watermarkSummary?.processed ?? "",
skipped: watermarkSummary?.skipped ?? "",
failed: watermarkSummary?.failed ?? ""
},
{
step: "uploadImagesToShopifyFiles",
total: imageUploadSummary?.totalTasks ?? "",
processed: imageUploadSummary?.processed ?? "",
skipped: imageUploadSummary?.skipped ?? "",
failed: imageUploadSummary?.failed ?? ""
},
{
step: "convertToShopifyReady",
total: conversionSummary?.totalProducts ?? "",
processed: conversionSummary?.totalProducts ?? "",
skipped: "",
failed: ""
},
{
step: "upsertToShopify",
total: shopifyUpsertSummary?.total ?? "",
processed: shopifyUpsertSummary?.processed ?? "",
skipped: "",
failed: shopifyUpsertSummary?.failed ?? ""
}
]);
return summary;
}
module.exports = {
runFullKytPipeline,
convertAggregatedToShopifyReady,
upsertShopifyProductsFromConverted
};
if (require.main === module) {
runFullKytPipeline().catch((error) => {
console.error("Full pipeline failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1,479 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const XLSX = require("xlsx");
const BASE_URL = "https://kytindia.com/server/public/api/products";
const DATA_DIR = "data";
const OUT_JSON = path.join(DATA_DIR, "01_products_aggregated.json");
const OUT_HISTORY_JSON = path.join(DATA_DIR, "01_products_run_history.json");
const OUT_XLSX = path.join(DATA_DIR, "01_products_aggregated.xlsx");
const COMMON_HEADERS = {
accept: "application/json, text/plain, */*",
"accept-language": "en-GB,en-US;q=0.9,en;q=0.8,ta;q=0.7",
"cache-control": "no-cache",
"content-type": "application/json",
pragma: "no-cache",
priority: "u=1, i",
"sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin"
};
const REQUEST_BASE = {
method: "POST",
mode: "cors",
credentials: "omit",
referrer: "https://kytindia.com/products/racing",
headers: COMMON_HEADERS
};
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function postJson(endpoint, body, extraHeaders = {}) {
const response = await fetch(`${BASE_URL}/${endpoint}`, {
...REQUEST_BASE,
headers: { ...COMMON_HEADERS, ...extraHeaders },
body: JSON.stringify(body)
});
if (!response.ok) {
const err = new Error(`HTTP ${response.status} ${response.statusText} for ${endpoint}`);
err.status = response.status;
const retryAfter = response.headers.get("retry-after");
if (retryAfter) {
err.retryAfterMs = Number.parseInt(retryAfter, 10) * 1000;
}
throw err;
}
return response.json();
}
async function postJsonWithRetry(endpoint, body, extraHeaders = {}, maxAttempts = 5) {
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
return await postJson(endpoint, body, extraHeaders);
} catch (error) {
const isRetriable = error?.status === 429 || (error?.status >= 500 && error?.status <= 599);
const isLast = attempt === maxAttempts;
if (!isRetriable || isLast) {
throw error;
}
const waitMs = error?.retryAfterMs || attempt * 1200;
console.log(`[RETRY] ${endpoint} attempt ${attempt}/${maxAttempts} failed. Waiting ${waitMs}ms...`);
await sleep(waitMs);
}
}
throw new Error(`Retry loop failed unexpectedly for ${endpoint}`);
}
function getProductsArray(payload) {
if (!payload || typeof payload !== "object") {
return [];
}
if (Array.isArray(payload.data)) {
return payload.data;
}
if (Array.isArray(payload.products)) {
return payload.products;
}
if (payload.data && Array.isArray(payload.data.products)) {
return payload.data.products;
}
return [];
}
function getUniqueProducts(combinedProducts) {
const uniqueById = new Map();
for (const item of combinedProducts) {
const id = item?.product?.id;
const key = id || `${item.subCategory}::${item.modelSeries}::${item?.product?.name || "unknown"}`;
if (!uniqueById.has(key)) {
uniqueById.set(key, item);
}
}
return uniqueById;
}
async function mapWithConcurrency(items, concurrency, mapper) {
const results = new Array(items.length);
let nextIndex = 0;
async function worker() {
while (true) {
const current = nextIndex;
nextIndex += 1;
if (current >= items.length) {
return;
}
results[current] = await mapper(items[current], current);
}
}
const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker());
await Promise.all(workers);
return results;
}
async function readHistory(filePath) {
try {
const raw = await fs.readFile(filePath, "utf8");
const parsed = JSON.parse(raw);
return Array.isArray(parsed) ? parsed : [];
} catch {
return [];
}
}
async function readExistingAggregated(filePath) {
try {
const raw = await fs.readFile(filePath, "utf8");
const parsed = JSON.parse(raw);
if (!parsed || typeof parsed !== "object") {
return { products: [], generatedAt: null };
}
return {
generatedAt: parsed.generatedAt || null,
products: Array.isArray(parsed.products) ? parsed.products : []
};
} catch {
return { products: [], generatedAt: null };
}
}
function getProductKeyFromListing(item) {
const id = item?.product?.id || "";
if (id) {
return id;
}
return `${item?.subCategory || ""}::${item?.modelSeries || ""}::${item?.product?.name || "unknown"}`;
}
function getProductKeyFromDetailed(item) {
const id = item?.productId || item?.productSummary?.id || "";
if (id) {
return id;
}
return `${item?.subCategory || ""}::${item?.modelSeries || ""}::${item?.productSummary?.name || "unknown"}`;
}
function normalizeImageList(images) {
if (!Array.isArray(images)) {
return [];
}
return images.map((x) => String(x || ""));
}
function getListingFingerprint(item) {
const id = item?.product?.id || "";
const name = item?.product?.name || "";
const mrp = item?.product?.cost?.mrp ?? null;
const sortOrder = item?.product?.sort_order ?? null;
const images = normalizeImageList(item?.product?.img);
const subCategory = item?.subCategory || "";
const modelSeries = item?.modelSeries || "";
return JSON.stringify({ id, name, mrp, sortOrder, images, subCategory, modelSeries });
}
function getDetailedFingerprint(item) {
const id = item?.productId || item?.productSummary?.id || "";
const name = item?.productSummary?.name || "";
const mrp = item?.productSummary?.cost?.mrp ?? null;
const sortOrder = item?.productSummary?.sort_order ?? null;
const images = normalizeImageList(item?.productSummary?.img);
const subCategory = item?.subCategory || "";
const modelSeries = item?.modelSeries || "";
return JSON.stringify({ id, name, mrp, sortOrder, images, subCategory, modelSeries });
}
function buildExcelRows(aggregatedProducts) {
return aggregatedProducts.map((item) => {
const product = item.productSummary || {};
const details = item.productDetails || {};
return {
product_id: item.productId || "",
sub_category_code: item.subCategory || "",
model_series_code: item.modelSeries || "",
product_name: product.name || "",
sub_category_name: product.sub_category_name || "",
model_series_name: product.model_series_name || "",
mrp: product?.cost?.mrp ?? "",
image_count: Array.isArray(product?.img) ? product.img.length : 0,
detail_servercode: details?.servercode || "",
detail_name: details?.data?.name || "",
detail_sort_order: details?.data?.sort_order ?? "",
detail_payload_json: JSON.stringify(details)
};
});
}
function writeExcel(filePath, rows, analysis) {
const wb = XLSX.utils.book_new();
const productsSheet = XLSX.utils.json_to_sheet(rows);
XLSX.utils.book_append_sheet(wb, productsSheet, "Products");
const analysisRows = [
{ key: "timestamp", value: analysis.timestamp },
{ key: "totalSubCategories", value: analysis.totalSubCategories },
{ key: "totalModelSeriesChecked", value: analysis.totalModelSeriesChecked },
{ key: "successfulCalls", value: analysis.successfulCalls },
{ key: "failedCalls", value: analysis.failedCalls },
{ key: "totalProductsCollectedRaw", value: analysis.totalProductsCollectedRaw },
{ key: "totalProductsUnique", value: analysis.totalProductsUnique },
{ key: "detailSuccess", value: analysis.detailSuccess },
{ key: "detailFailed", value: analysis.detailFailed }
];
const analysisSheet = XLSX.utils.json_to_sheet(analysisRows);
XLSX.utils.book_append_sheet(wb, analysisSheet, "Analysis");
XLSX.writeFile(wb, filePath);
}
async function getKytIndiaWebsiteData() {
try {
const outputJsonPath = path.resolve(process.cwd(), OUT_JSON);
const outputDir = path.dirname(outputJsonPath);
await fs.mkdir(outputDir, { recursive: true });
const existingAggregated = await readExistingAggregated(outputJsonPath);
console.log(
`[CACHE] Loaded ${existingAggregated.products.length} existing products from ${OUT_JSON}` +
(existingAggregated.generatedAt ? ` (generatedAt: ${existingAggregated.generatedAt})` : "")
);
console.log("[1/5] Fetching sidebar master data...");
const sidebar = await postJsonWithRetry("get-sidebar-master-data", {});
const subcategories = Array.isArray(sidebar?.data) ? sidebar.data : [];
const allRuns = [];
const combinedProducts = [];
console.log("[2/5] Fetching product lists by subcategory/model...");
for (const subcategory of subcategories) {
const subCategoryCode = subcategory?.code;
const models = Array.isArray(subcategory?.models) ? subcategory.models : [];
if (!subCategoryCode || models.length === 0) {
continue;
}
for (const model of models) {
const modelSeries = model?.url;
if (!modelSeries) {
continue;
}
const payload = {
sub_category: subCategoryCode,
model_series: modelSeries
};
try {
const productRes = await postJsonWithRetry("get-product_list-with-model_series", payload);
const products = getProductsArray(productRes);
allRuns.push({
subCategory: subCategoryCode,
modelSeries,
productsCount: products.length,
servercode: productRes?.servercode || "UNKNOWN"
});
for (const product of products) {
combinedProducts.push({
subCategory: subCategoryCode,
modelSeries,
product
});
}
console.log(`[OK] ${subCategoryCode} / ${modelSeries} -> ${products.length} products`);
} catch (error) {
allRuns.push({
subCategory: subCategoryCode,
modelSeries,
productsCount: 0,
servercode: "FAILED",
error: error.message
});
console.log(`[FAIL] ${subCategoryCode} / ${modelSeries} -> ${error.message}`);
}
}
}
const uniqueById = getUniqueProducts(combinedProducts);
const uniqueProducts = Array.from(uniqueById.values());
const existingByKey = new Map();
for (const item of existingAggregated.products) {
existingByKey.set(getProductKeyFromDetailed(item), item);
}
const reusedProducts = [];
const productsToFetch = [];
for (const item of uniqueProducts) {
const key = getProductKeyFromListing(item);
const cached = existingByKey.get(key);
if (!cached) {
productsToFetch.push(item);
continue;
}
const currentFp = getListingFingerprint(item);
const cachedFp = getDetailedFingerprint(cached);
if (currentFp === cachedFp) {
reusedProducts.push({
...cached,
productId: item?.product?.id || cached.productId || "",
subCategory: item.subCategory,
modelSeries: item.modelSeries,
productSummary: item.product
});
} else {
productsToFetch.push(item);
}
}
console.log(
`[3/5] Incremental sync: total unique=${uniqueProducts.length}, ` +
`reused=${reusedProducts.length}, toFetch=${productsToFetch.length}`
);
const fetchedDetailedProducts = await mapWithConcurrency(productsToFetch, 2, async (item, index) => {
const productId = item?.product?.id;
if (!productId) {
return {
productId: "",
subCategory: item.subCategory,
modelSeries: item.modelSeries,
productSummary: item.product,
productDetails: { servercode: "FAILED", error: "Missing product id" }
};
}
try {
const details = await postJsonWithRetry(
"get-product-details-by-id",
{ product_id: productId },
{ Referer: `https://kytindia.com/product/${productId}` },
6
);
if ((index + 1) % 20 === 0 || index === productsToFetch.length - 1) {
console.log(`[DETAIL] ${index + 1}/${productsToFetch.length} completed`);
}
return {
productId,
subCategory: item.subCategory,
modelSeries: item.modelSeries,
productSummary: item.product,
productDetails: details
};
} catch (error) {
console.log(`[DETAIL-FAIL] ${productId} -> ${error.message}`);
return {
productId,
subCategory: item.subCategory,
modelSeries: item.modelSeries,
productSummary: item.product,
productDetails: { servercode: "FAILED", error: error.message }
};
}
});
const detailedProducts = [...reusedProducts, ...fetchedDetailedProducts];
const detailSuccess = detailedProducts.filter((x) => x?.productDetails?.servercode !== "FAILED").length;
const detailFailed = detailedProducts.length - detailSuccess;
const analysis = {
timestamp: new Date().toISOString(),
totalSubCategories: subcategories.length,
totalModelSeriesChecked: allRuns.length,
successfulCalls: allRuns.filter((x) => x.servercode !== "FAILED").length,
failedCalls: allRuns.filter((x) => x.servercode === "FAILED").length,
totalProductsCollectedRaw: combinedProducts.length,
totalProductsUnique: uniqueProducts.length,
detailSuccess,
detailFailed,
cachedDetailReused: reusedProducts.length,
detailFetchedNow: fetchedDetailedProducts.length,
perModelCounts: allRuns
};
const finalPayload = {
generatedAt: analysis.timestamp,
analysis,
products: detailedProducts
};
console.log("[4/5] Writing JSON outputs...");
await fs.writeFile(outputJsonPath, JSON.stringify(finalPayload, null, 2), "utf8");
const historyPath = path.resolve(process.cwd(), OUT_HISTORY_JSON);
const history = await readHistory(historyPath);
history.push(analysis);
await fs.writeFile(historyPath, JSON.stringify(history, null, 2), "utf8");
console.log("[5/5] Writing Excel output...");
const excelRows = buildExcelRows(detailedProducts);
const excelPath = path.resolve(process.cwd(), OUT_XLSX);
writeExcel(excelPath, excelRows, analysis);
console.log("\n=== FINAL ANALYSIS ===");
console.log(JSON.stringify(analysis, null, 2));
console.log("\nSaved files:");
console.log(`- ${outputJsonPath}`);
console.log(`- ${historyPath}`);
console.log(`- ${excelPath}`);
return {
analysis,
outputJsonPath,
historyPath,
excelPath
};
} catch (error) {
console.error("Pipeline failed:", error.message);
throw error;
}
}
module.exports = {
getKytIndiaWebsiteData
};
if (require.main === module) {
getKytIndiaWebsiteData().catch(() => {
process.exitCode = 1;
});
}

View File

@ -0,0 +1,220 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const DEFAULT_JSON_PATH = path.join("data", "01_products_aggregated.json");
const DEFAULT_OUTPUT_DIR = path.join("data", "02_downloaded_product_images");
const DEFAULT_BASE_IMAGE_URL = "https://kytindia.com/server/public/uploads";
function sanitizeName(value) {
return String(value || "")
.replace(/[<>:"/\\|?*\x00-\x1F]/g, "_")
.replace(/\s+/g, " ")
.trim()
.slice(0, 150);
}
function encodePathKeepingSlashes(relativePath) {
return String(relativePath || "")
.split("/")
.map((segment) => encodeURIComponent(segment))
.join("/");
}
function getExtFromPathOrType(imagePath, contentType) {
const parsed = path.extname(imagePath || "").toLowerCase();
if (parsed) {
return parsed;
}
if (!contentType) {
return ".jpg";
}
if (contentType.includes("png")) return ".png";
if (contentType.includes("webp")) return ".webp";
if (contentType.includes("gif")) return ".gif";
if (contentType.includes("jpeg") || contentType.includes("jpg")) return ".jpg";
return ".jpg";
}
async function downloadWithRetry(url, retries = 3) {
let lastError;
for (let attempt = 1; attempt <= retries; attempt += 1) {
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status} ${response.statusText}`);
}
const buffer = Buffer.from(await response.arrayBuffer());
return { buffer, contentType: response.headers.get("content-type") || "" };
} catch (error) {
lastError = error;
if (attempt < retries) {
await new Promise((resolve) => setTimeout(resolve, attempt * 500));
}
}
}
throw lastError;
}
async function mapWithConcurrency(items, concurrency, worker) {
const results = new Array(items.length);
let index = 0;
async function runWorker() {
while (true) {
const current = index;
index += 1;
if (current >= items.length) {
return;
}
results[current] = await worker(items[current], current);
}
}
const workers = Array.from({ length: Math.max(1, Math.min(concurrency, items.length)) }, () => runWorker());
await Promise.all(workers);
return results;
}
async function downloadProductImagesFromAggregatedJson(options = {}) {
const {
jsonPath = DEFAULT_JSON_PATH,
outputDir = DEFAULT_OUTPUT_DIR,
baseImageUrl = DEFAULT_BASE_IMAGE_URL,
concurrency = 8,
skipExisting = true
} = options;
const absJsonPath = path.resolve(process.cwd(), jsonPath);
const absOutputDir = path.resolve(process.cwd(), outputDir);
const raw = await fs.readFile(absJsonPath, "utf8");
const parsed = JSON.parse(raw);
const products = Array.isArray(parsed?.products) ? parsed.products : [];
await fs.mkdir(absOutputDir, { recursive: true });
const usedFolderNames = new Set();
const tasks = [];
for (let i = 0; i < products.length; i += 1) {
const product = products[i] || {};
const productId = product.productId || `unknown-${i + 1}`;
const productNameRaw =
product?.productSummary?.name ||
product?.productDetails?.data?.name ||
`product-${i + 1}`;
let folderName = sanitizeName(productNameRaw) || `product-${i + 1}`;
if (usedFolderNames.has(folderName)) {
folderName = `${folderName}__${String(productId).slice(0, 8)}`;
}
usedFolderNames.add(folderName);
const productDir = path.join(absOutputDir, folderName);
const imagePaths = Array.isArray(product?.productSummary?.img)
? product.productSummary.img
: Array.isArray(product?.productDetails?.data?.img)
? product.productDetails.data.img
: [];
const usedFileNames = new Set();
for (let idx = 0; idx < imagePaths.length; idx += 1) {
const relPath = imagePaths[idx];
const encoded = encodePathKeepingSlashes(relPath);
const imageUrl = `${baseImageUrl.replace(/\/+$/, "")}/${encoded.replace(/^\/+/, "")}`;
const originalBase = path.basename(String(relPath || ""), path.extname(String(relPath || ""))) || `image_${idx + 1}`;
const originalExt = path.extname(String(relPath || "")) || ".png";
let localBase = sanitizeName(originalBase) || `image_${idx + 1}`;
let fileName = `${localBase}${originalExt}`;
let dupCounter = 2;
while (usedFileNames.has(fileName.toLowerCase())) {
fileName = `${localBase}_${dupCounter}${originalExt}`;
dupCounter += 1;
}
usedFileNames.add(fileName.toLowerCase());
tasks.push({ productDir, fileName, imageUrl, relPath, productName: productNameRaw, productId });
}
}
let downloaded = 0;
let skipped = 0;
let failed = 0;
await mapWithConcurrency(tasks, concurrency, async (task, taskIndex) => {
const filePath = path.join(task.productDir, task.fileName);
try {
await fs.mkdir(task.productDir, { recursive: true });
if (skipExisting) {
try {
await fs.access(filePath);
skipped += 1;
return;
} catch {
// file does not exist, continue
}
}
const { buffer, contentType } = await downloadWithRetry(task.imageUrl);
const currentExt = path.extname(task.fileName);
const expectedExt = getExtFromPathOrType(task.relPath, contentType);
let finalFilePath = filePath;
if (!currentExt && expectedExt) {
finalFilePath = `${filePath}${expectedExt}`;
}
await fs.writeFile(finalFilePath, buffer);
downloaded += 1;
if ((taskIndex + 1) % 50 === 0 || taskIndex === tasks.length - 1) {
console.log(`[IMAGES] ${taskIndex + 1}/${tasks.length} processed`);
}
} catch (error) {
failed += 1;
console.log(`[IMAGE-FAIL] ${task.productName} | ${task.imageUrl} -> ${error.message}`);
}
});
const summary = {
jsonPath: absJsonPath,
outputDir: absOutputDir,
productsCount: products.length,
totalImagesFound: tasks.length,
downloaded,
skipped,
failed
};
return summary;
}
module.exports = {
downloadProductImagesFromAggregatedJson
};
if (require.main === module) {
downloadProductImagesFromAggregatedJson()
.then((summary) => {
console.log("\nDownload summary:");
console.log(JSON.stringify(summary, null, 2));
})
.catch((error) => {
console.error("Image download failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1,250 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const sharp = require("sharp");
const DEFAULT_IMAGES_DIR = path.join("data", "02_downloaded_product_images");
const DEFAULT_WATERMARK_PATH = path.join("data", "watermark.png");
const DEFAULT_STATE_PATH = path.join("data", "03_watermark_state.json");
const WATERMARK_ENGINE_VERSION = 1;
const IMAGE_EXTENSIONS = new Set([
".jpg",
".jpeg",
".png",
".webp",
".tif",
".tiff",
".avif"
]);
async function getAllImageFilesRecursively(rootDir) {
const files = [];
async function walk(currentDir) {
const entries = await fs.readdir(currentDir, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(currentDir, entry.name);
if (entry.isDirectory()) {
await walk(fullPath);
continue;
}
if (!entry.isFile()) {
continue;
}
const ext = path.extname(entry.name).toLowerCase();
if (IMAGE_EXTENSIONS.has(ext)) {
files.push(fullPath);
}
}
}
await walk(rootDir);
return files;
}
async function readStateFile(statePath) {
try {
const raw = await fs.readFile(statePath, "utf8");
const parsed = JSON.parse(raw);
if (!parsed || typeof parsed !== "object") {
return {};
}
return parsed;
} catch {
return {};
}
}
async function getFileFingerprint(filePath) {
const stat = await fs.stat(filePath);
return `${stat.size}:${Math.trunc(stat.mtimeMs)}`;
}
async function createTiledWatermark(imageBuffer, watermarkPath) {
const image = sharp(imageBuffer);
const imageMeta = await image.metadata();
const width = imageMeta.width || 0;
const height = imageMeta.height || 0;
if (width === 0 || height === 0) {
throw new Error("Image has invalid dimensions");
}
const scaledWatermark = await sharp(watermarkPath)
.resize({ width: Math.floor(Math.min(width, height) / 3) })
.toBuffer();
const wmMeta = await sharp(scaledWatermark).metadata();
const wmWidth = wmMeta.width || 1;
const wmHeight = wmMeta.height || 1;
const positions = [];
for (let y = 0; y < height; y += wmHeight) {
for (let x = 0; x < width; x += wmWidth) {
positions.push({ input: scaledWatermark, left: x, top: y });
}
}
return sharp(imageBuffer).composite(positions).toBuffer();
}
async function watermarkImageInPlace(imagePath, watermarkPath) {
const ext = path.extname(imagePath).toLowerCase();
let image = sharp(imagePath, { animated: false });
let meta = await image.metadata();
let width = meta.width || 0;
let height = meta.height || 0;
const format = (meta.format || "").toLowerCase();
if (width === 0 || height === 0) {
throw new Error("Image has invalid metadata");
}
// Replicates the old logic: minimum size 500x500 before watermarking.
if (width < 500 || height < 500) {
const newWidth = width < 500 ? 500 : width;
const newHeight = height < 500 ? 500 : height;
image = image.resize(newWidth, newHeight);
width = newWidth;
height = newHeight;
}
let baseBuffer;
if (format === "png" || ext === ".png") {
const background = sharp({
create: {
width,
height,
channels: 4,
background: { r: 255, g: 255, b: 255, alpha: 1 }
}
});
baseBuffer = await background
.composite([{ input: await image.toBuffer(), gravity: "center" }])
.png()
.toBuffer();
} else {
baseBuffer = await image.jpeg().toBuffer();
}
const watermarkedBuffer = await createTiledWatermark(baseBuffer, watermarkPath);
const writer = sharp(watermarkedBuffer);
if (ext === ".png") {
await writer.png().toFile(imagePath);
} else if (ext === ".webp") {
await writer.webp().toFile(imagePath);
} else if (ext === ".avif") {
await writer.avif().toFile(imagePath);
} else if (ext === ".tif" || ext === ".tiff") {
await writer.tiff().toFile(imagePath);
} else {
await writer.jpeg().toFile(imagePath);
}
}
async function applyWatermarkToDownloadedImages(options = {}) {
const {
imagesDir = DEFAULT_IMAGES_DIR,
watermarkPath = DEFAULT_WATERMARK_PATH,
statePath = DEFAULT_STATE_PATH
} = options;
const absImagesDir = path.resolve(process.cwd(), imagesDir);
const absWatermarkPath = path.resolve(process.cwd(), watermarkPath);
const absStatePath = path.resolve(process.cwd(), statePath);
await fs.access(absImagesDir);
await fs.access(absWatermarkPath);
await fs.mkdir(path.dirname(absStatePath), { recursive: true });
const watermarkFingerprint = await getFileFingerprint(absWatermarkPath);
const state = await readStateFile(absStatePath);
const stateVersion = Number(state.engineVersion || 0);
const stateWatermarkFingerprint = String(state.watermarkFingerprint || "");
const previousFiles = state.files && typeof state.files === "object" ? state.files : {};
const stateFiles =
stateVersion === WATERMARK_ENGINE_VERSION && stateWatermarkFingerprint === watermarkFingerprint
? { ...previousFiles }
: {};
const imageFiles = await getAllImageFilesRecursively(absImagesDir);
let processed = 0;
let skipped = 0;
let failed = 0;
for (let i = 0; i < imageFiles.length; i += 1) {
const imagePath = imageFiles[i];
const relativePath = path.relative(absImagesDir, imagePath);
try {
const beforeFingerprint = await getFileFingerprint(imagePath);
if (stateFiles[relativePath] === beforeFingerprint) {
skipped += 1;
} else {
await watermarkImageInPlace(imagePath, absWatermarkPath);
const afterFingerprint = await getFileFingerprint(imagePath);
stateFiles[relativePath] = afterFingerprint;
processed += 1;
}
} catch (error) {
failed += 1;
console.log(`[WATERMARK-FAIL] ${imagePath} -> ${error.message}`);
}
if ((i + 1) % 50 === 0 || i === imageFiles.length - 1) {
console.log(`[WATERMARK] ${i + 1}/${imageFiles.length} processed`);
}
}
// Keep state only for files that still exist.
const currentSet = new Set(imageFiles.map((x) => path.relative(absImagesDir, x)));
for (const rel of Object.keys(stateFiles)) {
if (!currentSet.has(rel)) {
delete stateFiles[rel];
}
}
const finalState = {
engineVersion: WATERMARK_ENGINE_VERSION,
watermarkFingerprint,
updatedAt: new Date().toISOString(),
files: stateFiles
};
await fs.writeFile(absStatePath, JSON.stringify(finalState, null, 2), "utf8");
return {
imagesDir: absImagesDir,
watermarkPath: absWatermarkPath,
statePath: absStatePath,
totalImagesFound: imageFiles.length,
processed,
skipped,
failed
};
}
module.exports = {
applyWatermarkToDownloadedImages
};
if (require.main === module) {
applyWatermarkToDownloadedImages()
.then((summary) => {
console.log("\nWatermark summary:");
console.log(JSON.stringify(summary, null, 2));
})
.catch((error) => {
console.error("Watermark run failed:", error.message);
process.exitCode = 1;
});
}

View File

@ -0,0 +1,436 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const DEFAULT_AGGREGATED_JSON = path.join("data", "01_products_aggregated.json");
const DEFAULT_IMAGES_DIR = path.join("data", "02_downloaded_product_images");
const DEFAULT_STATE_PATH = path.join("data", "04_shopify_image_upload_state.json");
const DEFAULT_MAP_PATH = path.join("data", "04_shopify_uploaded_images_map.json");
function sanitizeName(value) {
return String(value || "")
.replace(/[<>:"/\\|?*\x00-\x1F]/g, "_")
.replace(/\s+/g, " ")
.trim()
.slice(0, 150);
}
function getMimeType(filePath) {
const ext = path.extname(filePath).toLowerCase();
if (ext === ".png") return "image/png";
if (ext === ".webp") return "image/webp";
if (ext === ".gif") return "image/gif";
if (ext === ".avif") return "image/avif";
if (ext === ".jpg" || ext === ".jpeg") return "image/jpeg";
if (ext === ".tif" || ext === ".tiff") return "image/tiff";
return "application/octet-stream";
}
async function fileFingerprint(filePath) {
const stat = await fs.stat(filePath);
return `${stat.size}:${Math.trunc(stat.mtimeMs)}`;
}
function createShopifyClient(shop, accessToken, apiVersion = "2025-10") {
return {
baseURL: `https://${shop}/admin/api/${apiVersion}/graphql.json`,
headers: {
"X-Shopify-Access-Token": accessToken,
"Content-Type": "application/json"
},
timeout: 30000
};
}
async function postGraphQL(client, query, variables = {}) {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), client.timeout || 30000);
try {
const response = await fetch(client.baseURL, {
method: "POST",
headers: client.headers,
body: JSON.stringify({ query, variables }),
signal: controller.signal
});
const json = await response.json();
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${JSON.stringify(json)}`);
}
if (json.errors?.length) {
throw new Error(`GraphQL errors: ${json.errors.map((e) => e.message).join(", ")}`);
}
return json.data;
} finally {
clearTimeout(timer);
}
}
async function stagedUploadOneImage(client, localPath) {
const stat = await fs.stat(localPath);
const filename = path.basename(localPath);
const mimeType = getMimeType(localPath);
const staged = await postGraphQL(
client,
`mutation($input: [StagedUploadInput!]!) {
stagedUploadsCreate(input: $input) {
stagedTargets {
url
resourceUrl
parameters { name value }
}
userErrors { field message }
}
}`,
{
input: [
{
filename,
mimeType,
resource: "FILE",
fileSize: String(stat.size),
httpMethod: "POST"
}
]
}
);
const stagedErrors = staged.stagedUploadsCreate.userErrors || [];
if (stagedErrors.length) {
throw new Error(`stagedUploadsCreate failed: ${stagedErrors.map((e) => e.message).join(", ")}`);
}
const target = staged.stagedUploadsCreate.stagedTargets?.[0];
if (!target?.url || !target?.resourceUrl) {
throw new Error("stagedUploadsCreate returned no target.");
}
const bytes = await fs.readFile(localPath);
const form = new FormData();
for (const p of target.parameters || []) {
form.append(p.name, p.value);
}
form.append("file", new Blob([bytes], { type: mimeType }), filename);
const uploadRes = await fetch(target.url, { method: "POST", body: form });
if (!uploadRes.ok) {
const txt = await uploadRes.text();
throw new Error(`staged binary upload failed: HTTP ${uploadRes.status} ${txt.slice(0, 240)}`);
}
const created = await postGraphQL(
client,
`mutation($files: [FileCreateInput!]!) {
fileCreate(files: $files) {
files {
id
alt
fileStatus
... on MediaImage {
image { url }
}
... on GenericFile {
url
}
}
userErrors { field message }
}
}`,
{
files: [
{
alt: filename,
contentType: "IMAGE",
originalSource: target.resourceUrl
}
]
}
);
const createErrors = created.fileCreate.userErrors || [];
if (createErrors.length) {
throw new Error(`fileCreate failed: ${createErrors.map((e) => e.message).join(", ")}`);
}
const fileNode = created.fileCreate.files?.[0];
if (!fileNode?.id) {
throw new Error("fileCreate returned no file id.");
}
return { id: fileNode.id, fileStatus: fileNode.fileStatus || "UPLOADED", url: fileNode.image?.url || fileNode.url || "" };
}
async function waitForFileReady(client, fileId, maxAttempts = 20, delayMs = 1500) {
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
const data = await postGraphQL(
client,
`query($id: ID!) {
node(id: $id) {
id
... on File {
fileStatus
... on MediaImage {
image { url }
}
... on GenericFile {
url
}
}
}
}`,
{ id: fileId }
);
const node = data.node;
const status = node?.fileStatus || "UNKNOWN";
const url = node?.image?.url || node?.url || "";
if (status === "READY") {
return { status, url };
}
if (status === "FAILED") {
throw new Error(`File processing failed for ${fileId}`);
}
if (attempt < maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
throw new Error(`Timed out waiting for READY status for ${fileId}`);
}
function buildLocalImageTasks(aggregatedPayload, imagesDir) {
const products = Array.isArray(aggregatedPayload?.products) ? aggregatedPayload.products : [];
const absImagesDir = path.resolve(process.cwd(), imagesDir);
const usedFolderNames = new Set();
const tasks = [];
for (let i = 0; i < products.length; i += 1) {
const product = products[i] || {};
const productId = product.productId || product?.productDetails?.data?.id || product?.productSummary?.id || `unknown-${i + 1}`;
const productNameRaw =
product?.productSummary?.name ||
product?.productDetails?.data?.name ||
`product-${i + 1}`;
let folderName = sanitizeName(productNameRaw) || `product-${i + 1}`;
if (usedFolderNames.has(folderName)) {
folderName = `${folderName}__${String(productId).slice(0, 8)}`;
}
usedFolderNames.add(folderName);
const productDir = path.join(absImagesDir, folderName);
const imagePaths = Array.isArray(product?.productSummary?.img)
? product.productSummary.img
: Array.isArray(product?.productDetails?.data?.img)
? product.productDetails.data.img
: [];
const usedFileNames = new Set();
for (let idx = 0; idx < imagePaths.length; idx += 1) {
const sourcePath = imagePaths[idx];
const originalBase = path.basename(String(sourcePath || ""), path.extname(String(sourcePath || ""))) || `image_${idx + 1}`;
const originalExt = path.extname(String(sourcePath || "")) || ".png";
const localBase = sanitizeName(originalBase) || `image_${idx + 1}`;
let fileName = `${localBase}${originalExt}`;
let dupCounter = 2;
while (usedFileNames.has(fileName.toLowerCase())) {
fileName = `${localBase}_${dupCounter}${originalExt}`;
dupCounter += 1;
}
usedFileNames.add(fileName.toLowerCase());
tasks.push({
productId: String(productId),
productName: String(productNameRaw),
sourcePath: String(sourcePath),
localPath: path.join(productDir, fileName)
});
}
}
return tasks;
}
async function readJsonOrDefault(filePath, fallback) {
try {
const raw = await fs.readFile(filePath, "utf8");
const parsed = JSON.parse(raw);
return parsed && typeof parsed === "object" ? parsed : fallback;
} catch {
return fallback;
}
}
async function uploadKytWatermarkedImagesToShopifyFiles(options = {}) {
const {
shop = process.env.SHOPIFY_SHOP,
accessToken = process.env.SHOPIFY_ACCESS_TOKEN,
apiVersion = process.env.SHOPIFY_API_VERSION || "2025-10",
aggregatedJsonPath = DEFAULT_AGGREGATED_JSON,
imagesDir = DEFAULT_IMAGES_DIR,
statePath = DEFAULT_STATE_PATH,
mapPath = DEFAULT_MAP_PATH
} = options;
if (!shop) throw new Error("Missing shop (or SHOPIFY_SHOP).");
if (!accessToken) throw new Error("Missing accessToken (or SHOPIFY_ACCESS_TOKEN).");
const client = createShopifyClient(shop, accessToken, apiVersion);
const absAggregatedPath = path.resolve(process.cwd(), aggregatedJsonPath);
const absStatePath = path.resolve(process.cwd(), statePath);
const absMapPath = path.resolve(process.cwd(), mapPath);
const aggregated = await readJsonOrDefault(absAggregatedPath, { products: [] });
const tasks = buildLocalImageTasks(aggregated, imagesDir);
const state = await readJsonOrDefault(absStatePath, {
version: 1,
updatedAt: null,
files: {}
});
const stateFiles = state.files && typeof state.files === "object" ? state.files : {};
const byProduct = {};
const bySourcePath = {};
let processed = 0;
let uploaded = 0;
let skipped = 0;
let failed = 0;
for (let i = 0; i < tasks.length; i += 1) {
const task = tasks[i];
processed += 1;
try {
await fs.access(task.localPath);
} catch {
failed += 1;
continue;
}
try {
const fp = await fileFingerprint(task.localPath);
const prev = stateFiles[task.localPath];
if (prev && prev.fingerprint === fp && prev.status === "READY" && prev.url) {
skipped += 1;
console.log(`[IMG-UPLOAD-SKIP] ${task.productId} | ${task.sourcePath} | ${task.localPath}`);
if (!byProduct[task.productId]) byProduct[task.productId] = [];
byProduct[task.productId].push({
sourcePath: task.sourcePath,
localPath: task.localPath,
fileId: prev.fileId,
status: prev.status,
url: prev.url
});
bySourcePath[task.sourcePath] = {
productId: task.productId,
localPath: task.localPath,
fileId: prev.fileId,
status: prev.status,
url: prev.url
};
} else {
const created = await stagedUploadOneImage(client, task.localPath);
const ready = created.fileStatus === "READY"
? { status: "READY", url: created.url || "" }
: await waitForFileReady(client, created.id);
stateFiles[task.localPath] = {
fingerprint: fp,
uploadedAt: new Date().toISOString(),
fileId: created.id,
status: ready.status,
url: ready.url
};
uploaded += 1;
console.log(`[IMG-UPLOAD-OK] ${task.productId} | ${task.sourcePath} | fileId=${created.id} | status=${ready.status}`);
if (!byProduct[task.productId]) byProduct[task.productId] = [];
byProduct[task.productId].push({
sourcePath: task.sourcePath,
localPath: task.localPath,
fileId: created.id,
status: ready.status,
url: ready.url
});
bySourcePath[task.sourcePath] = {
productId: task.productId,
localPath: task.localPath,
fileId: created.id,
status: ready.status,
url: ready.url
};
}
} catch (error) {
failed += 1;
stateFiles[task.localPath] = {
fingerprint: null,
uploadedAt: new Date().toISOString(),
status: "FAILED",
error: error.message
};
console.log(`[IMG-UPLOAD-FAIL] ${task.productName} | ${task.localPath} -> ${error.message}`);
}
if ((i + 1) % 25 === 0 || i === tasks.length - 1) {
console.log(`[IMG-UPLOAD] ${i + 1}/${tasks.length} processed | uploaded=${uploaded} skipped=${skipped} failed=${failed}`);
}
}
const finalState = {
version: 1,
updatedAt: new Date().toISOString(),
files: stateFiles
};
const finalMap = {
generatedAt: new Date().toISOString(),
sourceAggregatedPath: absAggregatedPath,
totalTasks: tasks.length,
byProduct,
bySourcePath
};
await fs.mkdir(path.dirname(absStatePath), { recursive: true });
await fs.mkdir(path.dirname(absMapPath), { recursive: true });
await fs.writeFile(absStatePath, JSON.stringify(finalState, null, 2), "utf8");
await fs.writeFile(absMapPath, JSON.stringify(finalMap, null, 2), "utf8");
return {
aggregatedJsonPath: absAggregatedPath,
statePath: absStatePath,
mapPath: absMapPath,
totalTasks: tasks.length,
processed,
uploaded,
skipped,
failed
};
}
async function runStandaloneImageUpload() {
const summary = await uploadKytWatermarkedImagesToShopifyFiles();
console.log("\nImage upload summary:");
console.log(JSON.stringify(summary, null, 2));
}
if (require.main === module) {
runStandaloneImageUpload().catch((error) => {
console.error("Image upload pipeline failed:", error.message);
process.exitCode = 1;
});
}
module.exports = {
uploadKytWatermarkedImagesToShopifyFiles,
runStandaloneImageUpload
};

View File

@ -0,0 +1,213 @@
const fs = require("fs");
const path = require("path");
function escapeHtml(input) {
return String(input || "")
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/\"/g, "&quot;")
.replace(/'/g, "&#39;");
}
function slugify(str) {
return String(str || "")
.trim()
.toLowerCase()
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-+|-+$/g, "");
}
function buildDescriptionHtml(details) {
const blocks = [];
if (details?.description) {
const paragraph = escapeHtml(details.description).replace(/\n+/g, "<br/>");
blocks.push(`<p>${paragraph}</p>`);
}
if (Array.isArray(details?.specifications) && details.specifications.length > 0) {
const specItems = details.specifications
.filter((s) => s && s.name && s.text)
.map((s) => `<li><strong>${escapeHtml(s.name)}:</strong> ${escapeHtml(s.text)}</li>`);
if (specItems.length) {
blocks.push("<h3>Specifications</h3>");
blocks.push(`<ul>${specItems.join("")}</ul>`);
}
}
if (details?.weight?.text) {
blocks.push(`<p><strong>Weight:</strong> ${escapeHtml(details.weight.text)}</p>`);
}
return blocks.join("\n") || "";
}
function toAbsoluteImageUrl(imgPath, imageBaseUrl) {
if (!imgPath) return null;
if (/^https?:\/\//i.test(imgPath)) return imgPath;
if (!imageBaseUrl) return null;
return `${String(imageBaseUrl).replace(/\/+$/, "")}/${String(imgPath).replace(/^\/+/, "")}`;
}
function getUploadedImageUrl(imgPath, uploadedImageMap) {
if (!uploadedImageMap || typeof uploadedImageMap !== "object") return null;
const bySourcePath = uploadedImageMap.bySourcePath && typeof uploadedImageMap.bySourcePath === "object"
? uploadedImageMap.bySourcePath
: {};
const entry = bySourcePath[String(imgPath)];
return entry?.url || null;
}
function convertKytRecordToShopifyReady(record, options = {}) {
const brand = options.brand || "KYT";
const defaultInventoryName = options.inventoryName || "main";
const summary = record?.productSummary || {};
const data = record?.productDetails?.data || {};
const details = data?.details || {};
const productId = record?.productId || data?.id || summary?.id || cryptoRandomFallback();
const productName = data?.name || summary?.name || "Unnamed Product";
const categoryName = data?.category_name || "Helmet";
const subCategoryName = data?.sub_category_name || record?.subCategory || "";
const modelSeriesName = data?.model_series_name || record?.modelSeries || "";
const mrp = Number(data?.cost?.mrp ?? summary?.cost?.mrp ?? 0);
const totalUnits = Number(data?.stocks?.total_units ?? 0);
const weightValue = Number(details?.weight?.value ?? 0);
const descriptionHtml = buildDescriptionHtml(details);
const imagePaths = Array.isArray(data?.img)
? data.img
: Array.isArray(summary?.img)
? summary.img
: [];
const imageFiles = imagePaths
.map((p) => ({
type: "Image",
url: getUploadedImageUrl(p, options.uploadedImageMap) || toAbsoluteImageUrl(p, options.imageBaseUrl),
media_content: path.basename(String(p || "")),
source_path: p,
}))
.filter((f) => Boolean(f.url));
const subcategoryCombined = [subCategoryName, modelSeriesName].filter(Boolean).join(", ");
const tags = [
brand,
categoryName,
subCategoryName,
modelSeriesName,
"Helmet",
details?.sizechart?.name || null,
].filter(Boolean);
return {
id: productId,
source: {
productId,
subCategory: record?.subCategory || null,
modelSeries: record?.modelSeries || null,
image_paths: imagePaths,
},
attributes: {
product_name: productName,
brand,
category: categoryName,
subcategory: subcategoryCombined,
part_number: data?.code || productId,
mfr_part_number: data?.code || productId,
price: mrp,
compare_price: null,
purchase_cost: null,
barcode: "",
price_group: null,
units_per_sku: null,
part_description: descriptionHtml,
descriptions: descriptionHtml
? [{ type: "Market Description", description: descriptionHtml }]
: [],
files: imageFiles,
image_paths: imagePaths,
dimensions: [{ weight: weightValue }],
total_quantity: totalUnits,
inventorydata: {
inventory: {
[defaultInventoryName]: totalUnits,
},
},
fitmentTags: {
make: [],
model: [],
year: [],
drive: [],
baseModel: [],
},
tags,
handle: slugify(`${brand}-${productName}-${productId}`),
},
};
}
function convertKytJsonToShopifyProducts(input, options = {}) {
const records = Array.isArray(input?.products) ? input.products : [];
return records.map((record) => convertKytRecordToShopifyReady(record, options));
}
function cryptoRandomFallback() {
return `kyt-${Math.random().toString(36).slice(2, 10)}`;
}
function parseCliArgs(argv) {
const out = {
input: "data/01_products_aggregated.json",
output: "data/05_shopify_products_ready.json",
imageBaseUrl: "",
brand: "KYT",
};
for (let i = 0; i < argv.length; i += 1) {
const a = argv[i];
if (a === "--input") out.input = argv[i + 1];
if (a === "--output") out.output = argv[i + 1];
if (a === "--image-base-url") out.imageBaseUrl = argv[i + 1];
if (a === "--brand") out.brand = argv[i + 1];
}
return out;
}
function runCli() {
const args = parseCliArgs(process.argv.slice(2));
const inputPath = path.resolve(args.input);
const outputPath = path.resolve(args.output);
const raw = fs.readFileSync(inputPath, "utf8");
const json = JSON.parse(raw);
const converted = convertKytJsonToShopifyProducts(json, {
imageBaseUrl: args.imageBaseUrl || "",
brand: args.brand || "KYT",
});
const payload = {
generatedAt: new Date().toISOString(),
sourceFile: inputPath,
totalProducts: converted.length,
products: converted,
};
fs.writeFileSync(outputPath, JSON.stringify(payload, null, 2));
console.log(`Converted ${converted.length} products -> ${outputPath}`);
}
if (require.main === module) {
runCli();
}
module.exports = {
convertKytRecordToShopifyReady,
convertKytJsonToShopifyProducts,
};

View File

@ -0,0 +1,595 @@
const crypto = require("crypto");
function slugify(str) {
return String(str || "")
.trim()
.toLowerCase()
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-+|-+$/g, "");
}
function extractFirstJsonObject(text) {
if (typeof text !== "string") return null;
let s = text.trim()
.replace(/^```json\s*/i, "")
.replace(/^```\s*/i, "")
.replace(/```$/i, "")
.trim();
const start = s.indexOf("{");
const end = s.lastIndexOf("}");
if (start === -1 || end === -1 || end <= start) return null;
s = s.slice(start, end + 1);
s = s.replace(/(\{|,)\s*(seo_title|seo_description)\s*:/g, '$1"$2":');
s = s.replace(/"seo_description\s*:\s*/g, '"seo_description":');
return s;
}
function createShopifyClient(shop, accessToken, apiVersion = "2025-10") {
return {
baseURL: `https://${shop}/admin/api/${apiVersion}/graphql.json`,
headers: {
"X-Shopify-Access-Token": accessToken,
"Content-Type": "application/json",
},
timeout: 30000,
};
}
function createSeoLlmClient(baseURL = "https://llm.thedomainnest.com") {
return {
baseURL,
headers: { "Content-Type": "application/json" },
timeout: 30000,
};
}
async function postJson(client, endpoint, body) {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), client.timeout || 30000);
try {
const response = await fetch(`${client.baseURL}${endpoint}`, {
method: "POST",
headers: client.headers,
body: JSON.stringify(body),
signal: controller.signal,
});
const json = await response.json();
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${JSON.stringify(json)}`);
}
return json;
} finally {
clearTimeout(timer);
}
}
async function gql(client, query, variables = {}) {
const data = await postJson(client, "", { query, variables });
if (data.errors?.length) {
throw new Error(`GraphQL errors: ${data.errors.map((e) => e.message).join(", ")}`);
}
return data.data;
}
function normalizeFitmentTags(input) {
const map = {
make: new Set(),
model: new Set(),
year: new Set(),
drive: new Set(),
baseModel: new Set(),
};
const src = input || {};
for (const key of Object.keys(map)) {
const arr = Array.isArray(src[key]) ? src[key] : [];
arr.forEach((v) => map[key].add(String(v)));
}
const fitmentMap = {};
Object.keys(map).forEach((k) => {
fitmentMap[k] = Array.from(map[k]);
});
const fitmentFlat = Array.from(new Set(Object.values(fitmentMap).flat()));
return { fitmentMap, fitmentFlat };
}
async function generateSeo(attrs, seoClient) {
const seoInput = {
product_name: attrs.product_name,
descriptions: attrs.descriptions,
brand: attrs.brand,
category: attrs.category,
subcategory: attrs.subcategory,
part_number: attrs.part_number,
price: attrs.price,
};
const requestBody = {
message: `Use the following product JSON as the only source of truth. Create:\n1) seo_title (max 70 characters min 60 characters)\n2) seo_description (max 160 characters min 140 characters)\n\nProduct JSON:\n${JSON.stringify(seoInput)}`,
mode: "quality",
system_prompt: `You are an SEO metadata generator for automotive performance parts. Output ONLY valid JSON with schema: {"seo_title":"","seo_description":""}.`,
session_id: crypto.randomUUID(),
image_base64: null,
file_name: null,
file_base64: null,
};
try {
const res = await postJson(seoClient, "/chat-json", requestBody);
const extracted = extractFirstJsonObject(res?.reply || "");
if (!extracted) return { seo_title: "", seo_description: "" };
const parsed = JSON.parse(extracted);
return {
seo_title: parsed.seo_title || "",
seo_description: parsed.seo_description || "",
};
} catch {
return { seo_title: "", seo_description: "" };
}
}
async function getOrCreateManualCollections(client, titles) {
const ids = [];
for (const title of titles) {
const clean = String(title || "").trim();
if (!clean) continue;
const lookup = await gql(
client,
`query ($q: String!) { collections(first: 1, query: $q) { nodes { id } } }`,
{ q: `title:"${clean}" AND collection_type:manual` }
);
const existing = lookup.collections.nodes?.[0];
if (existing) {
ids.push(existing.id);
continue;
}
const created = await gql(
client,
`mutation($input: CollectionInput!) {
collectionCreate(input: $input) {
collection { id }
userErrors { field message }
}
}`,
{ input: { title: clean } }
);
const errs = created.collectionCreate.userErrors || [];
if (errs.length) {
throw new Error(`collectionCreate failed for "${clean}": ${errs.map((e) => e.message).join(", ")}`);
}
ids.push(created.collectionCreate.collection.id);
}
return ids;
}
async function getPricingConfig(client) {
const data = await gql(
client,
`query {
shop { metafield(namespace: "turn14", key: "pricing_config") { value } }
}`
);
let priceType = "map";
let percentage = 0;
const value = data.shop?.metafield?.value;
if (value) {
try {
const parsed = JSON.parse(value);
priceType = parsed.priceType || "map";
percentage = Number(parsed.percentage) || 0;
} catch {
priceType = "map";
percentage = 0;
}
}
return { priceType, percentage };
}
async function publishToOnlineStore(client, productId) {
const pubs = await gql(
client,
`query {
publications(first: 20) { edges { node { id name } } }
}`
);
const publication = pubs.publications.edges.find((e) => e.node.name === "Online Store");
if (!publication) throw new Error("Online Store publication not found.");
const res = await gql(
client,
`mutation($id: ID!, $publicationId: ID!) {
publishablePublish(id: $id, input: { publicationId: $publicationId }) {
userErrors { field message }
}
}`,
{ id: productId, publicationId: publication.node.id }
);
const errs = res.publishablePublish.userErrors || [];
if (errs.length) throw new Error(`publishablePublish failed: ${errs.map((e) => e.message).join(", ")}`);
}
async function upsertShopifyProductFull({
shop,
accessToken,
product,
locationId = null,
enableSeo = true,
apiVersion = "2025-10",
seoBaseURL = "https://llm.thedomainnest.com",
}) {
if (!shop) throw new Error("shop is required");
if (!accessToken) throw new Error("accessToken is required");
if (!product) throw new Error("product is required");
const client = createShopifyClient(shop, accessToken, apiVersion);
const seoClient = createSeoLlmClient(seoBaseURL);
const attrs = product.attributes || product;
const handle = slugify(attrs.handle || product.id || attrs.part_number || attrs.product_name);
const subcats = String(attrs.subcategory || "")
.split(/[,/]/)
.map((s) => s.trim())
.filter(Boolean);
const { fitmentFlat } = normalizeFitmentTags(attrs.fitmentTags || attrs.fitmmentTags);
const collectionTitles = Array.from(
new Set([attrs.category, ...subcats, attrs.brand, ...fitmentFlat].filter(Boolean))
);
const collectionIds = await getOrCreateManualCollections(client, collectionTitles);
const productTags = [
attrs.category,
...subcats,
...fitmentFlat,
attrs.brand,
attrs.part_number,
attrs.mfr_part_number,
attrs.price_group,
attrs.units_per_sku ? `${attrs.units_per_sku} per SKU` : null,
attrs.barcode,
...(Array.isArray(attrs.tags) ? attrs.tags : []),
]
.filter(Boolean)
.map((t) => String(t).trim());
const mediaInputs = (attrs.files || [])
.filter((f) => f.type === "Image" && f.url)
.map((f) => ({
originalSource: f.url,
mediaContentType: "IMAGE",
alt: `${attrs.product_name || attrs.part_number || "Product"} - ${f.media_content || "image"}`,
}));
const marketDescs = (attrs.descriptions || [])
.filter((d) => d.type === "Market Description")
.map((d) => d.description)
.filter(Boolean);
const descriptionHtml = marketDescs.length
? marketDescs.reduce((a, b) => (b.length > a.length ? b : a))
: attrs.part_description || "";
const existingSearch = await gql(
client,
`query($q: String!) {
products(first: 1, query: $q) {
nodes {
id
handle
variants(first: 1) { nodes { id inventoryItem { id } } }
}
}
}`,
{ q: `handle:${handle}` }
);
const existing = existingSearch.products.nodes?.[0];
let productId;
let variantId;
let inventoryItemId;
let action;
if (existing) {
const upd = await gql(
client,
`mutation($product: ProductUpdateInput!, $media: [CreateMediaInput!]) {
productUpdate(product: $product, media: $media) {
product { id }
userErrors { field message }
}
}`,
{
product: {
id: existing.id,
title: attrs.product_name || attrs.part_number || "Untitled",
descriptionHtml,
vendor: attrs.brand || "",
productType: attrs.category || "",
handle,
tags: productTags,
},
media: mediaInputs.length ? mediaInputs : null
}
);
const errs = upd.productUpdate.userErrors || [];
if (errs.length) throw new Error(`productUpdate failed: ${errs.map((e) => e.message).join(", ")}`);
productId = existing.id;
variantId = existing.variants?.nodes?.[0]?.id;
inventoryItemId = existing.variants?.nodes?.[0]?.inventoryItem?.id;
action = "updated";
} else {
const crt = await gql(
client,
`mutation($product: ProductCreateInput!, $media: [CreateMediaInput!]) {
productCreate(product: $product, media: $media) {
product {
id
variants(first: 1) { nodes { id inventoryItem { id } } }
}
userErrors { field message }
}
}`,
{
product: {
title: attrs.product_name || attrs.part_number || "Untitled",
descriptionHtml,
vendor: attrs.brand || "",
productType: attrs.category || "",
handle,
tags: productTags,
collectionsToJoin: collectionIds,
status: "ACTIVE",
},
media: mediaInputs,
}
);
const errs = crt.productCreate.userErrors || [];
if (errs.length) throw new Error(`productCreate failed: ${errs.map((e) => e.message).join(", ")}`);
productId = crt.productCreate.product.id;
variantId = crt.productCreate.product.variants?.nodes?.[0]?.id;
inventoryItemId = crt.productCreate.product.variants?.nodes?.[0]?.inventoryItem?.id;
action = "created";
}
if (!variantId) throw new Error("No variant ID found after upsert.");
const { priceType, percentage } = await getPricingConfig(client);
const basePrice = Number(attrs.price || 0);
let finalPrice = basePrice;
if (priceType === "percentage") {
finalPrice = basePrice + (basePrice * percentage) / 100;
}
const compareAtPrice = attrs.compare_price != null ? Number(attrs.compare_price) : null;
const weightValue = Number(attrs.dimensions?.[0]?.weight || 0);
const bulk = await gql(
client,
`mutation($productId: ID!, $variants: [ProductVariantsBulkInput!]!) {
productVariantsBulkUpdate(productId: $productId, variants: $variants) {
productVariants { id price barcode inventoryItem { id sku } }
userErrors { field message }
}
}`,
{
productId,
variants: [
{
id: variantId,
price: Number(finalPrice).toFixed(2),
...(compareAtPrice !== null ? { compareAtPrice: Number(compareAtPrice).toFixed(2) } : {}),
...(attrs.barcode ? { barcode: attrs.barcode } : {}),
inventoryItem: {
sku: attrs.part_number || "",
measurement: { weight: { value: weightValue, unit: "POUNDS" } },
},
},
],
}
);
if (bulk.productVariantsBulkUpdate.userErrors?.length) {
throw new Error(`variant update failed: ${bulk.productVariantsBulkUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
if (inventoryItemId) {
const inv = await gql(
client,
`mutation($id: ID!, $input: InventoryItemInput!) {
inventoryItemUpdate(id: $id, input: $input) {
inventoryItem { id sku tracked }
userErrors { field message }
}
}`,
{
id: inventoryItemId,
input: {
cost: Number(attrs.purchase_cost || 0),
tracked: true,
},
}
);
if (inv.inventoryItemUpdate.userErrors?.length) {
throw new Error(`inventoryItemUpdate failed: ${inv.inventoryItemUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
if (locationId) {
try {
const act = await gql(
client,
`mutation($inventoryItemId: ID!, $locationId: ID!) {
inventoryActivate(inventoryItemId: $inventoryItemId, locationId: $locationId) {
inventoryLevel { id }
userErrors { field message }
}
}`,
{ inventoryItemId, locationId }
);
if (act.inventoryActivate.userErrors?.length) {
throw new Error(`inventoryActivate failed: ${act.inventoryActivate.userErrors.map((e) => e.message).join(", ")}`);
}
const totalQty = Number(attrs.total_quantity ?? attrs.quantity ?? 0);
const setQty = await gql(
client,
`mutation($input: InventorySetQuantitiesInput!) {
inventorySetQuantities(input: $input) {
userErrors { field message }
}
}`,
{
input: {
name: "available",
reason: "correction",
ignoreCompareQuantity: true,
quantities: [
{
inventoryItemId,
locationId,
quantity: totalQty,
compareQuantity: 0,
},
],
},
}
);
if (setQty.inventorySetQuantities.userErrors?.length) {
throw new Error(`inventorySetQuantities failed: ${setQty.inventorySetQuantities.userErrors.map((e) => e.message).join(", ")}`);
}
} catch (inventoryError) {
console.log(
`[INVENTORY-WARN] Skipping inventory activation/set for product ${productId}. Reason: ${inventoryError.message}`
);
}
} else {
console.log(`[INVENTORY-WARN] No locationId provided. Skipping inventory activation/set for product ${productId}.`);
}
}
await publishToOnlineStore(client, productId);
if (enableSeo) {
const seo = await generateSeo(attrs, seoClient);
const seoUpd = await gql(
client,
`mutation($product: ProductUpdateInput!) {
productUpdate(product: $product) {
product { id seo { title description } }
userErrors { field message }
}
}`,
{
product: {
id: productId,
seo: {
title: seo.seo_title || `${attrs.product_name || "Product"} | Performance Auto Parts`,
description:
seo.seo_description ||
`Find high-quality ${attrs.product_name || "automotive parts"} built for reliability and performance.`,
},
},
}
);
if (seoUpd.productUpdate.userErrors?.length) {
throw new Error(`SEO productUpdate failed: ${seoUpd.productUpdate.userErrors.map((e) => e.message).join(", ")}`);
}
}
return {
ok: true,
action,
productId,
variantId,
inventoryItemId,
handle,
};
}
async function runStandaloneSelfTest() {
const shop = process.env.SHOPIFY_SHOP;
const accessToken = process.env.SHOPIFY_ACCESS_TOKEN;
const locationId = process.env.SHOPIFY_LOCATION_ID || null;
if (!shop || !accessToken) {
throw new Error("Missing env. Set SHOPIFY_SHOP and SHOPIFY_ACCESS_TOKEN (optional SHOPIFY_LOCATION_ID)");
}
const dummyProduct = {
id: "kyt-demo-001",
attributes: {
product_name: "KYT Demo Helmet Race Edition",
brand: "KYT",
category: "Helmet",
subcategory: "Racing, Demo Series",
part_number: "KYT-DEMO-001",
mfr_part_number: "KYT-DEMO-001",
price: 57000,
compare_price: 59000,
purchase_cost: 42000,
barcode: "",
price_group: "premium",
units_per_sku: 1,
part_description: "<p>Demo KYT helmet description for Shopify upsert testing.</p>",
descriptions: [
{ type: "Market Description", description: "<p>Demo KYT helmet for end-to-end Shopify test.</p>" },
],
files: [],
dimensions: [{ weight: 1450 }],
total_quantity: 5,
inventorydata: { inventory: { main: 5 } },
fitmentTags: { make: [], model: [], year: [], drive: [], baseModel: [] },
tags: ["KYT", "Helmet", "Racing", "Demo"],
handle: slugify(`kyt-demo-001-${Date.now()}`),
},
};
const result = await upsertShopifyProductFull({
shop,
accessToken,
product: dummyProduct,
locationId,
enableSeo: true,
});
console.log("Self-test success:", result);
}
if (require.main === module) {
runStandaloneSelfTest().catch((err) => {
console.error("Self-test failed:", err.message);
process.exit(1);
});
}
module.exports = {
upsertShopifyProductFull,
runStandaloneSelfTest,
};

64
src/pipelineJobs.js Normal file
View File

@ -0,0 +1,64 @@
const jobs = {};
function listJobs() {
return Object.values(jobs)
.sort((a, b) => String(b.startedAt || "").localeCompare(String(a.startedAt || "")));
}
function getJob(jobId) {
return jobs[jobId] || null;
}
function canStartJob(shop) {
if (!shop) {
return false;
}
const existing = jobs[shop];
return !existing || existing.status === "done" || existing.status === "error";
}
function createJob(payload = {}) {
const id = String(payload.shop || "").trim();
if (!id) {
throw new Error("Shop is required to create a job.");
}
jobs[id] = {
id,
status: "queued",
step: "queued",
stepIndex: 0,
totalSteps: 6,
detail: null,
summary: null,
error: null,
payload,
startedAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
return jobs[id];
}
function updateJob(jobId, patch) {
const current = jobs[jobId];
if (!current) {
return null;
}
jobs[jobId] = {
...current,
...patch,
updatedAt: new Date().toISOString(),
};
return jobs[jobId];
}
module.exports = {
listJobs,
getJob,
canStartJob,
createJob,
updateJob,
};

84
src/runKytPipelineJob.js Normal file
View File

@ -0,0 +1,84 @@
const path = require("node:path");
const { getToken } = require("../tokenStore");
const { log } = require("../logger");
const { runFullKytPipeline } = require("./business-logic/kyt-pipeline/00_index");
const { updateJob } = require("./pipelineJobs");
async function runKytPipelineJob(job) {
const { shop, limit } = job.payload || {};
const tokenRecord = getToken(shop);
if (!tokenRecord) {
updateJob(job.id, {
status: "error",
step: "auth",
error: `No stored Shopify token for shop ${shop}`,
});
return;
}
const previousEnv = {
SHOPIFY_SHOP: process.env.SHOPIFY_SHOP,
SHOPIFY_ACCESS_TOKEN: process.env.SHOPIFY_ACCESS_TOKEN,
SHOPIFY_LOCATION_ID: process.env.SHOPIFY_LOCATION_ID,
WATERMARK_PATH: process.env.WATERMARK_PATH,
};
process.env.SHOPIFY_SHOP = shop;
process.env.SHOPIFY_ACCESS_TOKEN = tokenRecord.accessToken;
process.env.SHOPIFY_LOCATION_ID = tokenRecord.locationId || "";
process.env.WATERMARK_PATH = process.env.WATERMARK_PATH || "data/watermark.png";
const originalArgv = process.argv.slice();
process.argv = [originalArgv[0], originalArgv[1]];
if (limit) {
process.argv.push("--limit", String(limit));
}
try {
updateJob(job.id, {
status: "running",
step: "starting",
detail: `Starting KYT pipeline for ${shop}`,
});
const summary = await runFullKytPipeline({
onProgress(progress) {
updateJob(job.id, {
status: "running",
step: progress.stepKey,
stepIndex: progress.stepIndex,
totalSteps: progress.totalSteps,
detail: progress.message,
});
},
});
updateJob(job.id, {
status: "done",
step: "completed",
stepIndex: 6,
totalSteps: 6,
detail: "Pipeline completed successfully",
summary,
});
log(shop, `KYT pipeline completed for job ${job.id}`);
} catch (error) {
updateJob(job.id, {
status: "error",
error: error.message,
detail: `Pipeline failed: ${error.message}`,
});
log(shop, `KYT pipeline failed for job ${job.id}: ${error.message}`);
} finally {
process.argv = originalArgv;
process.env.SHOPIFY_SHOP = previousEnv.SHOPIFY_SHOP;
process.env.SHOPIFY_ACCESS_TOKEN = previousEnv.SHOPIFY_ACCESS_TOKEN;
process.env.SHOPIFY_LOCATION_ID = previousEnv.SHOPIFY_LOCATION_ID;
process.env.WATERMARK_PATH = previousEnv.WATERMARK_PATH;
}
}
module.exports = {
runKytPipelineJob,
};

63
tokenStore.js Normal file
View File

@ -0,0 +1,63 @@
const fs = require("node:fs");
const path = require("node:path");
const dataFile = path.resolve(__dirname, "data", "tokens.json");
const dataDir = path.dirname(dataFile);
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}
if (!fs.existsSync(dataFile)) {
fs.writeFileSync(dataFile, "{}", "utf8");
}
function readStore() {
return JSON.parse(fs.readFileSync(dataFile, "utf8"));
}
function saveStore(store) {
fs.writeFileSync(dataFile, JSON.stringify(store, null, 2), "utf8");
}
function saveToken(shop, accessToken, scope, fulfillmentService = null, locationId = null) {
if (!shop || accessToken == null || scope == null) {
return;
}
const store = readStore();
store[shop] = {
accessToken,
scope,
savedAt: new Date().toISOString(),
locationId: locationId || store[shop]?.locationId || null,
fulfillmentService: fulfillmentService || store[shop]?.fulfillmentService || null,
};
saveStore(store);
}
function getToken(shop) {
const store = readStore();
return store[shop] || null;
}
function deleteToken(shop) {
if (!shop) {
return;
}
const store = readStore();
delete store[shop];
saveStore(store);
}
function listTokens() {
return readStore();
}
module.exports = {
saveToken,
getToken,
deleteToken,
listTokens,
};