Data4Autos-Shopify-Backend/JOBS/InventorySync.js
MOHAN c2b5168a04 fix: resolve inventory sync crash loop and parallel execution issues
- Remove process.exit(1) that caused PM2 to restart in an infinite loop
- Fix && false bug that forced Turn14 token refresh on every run
- Remove global SHOP/ACCESS_TOKEN to eliminate race conditions
- Make shop loop sequential (for...of + await) to prevent Turn14 429s
- Add early return when Turn14 credentials are missing for a shop
- Guard non-JSON Turn14 responses (429 plain text) before calling .json()
- Ensure logs/ and exports/ dirs exist before writing
- Pass shop/accessToken as params to all helper functions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-18 00:53:55 +05:30

405 lines
13 KiB
JavaScript
Executable File

import axios from 'axios';
import https from 'https';
import fs from 'fs';
import readline from 'readline';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// ─── CONFIG ────────────────────────────────────────────────────────
const API_VERSION = '2025-10';
const exportDir = path.join(__dirname, '..', 'exports');
const logDir = path.join(__dirname, '..', 'logs');
const logFilePath = path.join(logDir, 'BulkInventorySyncJob.log');
const JSONL_FILENAME = 'inventory_data.jsonl';
const BATCH_SIZE = 1000;
// ─── HELPERS ───────────────────────────────────────────────────────
function ensureDir(dir) {
if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true });
}
function chunkArray(array, size) {
const result = [];
for (let i = 0; i < array.length; i += size) {
result.push(array.slice(i, i + size));
}
return result;
}
function timestamp() {
return new Date().toISOString().replace(/[-:]/g, '').replace('T', '_').split('.')[0];
}
function logStep(step, message) {
ensureDir(logDir);
const logMessage = `[${new Date().toISOString()}] [${step}] ${message}`;
console.log(logMessage);
fs.appendFileSync(logFilePath, logMessage + '\n', 'utf8');
}
// ─── TURN14 TOKEN ──────────────────────────────────────────────────
async function getTurn14AccessToken(shop, accessToken) {
const client = axios.create({
baseURL: `https://${shop}/admin/api/${API_VERSION}/graphql.json`,
headers: {
'X-Shopify-Access-Token': accessToken,
'Content-Type': 'application/json',
},
});
const query = `{
shop {
id
metafield(namespace: "turn14", key: "credentials") {
value
}
}
}`;
const gqlRes = await client.post('', { query });
const shopId = gqlRes.data?.data?.shop?.id;
const raw = gqlRes.data?.data?.shop?.metafield?.value;
if (!raw) {
throw new Error('No Turn14 credentials found in Shopify metafield.');
}
let creds;
try {
creds = JSON.parse(raw);
} catch {
throw new Error('Malformed Turn14 credential metafield.');
}
logStep('1', `Turn14 credentials loaded from metafield (clientId: ${creds.clientId?.slice(0, 8)}...)`);
const now = new Date();
const expiresAt = creds.expiresAt ? new Date(creds.expiresAt) : new Date(0);
const isExpired = now >= expiresAt;
if (!isExpired && creds.accessToken) {
logStep('1', 'Turn14 token is still valid, using cached token.');
return creds.accessToken;
}
logStep('1', 'Turn14 token expired or missing. Refreshing...');
const response = await axios.post(
'https://turn14.data4autos.com/v1/auth/token',
{
grant_type: 'client_credentials',
client_id: creds.clientId,
client_secret: creds.clientSecret,
},
{ headers: { 'Content-Type': 'application/json' } }
);
const newToken = response.data.access_token;
if (!newToken) {
throw new Error('Turn14 token refresh returned no access_token.');
}
const newExpiresAt = new Date(Date.now() + 3600 * 1000).toISOString();
const newValue = JSON.stringify({
clientId: creds.clientId,
clientSecret: creds.clientSecret,
accessToken: newToken,
expiresAt: newExpiresAt,
}).replace(/"/g, '\\"');
const mutation = `
mutation {
metafieldsSet(metafields: [{
ownerId: "${shopId}"
namespace: "turn14"
key: "credentials"
type: "json"
value: "${newValue}"
}]) {
userErrors { field message }
}
}
`;
const updateRes = await client.post('', { query: mutation });
const userErrors = updateRes.data?.data?.metafieldsSet?.userErrors;
if (userErrors?.length) {
throw new Error(`Failed to update metafield: ${JSON.stringify(userErrors)}`);
}
logStep('1', '✅ Turn14 token refreshed and metafield updated.');
return newToken;
}
// ─── SHOPIFY HELPERS ───────────────────────────────────────────────
async function getLocationID(shop, accessToken) {
logStep('3', 'Fetching Shopify location ID...');
const response = await axios.post(
`https://${shop}/admin/api/${API_VERSION}/graphql.json`,
{ query: `{ locations(first: 10) { edges { node { id name } } } }` },
{ headers: { 'X-Shopify-Access-Token': accessToken } }
);
const locations = response.data.data.locations.edges;
const locationId = locations[0].node.id;
logStep('3', `✅ Location ID: ${locationId}`);
return locationId;
}
async function startBulkExport(shop, accessToken) {
logStep('4.1', 'Starting Shopify bulk product export...');
const mutation = `
mutation {
bulkOperationRunQuery(
query: """
{
products {
edges {
node {
id
handle
variants {
edges {
node {
id
inventoryItem { id }
}
}
}
}
}
}
}
"""
) {
bulkOperation { id status }
userErrors { field message code }
}
}
`;
const resp = await axios.post(
`https://${shop}/admin/api/${API_VERSION}/graphql.json`,
{ query: mutation },
{ headers: { 'X-Shopify-Access-Token': accessToken } }
);
const err = resp.data.data.bulkOperationRunQuery.userErrors;
if (err.length) throw new Error(JSON.stringify(err));
logStep('4.1', `✅ Export started: ${resp.data.data.bulkOperationRunQuery.bulkOperation.id}`);
}
async function pollUntilReady(shop, accessToken) {
const query = `{ currentBulkOperation { status url errorCode } }`;
while (true) {
const resp = await axios.post(
`https://${shop}/admin/api/${API_VERSION}/graphql.json`,
{ query },
{ headers: { 'X-Shopify-Access-Token': accessToken } }
);
const op = resp.data.data.currentBulkOperation;
logStep('4.2', `Bulk operation status: ${op.status}`);
if (op.status === 'COMPLETED') {
logStep('4.2', `✅ Export URL ready`);
return op.url;
}
if (op.status === 'FAILED') throw new Error(`Bulk export failed: ${op.errorCode}`);
await new Promise(r => setTimeout(r, 5000));
}
}
async function downloadToFile(fileUrl) {
ensureDir(exportDir);
const fname = path.join(exportDir, `${timestamp()}_bulk_export.ndjson`);
if (!fileUrl) {
await fs.promises.writeFile(fname, '', 'utf8');
logStep('4.3', `⚠️ No file URL — created empty export file`);
return fname;
}
const fileStream = fs.createWriteStream(fname);
await new Promise((resolve, reject) => {
https.get(fileUrl, res => {
res.pipe(fileStream);
fileStream.on('finish', () => fileStream.close(resolve));
fileStream.on('error', reject);
});
});
logStep('4.3', `✅ Downloaded export to ${fname}`);
return fname;
}
async function buildHandleMap(ndjsonPath, desiredInventoryByHandle) {
logStep('5', 'Building handle → inventory map...');
const handleMap = {};
const productIdToHandle = {};
const rl = readline.createInterface({
input: fs.createReadStream(ndjsonPath),
crlfDelay: Infinity,
});
for await (const line of rl) {
const rec = JSON.parse(line);
if (rec.handle && rec.id) productIdToHandle[rec.id] = rec.handle;
if (rec.__parentId && rec.inventoryItem?.id) {
const handle = productIdToHandle[rec.__parentId];
if (handle && desiredInventoryByHandle[handle] != null) {
if (!handleMap[handle]) handleMap[handle] = [];
handleMap[handle].push({ inventoryItemId: rec.inventoryItem.id });
}
}
}
logStep('5', `✅ Mapped ${Object.keys(handleMap).length} handles`);
return handleMap;
}
function writeBulkInventoryJSONL(handleMap, desiredInventoryByHandle, outputPath, locationId) {
logStep('6', 'Writing JSONL inventory updates...');
const stream = fs.createWriteStream(outputPath);
const updates = [];
for (const [handle, items] of Object.entries(handleMap)) {
const qty = desiredInventoryByHandle[handle];
if (qty == null) continue;
items.forEach(({ inventoryItemId }) => {
const entry = { inventoryItemId, quantity: qty, locationId };
updates.push(entry);
stream.write(JSON.stringify({ input: entry }) + '\n');
});
}
stream.end();
logStep('6', `✅ Wrote ${updates.length} inventory adjustments`);
return updates;
}
async function updateInventoryBatch(batch, index, shop, accessToken) {
logStep(`7.${index}`, `Updating inventory batch of ${batch.length} items...`);
const mutation = `
mutation {
inventorySetQuantities(input: {
name: "available",
reason: "correction",
ignoreCompareQuantity: true,
quantities: [
${batch.map(item => `{
inventoryItemId: "${item.inventoryItemId}",
locationId: "${item.locationId}",
quantity: ${item.quantity}
}`).join(',\n')}
]
}) {
inventoryAdjustmentGroup { id }
userErrors { field message }
}
}
`;
try {
const res = await axios.post(
`https://${shop}/admin/api/2025-07/graphql.json`,
{ query: mutation },
{
headers: {
'Content-Type': 'application/json',
'X-Shopify-Access-Token': accessToken,
},
}
);
const json = res.data;
const userErrors = json.data?.inventorySetQuantities?.userErrors;
if (json.errors || userErrors?.length) {
console.error(`[7.${index}] ❌ Errors:`, JSON.stringify(json.errors || userErrors));
} else {
logStep(`7.${index}`, '✅ Batch updated successfully');
}
} catch (err) {
console.error(`[7.${index}] ❌ Axios error:`, err.response?.data || err.message);
}
}
// ─── MASTER FUNCTION ──────────────────────────────────────────────
async function syncTurn14Inventory(shop, accessToken, FULFILLMENTSERVICEID, LOCATIONID) {
logStep('0', `🔧 Starting syncTurn14Inventory for ${shop}...`);
// Step 1: Get Turn14 token — skip shop if credentials are missing
let turn14Token;
try {
turn14Token = await getTurn14AccessToken(shop, accessToken);
} catch (err) {
logStep('1', `⚠️ Skipping ${shop} — could not get Turn14 token: ${err.message}`);
return;
}
if (!turn14Token) {
logStep('1', `⚠️ Skipping ${shop} — Turn14 token is empty.`);
return;
}
try {
const desiredInventoryByHandle = {};
// Step 1: Fetch Turn14 inventory
logStep('1', 'Fetching Turn14 inventory...');
const turn14Res = await fetch('https://turn14.data4autos.com/v1/inventory/allupdates', {
headers: {
Authorization: `Bearer ${turn14Token}`,
'Content-Type': 'application/json',
},
});
logStep('1', `Turn14 API response status: ${turn14Res.status}`);
if (!turn14Res.ok) {
const body = await turn14Res.text();
throw new Error(`Turn14 inventory fetch failed (${turn14Res.status}): ${body.slice(0, 200)}`);
}
const turn14Data = await turn14Res.json();
turn14Data.forEach(item => {
desiredInventoryByHandle[item.id] = item.totalQuantity;
});
logStep('1', `✅ Loaded ${Object.keys(desiredInventoryByHandle).length} items from Turn14`);
// Step 2: Ensure export dir exists
ensureDir(exportDir);
// Step 3: Get location ID
const locationId = LOCATIONID || await getLocationID(shop, accessToken);
logStep('2', `✅ Using location ID: ${locationId}`);
// Steps 4-6: Shopify bulk export, mapping, JSONL
await startBulkExport(shop, accessToken);
const url = await pollUntilReady(shop, accessToken);
const ndjsonPath = await downloadToFile(url);
const handleMap = await buildHandleMap(ndjsonPath, desiredInventoryByHandle);
const jsonlPath = path.join(exportDir, JSONL_FILENAME);
const inventoryUpdates = writeBulkInventoryJSONL(handleMap, desiredInventoryByHandle, jsonlPath, locationId);
// Step 7: Update in batches
const batches = chunkArray(inventoryUpdates, BATCH_SIZE);
for (let i = 0; i < batches.length; i++) {
await updateInventoryBatch(batches[i], i + 1, shop, accessToken);
}
logStep('8', `✅ Inventory sync complete for ${shop}`);
} catch (err) {
logStep('🚨 ERROR', `Sync failed for ${shop}: ${err.message}`);
// Do NOT process.exit — let the next shop continue and PM2 stay alive
}
}
export { syncTurn14Inventory };