From c2b5168a047a0ce665d5fd5c2a18262c50183aa2 Mon Sep 17 00:00:00 2001 From: MOHAN Date: Thu, 18 Jun 2026 00:53:55 +0530 Subject: [PATCH] fix: resolve inventory sync crash loop and parallel execution issues - Remove process.exit(1) that caused PM2 to restart in an infinite loop - Fix && false bug that forced Turn14 token refresh on every run - Remove global SHOP/ACCESS_TOKEN to eliminate race conditions - Make shop loop sequential (for...of + await) to prevent Turn14 429s - Add early return when Turn14 credentials are missing for a shop - Guard non-JSON Turn14 responses (429 plain text) before calling .json() - Ensure logs/ and exports/ dirs exist before writing - Pass shop/accessToken as params to all helper functions Co-Authored-By: Claude Sonnet 4.6 --- JOBS/InventorySync.js | 309 +++++++++++++++------------------------ JOBS/InventorySyncJob.js | 73 +++++---- 2 files changed, 161 insertions(+), 221 deletions(-) diff --git a/JOBS/InventorySync.js b/JOBS/InventorySync.js index 4ce2bf9..6302cef 100755 --- a/JOBS/InventorySync.js +++ b/JOBS/InventorySync.js @@ -3,7 +3,6 @@ import https from 'https'; import fs from 'fs'; import readline from 'readline'; import path from 'path'; -import FormData from 'form-data'; import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); @@ -11,19 +10,19 @@ const __dirname = path.dirname(__filename); // ─── CONFIG ──────────────────────────────────────────────────────── -//const API_VERSION = '2025-10'; -const API_VERSION_25_10 = '2025-10'; const API_VERSION = '2025-10'; const exportDir = path.join(__dirname, '..', 'exports'); +const logDir = path.join(__dirname, '..', 'logs'); +const logFilePath = path.join(logDir, 'BulkInventorySyncJob.log'); const JSONL_FILENAME = 'inventory_data.jsonl'; - -let SHOP, ACCESS_TOKEN, TURN14_ACCESS_TOKEN; - - - +const BATCH_SIZE = 1000; // ─── HELPERS ─────────────────────────────────────────────────────── +function ensureDir(dir) { + if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); +} + function chunkArray(array, size) { const result = []; for (let i = 0; i < array.length; i += size) { @@ -33,104 +32,80 @@ function chunkArray(array, size) { } function timestamp() { - const now = new Date(); - return now.toISOString().replace(/[-:]/g, '').replace('T', '_').split('.')[0]; + return new Date().toISOString().replace(/[-:]/g, '').replace('T', '_').split('.')[0]; } -// function logStep(step, message) { -// console.log(`[${step}] ${message}`); -// } - - -const logFilePath = path.join(__dirname, '..', "logs", 'BulkInventorySyncJob.log'); - function logStep(step, message) { + ensureDir(logDir); const logMessage = `[${new Date().toISOString()}] [${step}] ${message}`; - - // Log to console console.log(logMessage); - - // Append to file fs.appendFileSync(logFilePath, logMessage + '\n', 'utf8'); } -// ─── SHOPIFY GRAPHQL HELPERS ────────────────────────────────────── +// ─── TURN14 TOKEN ────────────────────────────────────────────────── - -async function getTurn14AccessTokenFromMetafield() { - // Step 1: Get credentials from metafield - +async function getTurn14AccessToken(shop, accessToken) { const client = axios.create({ - baseURL: `https://${SHOP}/admin/api/2025-10/graphql.json`, + baseURL: `https://${shop}/admin/api/${API_VERSION}/graphql.json`, headers: { - 'X-Shopify-Access-Token': ACCESS_TOKEN, + 'X-Shopify-Access-Token': accessToken, 'Content-Type': 'application/json', }, }); - - - const query = ` - { - shop { - id - metafield(namespace: "turn14", key: "credentials") { - value - } + const query = `{ + shop { + id + metafield(namespace: "turn14", key: "credentials") { + value } } - `; + }`; const gqlRes = await client.post('', { query }); - - const result = gqlRes.data; - const shopId = result?.data?.shop?.id; - const raw = result?.data?.shop?.metafield?.value; + const shopId = gqlRes.data?.data?.shop?.id; + const raw = gqlRes.data?.data?.shop?.metafield?.value; if (!raw) { - throw new Error("❌ No Turn14 credentials found in Shopify metafield."); + throw new Error('No Turn14 credentials found in Shopify metafield.'); } let creds; try { creds = JSON.parse(raw); - } catch (err) { - console.error("❌ Failed to parse Turn14 metafield JSON:", err); - throw new Error("Malformed Turn14 credential metafield."); + } catch { + throw new Error('Malformed Turn14 credential metafield.'); } - console.log("Turn14 Credentials fetched from metafield:", creds); + logStep('1', `Turn14 credentials loaded from metafield (clientId: ${creds.clientId?.slice(0, 8)}...)`); const now = new Date(); - const expiresAt = new Date(creds.expiresAt); - const isExpired = now > expiresAt; + const expiresAt = creds.expiresAt ? new Date(creds.expiresAt) : new Date(0); + const isExpired = now >= expiresAt; - if (!isExpired && creds.accessToken && false) { - console.log("Turn14 token is still valid."); + if (!isExpired && creds.accessToken) { + logStep('1', 'Turn14 token is still valid, using cached token.'); return creds.accessToken; } - console.log("Turn14 token has expired or is missing. Refreshing..."); - // ⏰ Expired — refresh token from Turn14 API - const response = await axios.post("https://turn14.data4autos.com/v1/auth/token", { - grant_type: "client_credentials", - client_id: creds.clientId, - client_secret: creds.clientSecret, - }, { - headers: { "Content-Type": "application/json" }, - }); + logStep('1', 'Turn14 token expired or missing. Refreshing...'); - console.log("Turn14 token refresh response status:", response.status); - const data = response.data; + const response = await axios.post( + 'https://turn14.data4autos.com/v1/auth/token', + { + grant_type: 'client_credentials', + client_id: creds.clientId, + client_secret: creds.clientSecret, + }, + { headers: { 'Content-Type': 'application/json' } } + ); - if (response.status !== 200) { - console.error("❌ Failed to refresh Turn14 token:", data); - throw new Error(data.error || "Failed to refresh Turn14 token"); + const newToken = response.data.access_token; + if (!newToken) { + throw new Error('Turn14 token refresh returned no access_token.'); } - const newToken = data.access_token; const newExpiresAt = new Date(Date.now() + 3600 * 1000).toISOString(); - const newValue = JSON.stringify({ clientId: creds.clientId, clientSecret: creds.clientSecret, @@ -138,95 +113,47 @@ async function getTurn14AccessTokenFromMetafield() { expiresAt: newExpiresAt, }).replace(/"/g, '\\"'); - // Step 3: Update metafield in Shopify const mutation = ` mutation { - metafieldsSet(metafields: [ - { - ownerId: "${shopId}" - namespace: "turn14" - key: "credentials" - type: "json" - value: "${newValue}" - } - ]) { - userErrors { - field - message - } + metafieldsSet(metafields: [{ + ownerId: "${shopId}" + namespace: "turn14" + key: "credentials" + type: "json" + value: "${newValue}" + }]) { + userErrors { field message } } } `; const updateRes = await client.post('', { query: mutation }); const userErrors = updateRes.data?.data?.metafieldsSet?.userErrors; - if (userErrors && userErrors.length > 0) { + if (userErrors?.length) { throw new Error(`Failed to update metafield: ${JSON.stringify(userErrors)}`); } - console.log("✅ Turn14 token refreshed and metafield updated."); + logStep('1', '✅ Turn14 token refreshed and metafield updated.'); return newToken; } -async function getLocationID() { +// ─── SHOPIFY HELPERS ─────────────────────────────────────────────── + +async function getLocationID(shop, accessToken) { logStep('3', 'Fetching Shopify location ID...'); - - const client = axios.create({ - baseURL: `https://${SHOP}/admin/api/${API_VERSION}/graphql.json`, - headers: { - 'X-Shopify-Access-Token': ACCESS_TOKEN, - 'Content-Type': 'application/json', - }, - }); - - const response = await client.post('', { - query: ` - query { - locations(first: 10) { - edges { node { id name } } - } - } - `, - }); - + const response = await axios.post( + `https://${shop}/admin/api/${API_VERSION}/graphql.json`, + { query: `{ locations(first: 10) { edges { node { id name } } } }` }, + { headers: { 'X-Shopify-Access-Token': accessToken } } + ); const locations = response.data.data.locations.edges; const locationId = locations[0].node.id; logStep('3', `✅ Location ID: ${locationId}`); return locationId; } -async function startBulkExport() { +async function startBulkExport(shop, accessToken) { logStep('4.1', 'Starting Shopify bulk product export...'); - // const mutation = ` - // mutation { - // bulkOperationRunQuery( - // query: """ - // { - // products { - // edges { - // node { - // id - // handle - // variants { - // edges { - // node { - // id - // inventoryItem { id } - // } - // } - // } - // } - // } - // } - // } - // """ - // ) { - // bulkOperation { id status } - // userErrors { field message } - // } - // } - // `; - const mutation = ` mutation { bulkOperationRunQuery( @@ -252,43 +179,35 @@ async function startBulkExport() { """ ) { bulkOperation { id status } - userErrors { - field - message - code - } + userErrors { field message code } } } `; const resp = await axios.post( - `https://${SHOP}/admin/api/${API_VERSION}/graphql.json`, + `https://${shop}/admin/api/${API_VERSION}/graphql.json`, { query: mutation }, - { headers: { 'X-Shopify-Access-Token': ACCESS_TOKEN } } + { headers: { 'X-Shopify-Access-Token': accessToken } } ); - - const err = resp.data.data.bulkOperationRunQuery.userErrors; if (err.length) throw new Error(JSON.stringify(err)); logStep('4.1', `✅ Export started: ${resp.data.data.bulkOperationRunQuery.bulkOperation.id}`); } -async function pollUntilReady() { +async function pollUntilReady(shop, accessToken) { const query = `{ currentBulkOperation { status url errorCode } }`; while (true) { const resp = await axios.post( - `https://${SHOP}/admin/api/${API_VERSION}/graphql.json`, + `https://${shop}/admin/api/${API_VERSION}/graphql.json`, { query }, - { headers: { 'X-Shopify-Access-Token': ACCESS_TOKEN } } + { headers: { 'X-Shopify-Access-Token': accessToken } } ); - - const op = resp.data.data.currentBulkOperation; - logStep('4.2', `Status: ${op.status}`); + logStep('4.2', `Bulk operation status: ${op.status}`); if (op.status === 'COMPLETED') { - logStep('4.2', `✅ File URL: ${op.url}`); + logStep('4.2', `✅ Export URL ready`); return op.url; } if (op.status === 'FAILED') throw new Error(`Bulk export failed: ${op.errorCode}`); @@ -297,16 +216,16 @@ async function pollUntilReady() { } async function downloadToFile(fileUrl) { + ensureDir(exportDir); const fname = path.join(exportDir, `${timestamp()}_bulk_export.ndjson`); - const fileStream = fs.createWriteStream(fname); if (!fileUrl) { - // 🟡 Create empty NDJSON file instead of downloading await fs.promises.writeFile(fname, '', 'utf8'); - logStep('4.3', `⚠️ No file URL provided. Created empty export file at ${fname}`); + logStep('4.3', `⚠️ No file URL — created empty export file`); return fname; } + const fileStream = fs.createWriteStream(fname); await new Promise((resolve, reject) => { https.get(fileUrl, res => { res.pipe(fileStream); @@ -325,7 +244,7 @@ async function buildHandleMap(ndjsonPath, desiredInventoryByHandle) { const rl = readline.createInterface({ input: fs.createReadStream(ndjsonPath), - crlfDelay: Infinity + crlfDelay: Infinity, }); for await (const line of rl) { @@ -364,8 +283,8 @@ function writeBulkInventoryJSONL(handleMap, desiredInventoryByHandle, outputPath return updates; } -async function updateInventoryBatch(batch, index) { - logStep(`7.${index}`, 'Updating inventory batch...'); +async function updateInventoryBatch(batch, index, shop, accessToken) { + logStep(`7.${index}`, `Updating inventory batch of ${batch.length} items...`); const mutation = ` mutation { inventorySetQuantities(input: { @@ -388,78 +307,82 @@ async function updateInventoryBatch(batch, index) { try { const res = await axios.post( - `https://${SHOP}/admin/api/2025-07/graphql.json`, + `https://${shop}/admin/api/2025-07/graphql.json`, { query: mutation }, { headers: { 'Content-Type': 'application/json', - 'X-Shopify-Access-Token': ACCESS_TOKEN, + 'X-Shopify-Access-Token': accessToken, }, } ); const json = res.data; - if (json.errors || json.data.inventorySetQuantities.userErrors.length) { - console.error(`[7.${index}] ❌ Errors:`, JSON.stringify(json.errors || json.data.inventorySetQuantities.userErrors)); + const userErrors = json.data?.inventorySetQuantities?.userErrors; + if (json.errors || userErrors?.length) { + console.error(`[7.${index}] ❌ Errors:`, JSON.stringify(json.errors || userErrors)); } else { logStep(`7.${index}`, '✅ Batch updated successfully'); } } catch (err) { - console.error(`[7.${index}] ❌ Axios Error:`, err.response?.data || err.message); + console.error(`[7.${index}] ❌ Axios error:`, err.response?.data || err.message); } } // ─── MASTER FUNCTION ────────────────────────────────────────────── async function syncTurn14Inventory(shop, accessToken, FULFILLMENTSERVICEID, LOCATIONID) { + logStep('0', `🔧 Starting syncTurn14Inventory for ${shop}...`); + + // Step 1: Get Turn14 token — skip shop if credentials are missing + let turn14Token; try { + turn14Token = await getTurn14AccessToken(shop, accessToken); + } catch (err) { + logStep('1', `⚠️ Skipping ${shop} — could not get Turn14 token: ${err.message}`); + return; + } + if (!turn14Token) { + logStep('1', `⚠️ Skipping ${shop} — Turn14 token is empty.`); + return; + } - SHOP = shop; - ACCESS_TOKEN = accessToken; - - - - const token = await getTurn14AccessTokenFromMetafield() - .catch(err => { - console.error('Error fetching Turn14 token:', err.message); - }); - - TURN14_ACCESS_TOKEN = token - logStep('0', '🔧 Starting syncTurn14Inventory...'); - - const BATCH_SIZE = 1000; + try { const desiredInventoryByHandle = {}; - // Step 1: Fetch Turn14 data + // Step 1: Fetch Turn14 inventory logStep('1', 'Fetching Turn14 inventory...'); const turn14Res = await fetch('https://turn14.data4autos.com/v1/inventory/allupdates', { headers: { - Authorization: `Bearer ${TURN14_ACCESS_TOKEN}`, + Authorization: `Bearer ${turn14Token}`, 'Content-Type': 'application/json', }, }); - console.log(`Turn14 API response status: ${turn14Res.status}`); + + logStep('1', `Turn14 API response status: ${turn14Res.status}`); + + if (!turn14Res.ok) { + const body = await turn14Res.text(); + throw new Error(`Turn14 inventory fetch failed (${turn14Res.status}): ${body.slice(0, 200)}`); + } const turn14Data = await turn14Res.json(); turn14Data.forEach(item => { desiredInventoryByHandle[item.id] = item.totalQuantity; }); - - //desiredInventoryByHandle["358019"] = 4561 logStep('1', `✅ Loaded ${Object.keys(desiredInventoryByHandle).length} items from Turn14`); - // console.log(desiredInventoryByHandle) + // Step 2: Ensure export dir exists - if (!fs.existsSync(exportDir)) fs.mkdirSync(exportDir); + ensureDir(exportDir); - // Step 3-6: Shopify Export, Mapping & JSONL Write - //const locationId = await getLocationID(); + // Step 3: Get location ID + const locationId = LOCATIONID || await getLocationID(shop, accessToken); + logStep('2', `✅ Using location ID: ${locationId}`); - const locationId = LOCATIONID || await getLocationID(); - logStep('2', `✅ Using location ID: ${LOCATIONID}`); - - await startBulkExport(); - const url = await pollUntilReady(); + // Steps 4-6: Shopify bulk export, mapping, JSONL + await startBulkExport(shop, accessToken); + const url = await pollUntilReady(shop, accessToken); const ndjsonPath = await downloadToFile(url); const handleMap = await buildHandleMap(ndjsonPath, desiredInventoryByHandle); const jsonlPath = path.join(exportDir, JSONL_FILENAME); @@ -468,13 +391,13 @@ async function syncTurn14Inventory(shop, accessToken, FULFILLMENTSERVICEID, LOCA // Step 7: Update in batches const batches = chunkArray(inventoryUpdates, BATCH_SIZE); for (let i = 0; i < batches.length; i++) { - await updateInventoryBatch(batches[i], i + 1); + await updateInventoryBatch(batches[i], i + 1, shop, accessToken); } - logStep('8', '✅ All inventory batches processed.'); + logStep('8', `✅ Inventory sync complete for ${shop}`); } catch (err) { - logStep('🚨 ERROR in process', err.message); - process.exit(1); + logStep('🚨 ERROR', `Sync failed for ${shop}: ${err.message}`); + // Do NOT process.exit — let the next shop continue and PM2 stay alive } } diff --git a/JOBS/InventorySyncJob.js b/JOBS/InventorySyncJob.js index 5a5bc05..a2db5ad 100755 --- a/JOBS/InventorySyncJob.js +++ b/JOBS/InventorySyncJob.js @@ -3,42 +3,59 @@ const path = require('path'); const { syncTurn14Inventory } = require('./InventorySync'); const filePath = path.join(__dirname, '..', 'data', 'tokens.json'); -const logFilePath = path.join(__dirname, '..', 'logs', 'BulkInventorySyncJob.log'); +const logDir = path.join(__dirname, '..', 'logs'); +const logFilePath = path.join(logDir, 'BulkInventorySyncJob.log'); + +function ensureDir(dir) { + if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); +} function logStep(step, message) { - const logMessage = `[${new Date().toISOString()}] [${step}] ${message}`; - console.log(logMessage); - fs.appendFileSync(logFilePath, logMessage + '\n', 'utf8'); + ensureDir(logDir); + const logMessage = `[${new Date().toISOString()}] [${step}] ${message}`; + console.log(logMessage); + fs.appendFileSync(logFilePath, logMessage + '\n', 'utf8'); } -function runBulkInventorySyncJob() { - logStep('Bulk Caller Job', `JOB STARTED`); - const jsonData = JSON.parse(fs.readFileSync(filePath, 'utf8')); +async function runBulkInventorySyncJob() { + logStep('Bulk Caller Job', 'JOB STARTED'); - Object.entries(jsonData).forEach(([shopDomain, details]) => { - const SHOP = shopDomain.trim(); - const ACCESS_TOKEN = details.accessToken; + const jsonData = JSON.parse(fs.readFileSync(filePath, 'utf8')); + const shops = Object.entries(jsonData); - const fulfillmentServiceTokens = details.fulfillmentService || {} - const FULFILLMENTSERVICEID = fulfillmentServiceTokens.id || null; - // const LOCATIONID = fulfillmentServiceTokens.location ? fulfillmentServiceTokens.location.id : null; - const LOCATIONID = details.locationId ? details.locationId : null; + // Process shops sequentially to avoid hammering Turn14 with parallel requests + for (const [shopDomain, details] of shops) { + const SHOP = shopDomain.trim(); + const ACCESS_TOKEN = details.accessToken; + const fulfillmentServiceTokens = details.fulfillmentService || {}; + const FULFILLMENTSERVICEID = fulfillmentServiceTokens.id || null; + const LOCATIONID = details.locationId || null; - console.log("Location ID : ", LOCATIONID) + logStep('Bulk Caller Job', `Syncing inventory for: ${SHOP} (locationId: ${LOCATIONID})`); - logStep('Bulk Caller Job', `Syncing inventory for: ${SHOP}`); - syncTurn14Inventory(SHOP, ACCESS_TOKEN, FULFILLMENTSERVICEID, LOCATIONID); + try { + await syncTurn14Inventory(SHOP, ACCESS_TOKEN, FULFILLMENTSERVICEID, LOCATIONID); + } catch (err) { + logStep('Bulk Caller Job', `❌ Unexpected error for ${SHOP}: ${err.message}`); + } + } + + logStep('Bulk Caller Job', 'JOB COMPLETED'); +} + +// Schedule: every 3 hours and 50 minutes +const INTERVAL_MS = (3 * 60 + 50) * 60 * 1000; + +function scheduleEvery3Hrs50Mins() { + runBulkInventorySyncJob().catch(err => { + console.error('[Scheduler] Unhandled error in job run:', err.message); + }); + + setInterval(() => { + runBulkInventorySyncJob().catch(err => { + console.error('[Scheduler] Unhandled error in job run:', err.message); }); - // logStep('Bulk Caller Job', `JOB COMPLETED`); + }, INTERVAL_MS); } -// ⏱ Schedule: every 3 hours and 40 minutes -function scheduleEvery3Hrs40Mins() { - runBulkInventorySyncJob(); // Run immediately on start - - var intervalMs = (3 * 60 + 50) * 60 * 1000; // 13200000 ms = 3hr 50min - //intervalMs = (1 * 60 - 10) * 60 * 1000; // 13200000 ms = 3hr 40min - setInterval(runBulkInventorySyncJob, intervalMs); -} - -scheduleEvery3Hrs40Mins(); +scheduleEvery3Hrs50Mins();