MOHAN 6582ec5641 feat: add live import progress tracking with job store
- Add jobStore.js: in-memory job store with rich job objects (liveStats,
  logs, errors, cancellation, timing, success rate)
- Rewrite manageProducts.js: structured logging ([STATS], [PRODUCT-OK],
  [PRODUCT-FAIL], [SKIP], [FETCH], [CANCEL], etc.), per-product cancel
  checks, jobStore integration
- server.js: expose GET /health, GET /jobs, GET /jobs/:id,
  POST /jobs/:id/cancel, GET /shops endpoints
- tokenStore.js: add listTokens() export

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 02:22:58 +05:30

524 lines
20 KiB
JavaScript
Executable File

// routes/manageProducts.js
const express = require('express');
const axios = require('axios');
const { v4: uuid } = require('uuid');
const { getToken } = require('../tokenStore');
const { log } = require('../logger');
const crypto = require('crypto');
const router = express.Router();
const {
createJob,
updateJob,
appendJobLog,
recordProductResult,
recordProductError,
finishJob,
cancelJob,
isJobCancelled,
getJob,
listJobs,
} = require('../jobStore');
// Shared axios instance for the SEO LLM service.
// timeout: 0 is axios's "no timeout" setting, so requests made through this
// client wait for as long as the server takes to answer.
const seo_llm_client = axios.create({
  timeout: 0,
  baseURL: 'https://llm.thedomainnest.com',
  headers: {
    'Content-Type': 'application/json',
  },
});
/**
 * Turn a value into a lowercase, dash-separated slug.
 *
 * The value is stringified, trimmed and lowercased; every run of characters
 * outside `a-z0-9` becomes a single dash, and dashes at either end are dropped.
 *
 * @param {*} str - Value to slugify; it must expose `toString()`.
 * @returns {string} The slug (empty when no `a-z0-9` characters remain).
 */
function slugify(str) {
  const lowered = str.toString().trim().toLowerCase();
  const dashed = lowered.replace(/[^a-z0-9]+/g, '-');
  return dashed.replace(/^-+|-+$/g, '');
}
/**
 * Pull a JSON-object-looking substring out of free-form text (for example an
 * LLM reply wrapped in a Markdown code fence).
 *
 * Steps: strip a leading/trailing code fence, keep everything from the first
 * `{` to the last `}`, then repair two known key-quoting mistakes for the
 * `seo_title` / `seo_description` keys.
 *
 * @param {*} text - Raw text; non-string values are returned unchanged.
 * @returns {*} The repaired substring, `null` when no `{ ... }` span exists,
 *   or the original value when `text` is not a string. The result is NOT parsed.
 */
function extractFirstJsonObject(text) {
  if (typeof text !== 'string') {
    return text;
  }

  // Remove an opening ```json / ``` fence and a closing ``` fence.
  const fencePatterns = [/^```json\s*/i, /^```\s*/i, /```$/i];
  const unfenced = fencePatterns
    .reduce((acc, fence) => acc.replace(fence, ''), text.trim())
    .trim();

  const openAt = unfenced.indexOf('{');
  const closeAt = unfenced.lastIndexOf('}');
  if (openAt < 0 || closeAt <= openAt) {
    return null;
  }

  const candidate = unfenced.slice(openAt, closeAt + 1);
  // Quote bare keys:  {seo_title: ...  ->  {"seo_title": ...
  const keysQuoted = candidate.replace(/(\{|,)\s*(seo_title|seo_description)\s*:/g, '$1"$2":');
  // Close a key that lost its ending quote:  "seo_description: ...  ->  "seo_description": ...
  return keysQuoted.replace(/"seo_description\s*:\s*/g, '"seo_description":');
}
// ---------------------------------------------------------------------------
// Fetch all products for a brand from Turn14
// ---------------------------------------------------------------------------
/**
 * Fetch every item (with fitment data) listed for a brand by the Turn14 proxy.
 *
 * Progress and failures are written to the job log; errors are never rethrown.
 *
 * @param {string|number} brandId - Brand identifier; URL-encoded into the request path.
 * @param {string} turn14accessToken - Bearer token sent in the Authorization header.
 * @param {string} shop - Not used by this function; kept so existing callers keep working.
 * @param {string} jobId - Job whose log receives the [FETCH], [FETCH-OK] and [FETCH-FAIL] lines.
 * @returns {Promise<{items: object[], fitmentTags: Array}|null>} Items that carry both
 *   `id` and `attributes`, plus the response's `fitmentTags`; `null` when the request,
 *   the HTTP status, or the JSON payload is bad.
 */
const GetAllProductsOfBranch = async (brandId, turn14accessToken, shop, jobId) => {
  try {
    appendJobLog(jobId, `[FETCH] Fetching products for brand ${brandId} from Turn14...`);

    const url = `https://turn14.data4autos.com/v1/items/brandallitemswithfitment/${encodeURIComponent(brandId)}`;
    const res = await fetch(url, {
      headers: {
        Authorization: `Bearer ${turn14accessToken}`,
        'Content-Type': 'application/json',
      },
    });

    // fetch() only rejects on network failure. Without this check an expired
    // token or a 5xx would be reported as "Found 0 products".
    if (!res.ok) {
      throw new Error(`Turn14 responded with HTTP ${res.status}`);
    }

    const payload = await res.json();
    if (payload === null || typeof payload !== 'object') {
      throw new Error('Turn14 returned an unexpected payload');
    }

    const rawItems = Array.isArray(payload.items) ? payload.items : [];
    const fitmentTags = payload.fitmentTags || [];
    const validItems = rawItems.filter((item) => item && item.id && item.attributes);

    appendJobLog(jobId, `[FETCH-OK] Found ${validItems.length} products for brand ${brandId}`);
    return { items: validItems, fitmentTags };
  } catch (err) {
    appendJobLog(jobId, `[FETCH-FAIL] Error fetching brand ${brandId}: ${err.message}`);
    return null;
  }
};
// ---------------------------------------------------------------------------
// Add one product to Shopify store
// ---------------------------------------------------------------------------
/**
 * Escape a value for embedding inside a double-quoted Shopify search phrase.
 * Backslashes and double quotes get a leading backslash so the value cannot
 * terminate the phrase early.
 *
 * @param {*} value - Value to embed.
 * @returns {string} Escaped text (without the surrounding quotes).
 */
function escapeSearchPhrase(value) {
  return String(value).replace(/(["\\])/g, '\\$1');
}

/**
 * Create one Turn14 item as a Shopify product through the Admin GraphQL API.
 *
 * Sequence: resolve/create collections -> skip if the handle already exists ->
 * productCreate -> price/barcode/weight on the default variant -> publish to
 * "Online Store" (when that publication is found) -> inventory cost/tracking ->
 * optional stock level at `locationId` -> SEO fields.
 *
 * Never throws: every failure is logged as [PRODUCT-FAIL] and returned.
 *
 * @param {string} shop - Shop domain used to build the Admin API URL.
 * @param {string} accessToken - Value for the X-Shopify-Access-Token header.
 * @param {{id: *, attributes: object}} product - Turn14 item.
 * @param {string} jobId - Job whose log receives progress lines.
 * @param {string|null} locationId - Location for stock levels; falsy skips that step.
 * @returns {Promise<
 *   {action: 'created', productId: string, handle: string, product: string} |
 *   {action: 'skipped', handle: string, product: string} |
 *   {action: 'failed', product: string, error: string}
 * >}
 */
const AddProductToStore = async (shop, accessToken, product, jobId, locationId) => {
  const attrs = product.attributes;

  // SEO generation is stubbed: both values stay empty so the defaults in the
  // final productUpdate call are used.
  const seo_title = '';
  const seo_description = '';

  const client = axios.create({
    baseURL: `https://${shop}/admin/api/2025-10/graphql.json`,
    headers: { 'X-Shopify-Access-Token': accessToken, 'Content-Type': 'application/json' },
  });

  // Best-effort steps: surface userErrors in the job log without failing the product.
  const warnOnUserErrors = (label, payload) => {
    const errs = payload?.userErrors || [];
    if (errs.length) {
      appendJobLog(jobId, `[WARN] ${label}: ${errs.map((e) => e.message).join(', ')}`);
    }
  };

  try {
    // Fitment tags. Kept inside the try so malformed fitment data fails this
    // product only instead of throwing out of the function.
    // NOTE(review): `fitmmentTags` (double "m") is what the original code read;
    // the correctly spelled key is accepted too — confirm the upstream field name.
    const fitmentRaw = attrs?.fitmmentTags || attrs?.fitmentTags || {};
    const fitmentKeys = ['make', 'model', 'year', 'drive', 'baseModel'];
    const allFitmentTags = Array.from(
      new Set(
        fitmentKeys
          .flatMap((key) => (Array.isArray(fitmentRaw[key]) ? fitmentRaw[key] : []))
          .filter((v) => v !== null && v !== undefined)
          .map((v) => String(v))
      )
    );

    // Quantities may arrive as strings; coerce so the sum is numeric, not concatenated.
    const inventoryData = attrs.inventorydata?.inventory || {};
    const totalQuantity = Object.values(inventoryData).reduce(
      (sum, val) => sum + (Number(val) || 0),
      0
    );

    // Collections
    const category = attrs.category || '';
    const subcategory = String(attrs.subcategory || '');
    const brand = attrs.brand || '';
    const subcats = subcategory.split(/[,\/]/).map((s) => s.trim()).filter(Boolean);
    const collectionTitles = Array.from(
      new Set([category, ...subcats, brand, ...allFitmentTags].filter(Boolean))
    );
    const collectionIds = [];
    for (const title of collectionTitles) {
      // The search string travels as a GraphQL variable, so quotes or
      // backslashes in a title can no longer corrupt the query document.
      const lookupResp = await client.post('', {
        query: `query CollectionLookup($q: String!) { collections(first: 1, query: $q) { nodes { id } } }`,
        variables: { q: `title:"${escapeSearchPhrase(title)}" AND collection_type:manual` },
      });
      const existing = lookupResp.data.data.collections.nodes;
      if (existing.length) {
        collectionIds.push(existing[0].id);
        continue;
      }
      const createResp = await client.post('', {
        query: `mutation collectionCreate($input: CollectionInput!) { collectionCreate(input: $input) { collection { id } userErrors { field message } } }`,
        variables: { input: { title } },
      });
      const createData = createResp.data.data.collectionCreate;
      if (createData.userErrors.length) {
        throw new Error(`Could not create collection "${title}": ${createData.userErrors.map((e) => e.message).join(', ')}`);
      }
      collectionIds.push(createData.collection.id);
    }

    // Tags. Values are stringified before trimming because part numbers and
    // years can be numeric.
    const productTags = [
      attrs.category,
      ...subcats,
      ...allFitmentTags,
      attrs.brand,
      attrs.part_number,
      attrs.mfr_part_number,
      attrs.price_group,
      attrs.units_per_sku && `${attrs.units_per_sku} per SKU`,
      attrs.barcode,
    ]
      .filter(Boolean)
      .map((t) => String(t).trim())
      .filter(Boolean);

    // Media. Alt text joins name and media_content with a space and drops
    // whichever part is missing (previously produced e.g. "Nameundefined").
    const mediaInputs = (attrs.files || [])
      .filter((f) => f.type === 'Image' && f.url)
      .map((file) => ({
        originalSource: file.url,
        mediaContentType: 'IMAGE',
        alt: [attrs.product_name, file.media_content].filter(Boolean).join(' '),
      }));

    // Description: longest "Market Description", else the part description.
    const marketDescs = (attrs.descriptions || [])
      .filter((d) => d.type === 'Market Description')
      .map((d) => d.description);
    const descriptionHtml = marketDescs.length
      ? marketDescs.reduce((a, b) => (b.length > a.length ? b : a))
      : attrs.part_description;

    const handle = slugify(product.id);

    // Dedup check (handle is slugified, so it is safe to inline).
    const searchRes = await client.post('', {
      query: `query { products(first: 1, query: "handle:${handle}") { nodes { id handle } } }`,
    });
    const exists = searchRes.data?.data?.products?.nodes?.length > 0;
    if (exists) {
      appendJobLog(jobId, `[SKIP] ${attrs.part_number} — handle "${handle}" already exists`);
      return { action: 'skipped', handle, product: attrs.product_name || attrs.part_number };
    }

    // Create product
    const createProdRes = await client.post('', {
      query: `
        mutation ProductCreate($product: ProductCreateInput!, $media: [CreateMediaInput!]) {
          productCreate(product: $product, media: $media) {
            product {
              id
              variants(first: 1) { nodes { id inventoryItem { id } price compareAtPrice barcode } }
            }
            userErrors { field message }
          }
        }
      `,
      variables: {
        product: {
          title: attrs.product_name,
          descriptionHtml,
          vendor: attrs.brand,
          productType: attrs.category,
          handle,
          tags: productTags,
          collectionsToJoin: collectionIds,
          status: 'ACTIVE',
        },
        media: mediaInputs,
      },
    });
    const prodErrs = createProdRes.data.data?.productCreate?.userErrors || [];
    if (prodErrs.length) {
      const taken = prodErrs.some((e) => /already in use/i.test(e.message));
      if (taken) {
        appendJobLog(jobId, `[SKIP] ${attrs.part_number} — duplicate handle`);
        return { action: 'skipped', handle, product: attrs.product_name || attrs.part_number };
      }
      throw new Error(`ProductCreate errors: ${prodErrs.map((e) => e.message).join(', ')}`);
    }
    const createdProduct = createProdRes.data.data?.productCreate?.product;
    if (!createdProduct) {
      throw new Error('ProductCreate returned no product');
    }
    const variantNode = createdProduct.variants?.nodes?.[0];
    if (!variantNode) {
      // Previously returned null, which the caller counted as neither
      // created, skipped nor failed.
      throw new Error('Product was created but no default variant was returned');
    }
    const variantId = variantNode.id;
    const inventoryItemId = variantNode.inventoryItem?.id;

    // Pricing: shop metafield turn14/pricing_config selects MAP price or a markup.
    const pricingConfigRes = await client.post('', {
      query: `query { shop { metafield(namespace: "turn14", key: "pricing_config") { value } } }`,
    });
    let priceType = 'map';
    let percentage = 0;
    const pricingMf = pricingConfigRes.data?.data?.shop?.metafield;
    if (pricingMf?.value) {
      try {
        const p = JSON.parse(pricingMf.value);
        priceType = p.priceType || 'map';
        percentage = Number(p.percentage) || 0;
      } catch (parseErr) {
        // Malformed config: keep the defaults, but say so.
        appendJobLog(jobId, `[WARN] pricing_config is not valid JSON, using defaults: ${parseErr.message}`);
      }
    }
    const baseprice = parseFloat(attrs.price) || 0;
    let price = baseprice;
    if (priceType === 'percentage') price = baseprice + (baseprice * (percentage / 100));
    const comparePrice = parseFloat(attrs.compare_price) || null;
    const barcode = attrs.barcode || '';
    const weightValue = parseFloat(attrs.dimensions?.[0]?.weight) || 0;

    // Variant update
    const bulkRes = await client.post('', {
      query: `
        mutation UpdateProductVariant($productId: ID!, $variants: [ProductVariantsBulkInput!]!) {
          productVariantsBulkUpdate(productId: $productId, variants: $variants) {
            productVariants { id price compareAtPrice barcode inventoryItem { sku measurement { weight { value unit } } tracked } }
            userErrors { field message }
          }
        }
      `,
      variables: {
        productId: createdProduct.id,
        variants: [{
          id: variantId,
          price,
          ...(comparePrice !== null && { compareAtPrice: comparePrice }),
          ...(barcode && { barcode }),
          inventoryItem: { sku: attrs.part_number, measurement: { weight: { value: weightValue, unit: 'POUNDS' } } },
        }],
      },
    });
    const bulkErrs = bulkRes.data.data.productVariantsBulkUpdate.userErrors;
    if (bulkErrs.length) throw new Error(`Bulk update errors: ${bulkErrs.map((e) => e.message).join(', ')}`);

    // Publish
    const publicationsRes = await client.post('', {
      query: `query { publications(first: 10) { edges { node { id name } } } }`,
    });
    const onlineStorePub = publicationsRes.data.data.publications.edges.find((p) => p.node.name === 'Online Store');
    if (onlineStorePub) {
      const publishRes = await client.post('', {
        query: `mutation($id: ID!, $publicationId: ID!) { publishablePublish(id: $id, input: { publicationId: $publicationId }) { publishable { ... on Product { id } } userErrors { field message } } }`,
        variables: { id: createdProduct.id, publicationId: onlineStorePub.node.id },
      });
      const publishErrs = publishRes.data.data.publishablePublish.userErrors;
      if (publishErrs.length) throw new Error(`Publish errors: ${publishErrs.map((e) => e.message).join(', ')}`);
    }

    // Inventory
    const invRes = await client.post('', {
      query: `
        mutation InventoryItemUpdate($id: ID!, $input: InventoryItemInput!) {
          inventoryItemUpdate(id: $id, input: $input) {
            inventoryItem { id sku unitCost { amount currencyCode } tracked }
            userErrors { field message }
          }
        }
      `,
      variables: { id: inventoryItemId, input: { cost: parseFloat(attrs.purchase_cost) || 0, tracked: true } },
    });
    const invErrs = invRes.data.data.inventoryItemUpdate.userErrors;
    if (invErrs.length) throw new Error(`Inventory update errors: ${invErrs.map((e) => e.message).join(', ')}`);

    if (locationId) {
      const activateRes = await client.post('', {
        query: `mutation ActivateInventoryItem($inventoryItemId: ID!, $locationId: ID!) { inventoryActivate(inventoryItemId: $inventoryItemId, locationId: $locationId) { inventoryLevel { id } userErrors { field message } } }`,
        variables: { inventoryItemId, locationId },
      });
      warnOnUserErrors('Inventory activate', activateRes.data?.data?.inventoryActivate);

      const setQtyRes = await client.post('', {
        query: `
          mutation InventorySet($input: InventorySetQuantitiesInput!) {
            inventorySetQuantities(input: $input) {
              inventoryAdjustmentGroup { createdAt reason }
              userErrors { field message }
            }
          }
        `,
        variables: {
          input: {
            name: 'available',
            reason: 'correction',
            referenceDocumentUri: 'logistics://turn14.data4autos.com/inventory',
            ignoreCompareQuantity: true,
            quantities: [{ inventoryItemId, locationId, quantity: totalQuantity, compareQuantity: 1 }],
          },
        },
      });
      warnOnUserErrors('Inventory set quantities', setQtyRes.data?.data?.inventorySetQuantities);
    }

    // SEO update
    const seoRes = await client.post('', {
      query: `
        mutation ProductUpdate($product: ProductUpdateInput!) {
          productUpdate(product: $product) {
            product { id seo { title description } }
            userErrors { field message }
          }
        }
      `,
      variables: {
        product: {
          id: createdProduct.id,
          seo: {
            title: seo_title || `${attrs.product_name} | Auto Parts`,
            description: seo_description || `Find high-quality ${attrs.product_name} built for reliability and performance.`,
          },
        },
      },
    });
    warnOnUserErrors('SEO update', seoRes.data?.data?.productUpdate);

    appendJobLog(jobId, `[PRODUCT-OK] Created: ${attrs.product_name} (${attrs.part_number})`);
    return { action: 'created', productId: createdProduct.id, handle, product: attrs.product_name };
  } catch (err) {
    const label = attrs?.product_name || attrs?.part_number;
    appendJobLog(jobId, `[PRODUCT-FAIL] ${label}: ${err.message}`);
    return { action: 'failed', product: label, error: err.message };
  }
};
// ---------------------------------------------------------------------------
// POST /manageproducts — start import job
// ---------------------------------------------------------------------------
router.post('/', async (req, res) => {
  // Request body: shop, brandID, turn14accessToken and a non-empty
  // selectedProductIds array are required; brandName is optional.
  // (`productCount` may also be sent by clients but is not used here.)
  const { shop, brandID, brandName, turn14accessToken, selectedProductIds } = req.body || {};
  if (!shop) return res.status(400).json({ error: 'Missing shop' });
  if (!turn14accessToken) return res.status(400).json({ error: 'Missing turn14accessToken' });
  if (!brandID) return res.status(400).json({ error: 'Missing brandID' });
  if (!Array.isArray(selectedProductIds) || selectedProductIds.length === 0) {
    return res.status(400).json({ error: 'selectedProductIds must be a non-empty array' });
  }

  const job = createJob({
    shop,
    brandId: brandID,
    brandName: brandName || `Brand ${brandID}`,
    totalSelected: selectedProductIds.length,
  });
  log(shop, `[JOB] Created job ${job.id} for brand ${brandID} — ${selectedProductIds.length} products selected`);

  // Respond immediately; the import itself runs in the background and is
  // observed through the job store.
  res.json({ processId: job.id, jobId: job.id, status: 'started' });

  // Set lookup avoids an O(items x selected) scan; comparing as strings lets
  // numeric and string IDs match each other.
  const selectedIds = new Set(selectedProductIds.map(String));

  const runImport = async () => {
    try {
      updateJob(job.id, { status: 'fetching_products', step: 'fetching_products', detail: `Fetching products for brand ${brandName || brandID} from Turn14...` });
      const tokenRecord = getToken(shop);
      if (!tokenRecord) throw new Error('No token stored for shop — re-authenticate');
      const locationId = tokenRecord.locationId || null;
      const accessToken = tokenRecord.accessToken;

      // Step 1: Fetch products from Turn14
      const products_res = await GetAllProductsOfBranch(brandID, turn14accessToken, shop, job.id);
      if (!products_res) throw new Error(`Failed to fetch products from Turn14 for brand ${brandID}`);

      // Step 2: Filter to selected IDs
      const products = products_res.items.filter((item) => selectedIds.has(String(item.id)));
      const total = products.length;
      updateJob(job.id, {
        status: 'importing',
        step: 'importing',
        detail: `Starting import of ${total} products...`,
        liveStats: { total, processed: 0, created: 0, skipped: 0, failed: 0, remaining: total, successRate: 0, label: `0/${total}` },
      });
      appendJobLog(job.id, `[IMPORT-START] Importing ${total} products for brand ${brandName || brandID}`);

      let created = 0;
      let skipped = 0;
      let failed = 0;
      for (let i = 0; i < products.length; i++) {
        // Cancellation is checked once per product, before any work on it.
        if (isJobCancelled(job.id)) {
          appendJobLog(job.id, '[CANCEL] Import cancelled by user');
          updateJob(job.id, { currentProduct: null });
          finishJob(job.id, 'cancelled');
          return;
        }

        const item = products[i];
        const attrs = item.attributes;
        const productLabel = attrs.product_name || attrs.part_number || `Item ${item.id}`;
        const partNum = attrs.part_number || '';
        updateJob(job.id, {
          currentProduct: { name: productLabel, partNumber: partNum, number: i + 1, total },
          detail: `Importing product ${i + 1}/${total}: ${productLabel}`,
        });
        appendJobLog(job.id, `[PRODUCT] (${i + 1}/${total}) ${productLabel}`);

        // One bad product must not abort the whole job.
        let result;
        try {
          result = await AddProductToStore(shop, accessToken, item, job.id, locationId);
        } catch (productErr) {
          appendJobLog(job.id, `[PRODUCT-FAIL] ${productLabel}: ${productErr.message}`);
          result = { action: 'failed', product: productLabel, error: productErr.message };
        }

        if (result?.action === 'created') {
          created++;
        } else if (result?.action === 'skipped') {
          skipped++;
        } else {
          // Includes a missing/unknown result, so that
          // created + skipped + failed always equals processed.
          failed++;
          recordProductError(job.id, { index: i + 1, product: productLabel, error: result?.error || 'unknown error' });
        }
        if (result) recordProductResult(job.id, result);

        const processed = i + 1;
        const successRate = Number((((processed - failed) / processed) * 100).toFixed(1));
        updateJob(job.id, {
          liveStats: { total, processed, created, skipped, failed, remaining: total - processed, successRate, label: `${processed}/${total}` },
        });

        // Emit structured line every 5 products for dashboard
        if (processed % 5 === 0 || processed === total) {
          appendJobLog(job.id, `[STATS] total=${total} processed=${processed} created=${created} skipped=${skipped} failed=${failed} rate=${successRate}`);
        }
      }

      updateJob(job.id, { currentProduct: null });
      appendJobLog(job.id, `[IMPORT-DONE] Finished: ${created} created, ${skipped} skipped, ${failed} failed`);
      finishJob(job.id, 'done');
      log(shop, `[JOB] ${job.id} completed — created=${created} skipped=${skipped} failed=${failed}`);
    } catch (err) {
      appendJobLog(job.id, `[ERROR] ${err.message}`);
      updateJob(job.id, { status: 'error', step: 'error', detail: err.message, currentProduct: null });
      finishJob(job.id, 'error');
      log(shop, `[JOB] ${job.id} error — ${err.message}`);
    }
  };

  // Run async — do not await. The trailing catch only fires if the error
  // handling above itself throws, preventing an unhandled rejection.
  runImport().catch((err) => {
    log(shop, `[JOB] ${job.id} error — ${err.message}`);
  });
});
// ---------------------------------------------------------------------------
// GET /manageproducts/status/:processId — poll job status (legacy + new)
// ---------------------------------------------------------------------------
router.get('/status/:processId', (req, res) => {
  const { processId } = req.params;
  const job = getJob(processId);
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }

  const { total, processed, remaining } = job.liveStats;

  // Whole-number percentage; 0 while the total is still unknown/zero.
  let progress = 0;
  if (total > 0) {
    progress = Math.round((processed / total) * 100);
  }

  // The response carries the flat legacy fields (polled by managebrand.jsx)
  // and, under `job`, the complete job object for the newer dashboard.
  res.json({
    status: job.status,
    detail: job.detail,
    progress,
    current: job.currentProduct,
    stats: { total, processed, remaining },
    results: job.results,
    job,
  });
});
// ---------------------------------------------------------------------------
// GET /manageproducts/jobs — list all jobs (optionally ?shop=...)
// ---------------------------------------------------------------------------
router.get('/jobs', (req, res) => {
  // A missing or empty ?shop= is passed to listJobs as null.
  const shopFilter = req.query.shop ? req.query.shop : null;
  const jobs = listJobs(shopFilter);
  res.json({ jobs });
});
// ---------------------------------------------------------------------------
// GET /manageproducts/jobs/:jobId — get single job
// ---------------------------------------------------------------------------
router.get('/jobs/:jobId', (req, res) => {
  // Responds with the raw job object, or 404 when the store has no such id.
  const { jobId } = req.params;
  const found = getJob(jobId);
  if (found) {
    res.json(found);
    return;
  }
  return res.status(404).json({ error: 'Job not found' });
});
// ---------------------------------------------------------------------------
// POST /manageproducts/jobs/:jobId/cancel
// ---------------------------------------------------------------------------
router.post('/jobs/:jobId/cancel', (req, res) => {
  // cancelJob's return value doubles as the existence check: falsy -> 404.
  const { jobId } = req.params;
  const cancelled = cancelJob(jobId);
  if (!cancelled) {
    return res.status(404).json({ error: 'Job not found' });
  }
  res.json({ ok: true, job: cancelled });
});

module.exports = router;