"""One-shot migration of legacy flat-file state into the Postgres trading_db.

Reads the backend's JSON stores, the paper-trading engine's state/ledger
files, and per-symbol CSV price history, then upserts everything into the
relational schema.  Each domain runs in its own transaction (see
``run_domain``) and every insert uses ``ON CONFLICT`` or a ``NOT EXISTS``
guard, so the script is safe to re-run.
"""

import json
import math
from datetime import datetime, timezone, date
from pathlib import Path

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values, Json

DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "trading_db",
    "user": "trader",
    "password": "traderpass",
}

ROOT = Path(__file__).resolve().parent
BACKEND_STORAGE = ROOT / "backend" / "storage"
BACKEND_LOGS = ROOT / "backend" / "logs"
ENGINE_STORAGE = ROOT / "indian_paper_trading_strategy" / "storage"
ENGINE_HISTORY = ROOT / "indian_paper_trading_strategy" / "history"
ROOT_STORAGE = ROOT / "storage"


def load_json(path, default):
    """Load JSON from *path*; return *default* for a missing, empty, or
    malformed file — migration is deliberately best-effort per file."""
    if not path.exists():
        return default
    try:
        raw = path.read_text(encoding="utf-8").strip()
        if not raw:
            return default
        return json.loads(raw)
    except Exception:
        return default


def parse_ts(value):
    """Coerce *value* into a timezone-aware UTC datetime, or None.

    Naive datetimes are assumed to already be UTC; everything else goes
    through ``pd.to_datetime(..., errors="coerce")``.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)
    if isinstance(value, date) and not isinstance(value, datetime):
        return datetime(value.year, value.month, value.day, tzinfo=timezone.utc)
    try:
        ts = pd.to_datetime(value, utc=True, errors="coerce")
    except Exception:
        return None
    if pd.isna(ts):
        return None
    return ts.to_pydatetime()


def parse_date(value):
    """Coerce *value* into a ``datetime.date``, or None on failure."""
    if value is None:
        return None
    if isinstance(value, date) and not isinstance(value, datetime):
        return value
    try:
        ts = pd.to_datetime(value, utc=True, errors="coerce")
    except Exception:
        return None
    if pd.isna(ts):
        return None
    return ts.date()


def to_number(value):
    """Coerce *value* to float, or None for missing/NaN/inf/blank inputs.

    Non-str, non-numeric values (e.g. numpy integer scalars from pandas
    rows) fall through unchanged so psycopg2 can attempt adaptation.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
            return None
        return float(value)
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        try:
            return float(text)
        except Exception:
            return None
    if pd.isna(value):
        return None
    return value


def to_int(value):
    """Coerce *value* to int, or None for missing/non-finite/blank inputs.

    Strings are parsed via float first so "3.0" round-trips to 3.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, (int, float)):
        # isfinite (not just isnan): int(float("inf")) raises OverflowError,
        # so +/-inf must map to None the same way NaN does.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return int(value)
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        try:
            return int(float(text))
        except Exception:
            return None
    return None


def normalize_text(value):
    """Return *value* if it is already a string, else its JSON encoding."""
    if value is None:
        return None
    if isinstance(value, str):
        return value
    return json.dumps(value)


def run_domain(conn, name, func):
    """Run one migration domain inside its own transaction.

    *name* labels the domain for callers/debugging; ``with conn`` commits on
    success and rolls back on exception so one failed domain does not leave
    partial rows behind.
    """
    with conn:
        with conn.cursor() as cur:
            func(cur)


def migrate_users_sessions(cur):
    """Upsert app users and their sessions from backend JSON stores."""
    users = load_json(BACKEND_STORAGE / "users.json", [])
    if isinstance(users, list) and users:
        rows = []
        for user in users:
            if not isinstance(user, dict):
                continue
            user_id = user.get("id")
            username = user.get("username")
            password_hash = user.get("password")
            # id/username are required keys; a falsy id or username means a
            # corrupt record we skip rather than insert.
            if not user_id or not username or password_hash is None:
                continue
            rows.append((user_id, username, password_hash))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO app_user (id, username, password_hash)
                VALUES %s
                ON CONFLICT (id) DO UPDATE SET
                    username = EXCLUDED.username,
                    password_hash = EXCLUDED.password_hash
                """,
                rows,
            )
    sessions = load_json(BACKEND_STORAGE / "sessions.json", [])
    if isinstance(sessions, list) and sessions:
        rows = []
        for session in sessions:
            if not isinstance(session, dict):
                continue
            session_id = session.get("id")
            user_id = session.get("user_id")
            created_at = parse_ts(session.get("created_at"))
            last_seen_at = parse_ts(session.get("last_seen_at"))
            expires_at = parse_ts(session.get("expires_at"))
            if not session_id or not user_id or created_at is None or expires_at is None:
                continue
            rows.append((session_id, user_id, created_at, last_seen_at, expires_at))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO app_session (id, user_id, created_at, last_seen_at, expires_at)
                VALUES %s
                ON CONFLICT (id) DO UPDATE SET
                    user_id = EXCLUDED.user_id,
                    created_at = EXCLUDED.created_at,
                    last_seen_at = EXCLUDED.last_seen_at,
                    expires_at = EXCLUDED.expires_at
                """,
                rows,
            )


def migrate_user_brokers(cur):
    """Upsert per-user broker link state, including any pending connection."""
    brokers = load_json(BACKEND_STORAGE / "user_brokers.json", {})
    if not isinstance(brokers, dict) or not brokers:
        return
    rows = []
    for user_id, entry in brokers.items():
        if not isinstance(entry, dict):
            continue
        pending = entry.get("pending") if isinstance(entry.get("pending"), dict) else {}
        rows.append(
            (
                user_id,
                entry.get("broker"),
                bool(entry.get("connected")) if entry.get("connected") is not None else False,
                entry.get("access_token"),
                parse_ts(entry.get("connected_at")),
                entry.get("api_key"),
                entry.get("user_name"),
                entry.get("broker_user_id"),
                pending.get("broker"),
                pending.get("api_key"),
                pending.get("api_secret"),
                parse_ts(pending.get("started_at")),
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO user_broker (
                user_id, broker, connected, access_token, connected_at,
                api_key, user_name, broker_user_id,
                pending_broker, pending_api_key, pending_api_secret, pending_started_at
            )
            VALUES %s
            ON CONFLICT (user_id) DO UPDATE SET
                broker = EXCLUDED.broker,
                connected = EXCLUDED.connected,
                access_token = EXCLUDED.access_token,
                connected_at = EXCLUDED.connected_at,
                api_key = EXCLUDED.api_key,
                user_name = EXCLUDED.user_name,
                broker_user_id = EXCLUDED.broker_user_id,
                pending_broker = EXCLUDED.pending_broker,
                pending_api_key = EXCLUDED.pending_api_key,
                pending_api_secret = EXCLUDED.pending_api_secret,
                pending_started_at = EXCLUDED.pending_started_at
            """,
            rows,
        )


def migrate_zerodha(cur):
    """Insert Zerodha sessions (deduplicated) and upsert request tokens."""
    sessions = load_json(BACKEND_STORAGE / "zerodha_sessions.json", [])
    if isinstance(sessions, list):
        for item in sessions:
            if not isinstance(item, dict):
                continue
            user_id = item.get("user_id")
            linked_at = parse_ts(item.get("linked_at"))
            if not user_id or linked_at is None:
                continue
            # No unique key on zerodha_session, so dedupe with NOT EXISTS
            # over all columns; the same tuple is bound twice (insert values
            # and the existence check).
            params = (
                user_id,
                linked_at,
                item.get("api_key"),
                item.get("access_token"),
                item.get("request_token"),
                item.get("user_name"),
                item.get("broker_user_id"),
            )
            cur.execute(
                """
                INSERT INTO zerodha_session (
                    user_id, linked_at, api_key, access_token,
                    request_token, user_name, broker_user_id
                )
                SELECT %s, %s, %s, %s, %s, %s, %s
                WHERE NOT EXISTS (
                    SELECT 1 FROM zerodha_session
                    WHERE user_id = %s
                      AND linked_at = %s
                      AND api_key IS NOT DISTINCT FROM %s
                      AND access_token IS NOT DISTINCT FROM %s
                      AND request_token IS NOT DISTINCT FROM %s
                      AND user_name IS NOT DISTINCT FROM %s
                      AND broker_user_id IS NOT DISTINCT FROM %s
                )
                """,
                params + params,
            )
    tokens = load_json(BACKEND_STORAGE / "zerodha_request_tokens.json", [])
    if isinstance(tokens, list) and tokens:
        rows = []
        for item in tokens:
            if not isinstance(item, dict):
                continue
            user_id = item.get("user_id")
            request_token = item.get("request_token")
            if not user_id or request_token is None:
                continue
            rows.append((user_id, request_token))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO zerodha_request_token (user_id, request_token)
                VALUES %s
                ON CONFLICT (user_id) DO UPDATE SET
                    request_token = EXCLUDED.request_token
                """,
                rows,
            )


def migrate_strategy_config(cur):
    """Upsert the singleton (id=1) strategy configuration row."""
    cfg = load_json(BACKEND_STORAGE / "strategy_config.json", {})
    if not isinstance(cfg, dict) or not cfg:
        return
    sip_frequency = cfg.get("sip_frequency")
    sip_frequency_value = None
    sip_frequency_unit = None
    if isinstance(sip_frequency, dict):
        sip_frequency_value = to_int(sip_frequency.get("value"))
        sip_frequency_unit = sip_frequency.get("unit")
    frequency = cfg.get("frequency")
    # "frequency" may be a plain string or a structured value; normalize_text
    # JSON-encodes anything non-string so the column stays text.
    frequency_text = normalize_text(frequency)
    frequency_days = to_int(cfg.get("frequency_days"))
    unit = cfg.get("unit")
    next_run = parse_ts(cfg.get("next_run"))
    cur.execute(
        """
        INSERT INTO strategy_config (
            id, strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
            mode, broker, active, frequency, frequency_days, unit, next_run
        )
        VALUES (
            1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        )
        ON CONFLICT (id) DO UPDATE SET
            strategy = EXCLUDED.strategy,
            sip_amount = EXCLUDED.sip_amount,
            sip_frequency_value = EXCLUDED.sip_frequency_value,
            sip_frequency_unit = EXCLUDED.sip_frequency_unit,
            mode = EXCLUDED.mode,
            broker = EXCLUDED.broker,
            active = EXCLUDED.active,
            frequency = EXCLUDED.frequency,
            frequency_days = EXCLUDED.frequency_days,
            unit = EXCLUDED.unit,
            next_run = EXCLUDED.next_run
        """,
        (
            cfg.get("strategy"),
            to_number(cfg.get("sip_amount")),
            sip_frequency_value,
            sip_frequency_unit,
            cfg.get("mode"),
            cfg.get("broker"),
            cfg.get("active"),
            frequency_text,
            frequency_days,
            unit,
            next_run,
        ),
    )


def migrate_engine_status(cur):
    """Upsert the singleton (id=1) engine status row."""
    status = load_json(BACKEND_STORAGE / "engine_status.json", {})
    if not isinstance(status, dict) or not status:
        return
    cur.execute(
        """
        INSERT INTO engine_status (id, status, last_updated)
        VALUES (1, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            status = EXCLUDED.status,
            last_updated = EXCLUDED.last_updated
        """,
        (
            status.get("status"),
            parse_ts(status.get("last_updated")),
        ),
    )


def migrate_strategy_logs(cur):
    """Insert strategy log entries keyed by seq; existing seqs are skipped."""
    logs = load_json(BACKEND_STORAGE / "strategy_logs.json", [])
    if not isinstance(logs, list) or not logs:
        return
    rows = []
    for entry in logs:
        if not isinstance(entry, dict):
            continue
        seq = entry.get("seq")
        ts = parse_ts(entry.get("ts"))
        if seq is None or ts is None:
            continue
        rows.append(
            (
                int(seq),
                ts,
                entry.get("level"),
                entry.get("category"),
                entry.get("event"),
                entry.get("message"),
                entry.get("run_id"),
                # Only dict metadata is kept; anything else is dropped to None.
                Json(entry.get("meta")) if isinstance(entry.get("meta"), dict) else None,
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO strategy_log (
                seq, ts, level, category, event, message, run_id, meta
            )
            VALUES %s
            ON CONFLICT (seq) DO NOTHING
            """,
            rows,
        )


def migrate_engine_log(cur):
    """Parse JSON-lines engine logs and insert deduplicated engine events."""
    log_paths = [
        BACKEND_STORAGE / "engine.log",
        BACKEND_LOGS / "engine.log",
    ]
    for path in log_paths:
        if not path.exists():
            continue
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except Exception:
                # Non-JSON lines (e.g. plain-text log output) are skipped.
                continue
            if not isinstance(payload, dict):
                continue
            ts = parse_ts(payload.get("ts"))
            if ts is None:
                continue
            data = payload.get("data")
            meta = payload.get("meta")
            # Json-wrap any non-None payload for the jsonb columns.
            data_json = Json(data) if data is not None else None
            meta_json = Json(meta) if meta is not None else None
            params = (ts, payload.get("event"), data_json, payload.get("message"), meta_json)
            # No unique key on engine_event: dedupe with NOT EXISTS over all
            # columns, binding the same tuple for both halves of the query.
            cur.execute(
                """
                INSERT INTO engine_event (ts, event, data, message, meta)
                SELECT %s, %s, %s, %s, %s
                WHERE NOT EXISTS (
                    SELECT 1 FROM engine_event
                    WHERE ts = %s
                      AND event IS NOT DISTINCT FROM %s
                      AND data IS NOT DISTINCT FROM %s
                      AND message IS NOT DISTINCT FROM %s
                      AND meta IS NOT DISTINCT FROM %s
                )
                """,
                params + params,
            )


def migrate_engine_state(cur):
    """Upsert singleton engine state rows (paper and live variants)."""
    state_paper = load_json(ENGINE_STORAGE / "state_paper.json", {})
    if isinstance(state_paper, dict) and state_paper:
        sip_freq = state_paper.get("sip_frequency")
        sip_value = None
        sip_unit = None
        if isinstance(sip_freq, dict):
            sip_value = to_int(sip_freq.get("value"))
            sip_unit = sip_freq.get("unit")
        cur.execute(
            """
            INSERT INTO engine_state_paper (
                id, initial_cash, cash, total_invested, nifty_units, gold_units,
                last_sip_ts, last_run, sip_frequency_value, sip_frequency_unit
            )
            VALUES (1, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                initial_cash = EXCLUDED.initial_cash,
                cash = EXCLUDED.cash,
                total_invested = EXCLUDED.total_invested,
                nifty_units = EXCLUDED.nifty_units,
                gold_units = EXCLUDED.gold_units,
                last_sip_ts = EXCLUDED.last_sip_ts,
                last_run = EXCLUDED.last_run,
                sip_frequency_value = EXCLUDED.sip_frequency_value,
                sip_frequency_unit = EXCLUDED.sip_frequency_unit
            """,
            (
                to_number(state_paper.get("initial_cash")),
                to_number(state_paper.get("cash")),
                to_number(state_paper.get("total_invested")),
                to_number(state_paper.get("nifty_units")),
                to_number(state_paper.get("gold_units")),
                parse_ts(state_paper.get("last_sip_ts")),
                parse_ts(state_paper.get("last_run")),
                sip_value,
                sip_unit,
            ),
        )
    state = load_json(ENGINE_STORAGE / "state.json", {})
    if isinstance(state, dict) and state:
        cur.execute(
            """
            INSERT INTO engine_state (
                id, total_invested, nifty_units, gold_units, last_sip_ts, last_run
            )
            VALUES (1, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                total_invested = EXCLUDED.total_invested,
                nifty_units = EXCLUDED.nifty_units,
                gold_units = EXCLUDED.gold_units,
                last_sip_ts = EXCLUDED.last_sip_ts,
                last_run = EXCLUDED.last_run
            """,
            (
                to_number(state.get("total_invested")),
                to_number(state.get("nifty_units")),
                to_number(state.get("gold_units")),
                parse_ts(state.get("last_sip_ts")),
                parse_ts(state.get("last_run")),
            ),
        )


def migrate_paper_broker(cur):
    """Upsert the paper broker's cash, positions, orders, trades, and equity curve."""
    broker = load_json(ENGINE_STORAGE / "paper_broker.json", {})
    if not isinstance(broker, dict) or not broker:
        return
    cash = to_number(broker.get("cash"))
    if cash is not None:
        cur.execute(
            """
            INSERT INTO paper_broker_account (id, cash)
            VALUES (1, %s)
            ON CONFLICT (id) DO UPDATE SET cash = EXCLUDED.cash
            """,
            (cash,),
        )
    # Positions carry no timestamp in the JSON; stamp them with migration time.
    now_utc = datetime.now(timezone.utc)
    positions = broker.get("positions")
    if isinstance(positions, dict) and positions:
        rows = []
        for symbol, data in positions.items():
            if not isinstance(data, dict):
                continue
            rows.append(
                (
                    symbol,
                    to_number(data.get("qty")),
                    to_number(data.get("avg_price")),
                    to_number(data.get("last_price")),
                    now_utc,
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_position (
                    symbol, qty, avg_price, last_price, updated_at
                )
                VALUES %s
                ON CONFLICT (symbol) DO UPDATE SET
                    qty = EXCLUDED.qty,
                    avg_price = EXCLUDED.avg_price,
                    last_price = EXCLUDED.last_price,
                    updated_at = EXCLUDED.updated_at
                """,
                rows,
            )
    orders = broker.get("orders")
    if isinstance(orders, list) and orders:
        rows = []
        for order in orders:
            if not isinstance(order, dict):
                continue
            order_id = order.get("id")
            if not order_id:
                continue
            rows.append(
                (
                    order_id,
                    order.get("symbol"),
                    order.get("side"),
                    to_number(order.get("qty")),
                    to_number(order.get("price")),
                    order.get("status"),
                    parse_ts(order.get("timestamp")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_order (
                    id, symbol, side, qty, price, status, timestamp
                )
                VALUES %s
                ON CONFLICT (id) DO UPDATE SET
                    symbol = EXCLUDED.symbol,
                    side = EXCLUDED.side,
                    qty = EXCLUDED.qty,
                    price = EXCLUDED.price,
                    status = EXCLUDED.status,
                    timestamp = EXCLUDED.timestamp
                """,
                rows,
            )
    trades = broker.get("trades")
    if isinstance(trades, list) and trades:
        rows = []
        for trade in trades:
            if not isinstance(trade, dict):
                continue
            trade_id = trade.get("id")
            if not trade_id:
                continue
            rows.append(
                (
                    trade_id,
                    trade.get("order_id"),
                    trade.get("symbol"),
                    trade.get("side"),
                    to_number(trade.get("qty")),
                    to_number(trade.get("price")),
                    parse_ts(trade.get("timestamp")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_trade (
                    id, order_id, symbol, side, qty, price, timestamp
                )
                VALUES %s
                ON CONFLICT (id) DO UPDATE SET
                    order_id = EXCLUDED.order_id,
                    symbol = EXCLUDED.symbol,
                    side = EXCLUDED.side,
                    qty = EXCLUDED.qty,
                    price = EXCLUDED.price,
                    timestamp = EXCLUDED.timestamp
                """,
                rows,
            )
    equity_curve = broker.get("equity_curve")
    if isinstance(equity_curve, list) and equity_curve:
        rows = []
        for point in equity_curve:
            if not isinstance(point, dict):
                continue
            ts = parse_ts(point.get("timestamp"))
            if ts is None:
                continue
            rows.append(
                (
                    ts,
                    to_number(point.get("equity")),
                    to_number(point.get("pnl")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_equity_curve (timestamp, equity, pnl)
                VALUES %s
                ON CONFLICT (timestamp) DO UPDATE SET
                    equity = EXCLUDED.equity,
                    pnl = EXCLUDED.pnl
                """,
                rows,
            )


def migrate_mtm_ledger(cur):
    """Bulk-insert the mark-to-market CSV ledger, keyed by timestamp."""
    path = ENGINE_STORAGE / "mtm_ledger.csv"
    if not path.exists():
        return
    df = pd.read_csv(path)
    if df.empty:
        return
    rows = []
    for _, row in df.iterrows():
        ts = parse_ts(row.get("timestamp"))
        if ts is None:
            continue
        rows.append(
            (
                ts,
                to_number(row.get("nifty_units")),
                to_number(row.get("gold_units")),
                to_number(row.get("nifty_price")),
                to_number(row.get("gold_price")),
                to_number(row.get("nifty_value")),
                to_number(row.get("gold_value")),
                to_number(row.get("portfolio_value")),
                to_number(row.get("total_invested")),
                to_number(row.get("pnl")),
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO mtm_ledger (
                timestamp, nifty_units, gold_units, nifty_price, gold_price,
                nifty_value, gold_value, portfolio_value, total_invested, pnl
            )
            VALUES %s
            ON CONFLICT (timestamp) DO NOTHING
            """,
            rows,
        )


def migrate_event_ledger(cur):
    """Insert deduplicated rows from the event CSV ledger."""
    path = ENGINE_STORAGE / "ledger.csv"
    if not path.exists():
        return
    df = pd.read_csv(path)
    if df.empty:
        return
    for _, row in df.iterrows():
        ts = parse_ts(row.get("timestamp"))
        if ts is None:
            continue
        # Build the value tuple once and bind it for both the insert and the
        # NOT EXISTS check (no unique key on event_ledger).
        params = (
            ts,
            row.get("event"),
            to_number(row.get("nifty_units")),
            to_number(row.get("gold_units")),
            to_number(row.get("nifty_price")),
            to_number(row.get("gold_price")),
            to_number(row.get("amount")),
        )
        cur.execute(
            """
            INSERT INTO event_ledger (
                timestamp, event, nifty_units, gold_units,
                nifty_price, gold_price, amount
            )
            SELECT %s, %s, %s, %s, %s, %s, %s
            WHERE NOT EXISTS (
                SELECT 1 FROM event_ledger
                WHERE timestamp = %s
                  AND event IS NOT DISTINCT FROM %s
                  AND nifty_units IS NOT DISTINCT FROM %s
                  AND gold_units IS NOT DISTINCT FROM %s
                  AND nifty_price IS NOT DISTINCT FROM %s
                  AND gold_price IS NOT DISTINCT FROM %s
                  AND amount IS NOT DISTINCT FROM %s
            )
            """,
            params + params,
        )


def migrate_market_history(cur):
    """Insert daily closes from per-symbol CSV history files.

    The symbol is taken from the CSV filename stem; files missing the
    required Date/Close columns are skipped.
    """
    history_paths = set()
    for base in (ENGINE_HISTORY, ENGINE_STORAGE / "history", ROOT_STORAGE / "history"):
        if base.exists():
            history_paths.update(base.glob("*.csv"))
    if not history_paths:
        return
    rows = []
    for path in sorted(history_paths):
        symbol = path.stem
        try:
            df = pd.read_csv(path)
        except Exception:
            continue
        if df.empty or "Date" not in df.columns or "Close" not in df.columns:
            continue
        for _, row in df.iterrows():
            dt = parse_date(row.get("Date"))
            close = to_number(row.get("Close"))
            if dt is None or close is None:
                continue
            rows.append((symbol, dt, close))
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO market_close (symbol, date, close)
            VALUES %s
            ON CONFLICT (symbol, date) DO NOTHING
            """,
            rows,
        )


def print_counts(cur):
    """Print a row count per migrated table as a post-migration summary."""
    tables = [
        "app_user",
        "app_session",
        "user_broker",
        "zerodha_session",
        "zerodha_request_token",
        "strategy_config",
        "engine_status",
        "strategy_log",
        "engine_event",
        "engine_state_paper",
        "engine_state",
        "paper_broker_account",
        "paper_position",
        "paper_order",
        "paper_trade",
        "paper_equity_curve",
        "mtm_ledger",
        "event_ledger",
        "market_close",
    ]
    for table in tables:
        # Table names come from the hardcoded list above, never user input,
        # so the f-string interpolation here is safe.
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        count = cur.fetchone()[0]
        print(f"{table}: {count}")


def main():
    """Run every migration domain in order, then print table counts."""
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        run_domain(conn, "users_sessions", migrate_users_sessions)
        run_domain(conn, "user_brokers", migrate_user_brokers)
        run_domain(conn, "zerodha", migrate_zerodha)
        run_domain(conn, "strategy_config", migrate_strategy_config)
        run_domain(conn, "engine_status", migrate_engine_status)
        run_domain(conn, "strategy_logs", migrate_strategy_logs)
        run_domain(conn, "engine_log", migrate_engine_log)
        run_domain(conn, "engine_state", migrate_engine_state)
        run_domain(conn, "paper_broker", migrate_paper_broker)
        run_domain(conn, "mtm_ledger", migrate_mtm_ledger)
        run_domain(conn, "event_ledger", migrate_event_ledger)
        run_domain(conn, "market_history", migrate_market_history)
        with conn:
            with conn.cursor() as cur:
                print_counts(cur)
    finally:
        conn.close()


if __name__ == "__main__":
    main()