SIP_GoldBees_Database/migrate_to_db.py
2026-02-01 14:14:57 +00:00

915 lines
29 KiB
Python

import json
import math
from datetime import datetime, timezone, date
from pathlib import Path
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values, Json
# Connection settings for the local Postgres instance this script migrates into.
# NOTE(review): credentials are hardcoded; consider environment variables for
# anything beyond a local dev database.
DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "trading_db",
    "user": "trader",
    "password": "traderpass",
}
# Directory layout of the legacy JSON/CSV storage this script reads from.
ROOT = Path(__file__).resolve().parent
BACKEND_STORAGE = ROOT / "backend" / "storage"  # users/sessions/broker/config/log JSON files
BACKEND_LOGS = ROOT / "backend" / "logs"  # fallback location for the JSON-lines engine.log
ENGINE_STORAGE = ROOT / "indian_paper_trading_strategy" / "storage"
ENGINE_HISTORY = ROOT / "indian_paper_trading_strategy" / "history"
ROOT_STORAGE = ROOT / "storage"
def load_json(path, default):
    """Read JSON from *path*; return *default* when the file is missing, empty, or unparseable."""
    if not path.exists():
        return default
    try:
        text = path.read_text(encoding="utf-8").strip()
    except Exception:
        # Best-effort migration source: unreadable files are treated as absent.
        return default
    if not text:
        return default
    try:
        return json.loads(text)
    except Exception:
        return default
def parse_ts(value):
    """Coerce *value* into a timezone-aware UTC datetime, or None when unparseable."""
    if value is None:
        return None
    if isinstance(value, datetime):
        # Naive datetimes are assumed to already be in UTC.
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)
    if isinstance(value, date):
        # Plain date (datetime already handled above) -> midnight UTC.
        return datetime(value.year, value.month, value.day, tzinfo=timezone.utc)
    try:
        parsed = pd.to_datetime(value, utc=True, errors="coerce")
    except Exception:
        return None
    return None if pd.isna(parsed) else parsed.to_pydatetime()
def parse_date(value):
    """Coerce *value* into a datetime.date, or None when unparseable."""
    if value is None:
        return None
    if isinstance(value, date) and not isinstance(value, datetime):
        # Already a plain date; pass through unchanged.
        return value
    try:
        parsed = pd.to_datetime(value, utc=True, errors="coerce")
        return None if pd.isna(parsed) else parsed.date()
    except Exception:
        return None
def to_number(value):
    """Best-effort float conversion.

    Returns None for None, blank strings, unparseable strings, NaN and +/-inf.
    Non-str/int/float objects (e.g. numpy scalars) pass through unchanged
    unless pandas considers them missing.
    """
    if value is None:
        return None
    if isinstance(value, str):
        stripped = value.strip()
        if not stripped:
            return None
        try:
            return float(stripped)
        except Exception:
            return None
    if isinstance(value, (int, float)):
        # isfinite is False exactly for nan/inf/-inf.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return float(value)
    return None if pd.isna(value) else value
def to_int(value):
    """Best-effort int conversion.

    Returns None for None, blank/unparseable strings, NaN, and non-finite
    floats. Bools convert to 0/1; numeric strings parse via float so "3.0"
    yields 3; other types yield None.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, (int, float)):
        # Bug fix: the original only checked isnan, so to_int(float("inf"))
        # raised an uncaught OverflowError from int(). isfinite rejects
        # nan, inf and -inf together.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return int(value)
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        try:
            return int(float(text))
        except Exception:
            return None
    return None
def normalize_text(value):
    """Return strings unchanged; JSON-encode any other non-None value."""
    if value is None:
        return None
    return value if isinstance(value, str) else json.dumps(value)
def run_domain(conn, name, func):
    """Run *func(cursor)* inside a single transaction on *conn*.

    *name* is informational only and currently unused. The connection
    context manager commits on success and rolls back on error.
    """
    with conn, conn.cursor() as cur:
        func(cur)
def migrate_users_sessions(cur):
    """Upsert users.json into app_user and sessions.json into app_session (keyed by id)."""
    raw_users = load_json(BACKEND_STORAGE / "users.json", [])
    if isinstance(raw_users, list) and raw_users:
        user_rows = []
        for record in raw_users:
            if not isinstance(record, dict):
                continue
            uid = record.get("id")
            uname = record.get("username")
            pwd_hash = record.get("password")
            # Skip incomplete records rather than inserting partial users.
            if not uid or not uname or pwd_hash is None:
                continue
            user_rows.append((uid, uname, pwd_hash))
        if user_rows:
            execute_values(
                cur,
                """
                INSERT INTO app_user (id, username, password_hash)
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET username = EXCLUDED.username,
                    password_hash = EXCLUDED.password_hash
                """,
                user_rows,
            )
    raw_sessions = load_json(BACKEND_STORAGE / "sessions.json", [])
    if isinstance(raw_sessions, list) and raw_sessions:
        session_rows = []
        for record in raw_sessions:
            if not isinstance(record, dict):
                continue
            sid = record.get("id")
            uid = record.get("user_id")
            created = parse_ts(record.get("created_at"))
            seen = parse_ts(record.get("last_seen_at"))
            expires = parse_ts(record.get("expires_at"))
            # id, user_id, created_at and expires_at are mandatory; last_seen_at may be NULL.
            if not sid or not uid or created is None or expires is None:
                continue
            session_rows.append((sid, uid, created, seen, expires))
        if session_rows:
            execute_values(
                cur,
                """
                INSERT INTO app_session (id, user_id, created_at, last_seen_at, expires_at)
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET user_id = EXCLUDED.user_id,
                    created_at = EXCLUDED.created_at,
                    last_seen_at = EXCLUDED.last_seen_at,
                    expires_at = EXCLUDED.expires_at
                """,
                session_rows,
            )
def migrate_user_brokers(cur):
    """Upsert user_brokers.json (a dict keyed by user id) into user_broker.

    The tuple element order below must match the column list in the INSERT
    exactly — keep them in sync when editing.
    """
    brokers = load_json(BACKEND_STORAGE / "user_brokers.json", {})
    if not isinstance(brokers, dict) or not brokers:
        return
    rows = []
    for user_id, entry in brokers.items():
        if not isinstance(entry, dict):
            continue
        # "pending" holds an in-progress broker link; default to {} when absent or malformed.
        pending = entry.get("pending") if isinstance(entry.get("pending"), dict) else {}
        rows.append(
            (
                user_id,
                entry.get("broker"),
                # Normalize "connected" to a strict bool; missing/None becomes False.
                bool(entry.get("connected")) if entry.get("connected") is not None else False,
                entry.get("access_token"),
                parse_ts(entry.get("connected_at")),
                entry.get("api_key"),
                entry.get("user_name"),
                entry.get("broker_user_id"),
                pending.get("broker"),
                pending.get("api_key"),
                pending.get("api_secret"),
                parse_ts(pending.get("started_at")),
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO user_broker (
                user_id,
                broker,
                connected,
                access_token,
                connected_at,
                api_key,
                user_name,
                broker_user_id,
                pending_broker,
                pending_api_key,
                pending_api_secret,
                pending_started_at
            )
            VALUES %s
            ON CONFLICT (user_id) DO UPDATE
            SET broker = EXCLUDED.broker,
                connected = EXCLUDED.connected,
                access_token = EXCLUDED.access_token,
                connected_at = EXCLUDED.connected_at,
                api_key = EXCLUDED.api_key,
                user_name = EXCLUDED.user_name,
                broker_user_id = EXCLUDED.broker_user_id,
                pending_broker = EXCLUDED.pending_broker,
                pending_api_key = EXCLUDED.pending_api_key,
                pending_api_secret = EXCLUDED.pending_api_secret,
                pending_started_at = EXCLUDED.pending_started_at
            """,
            rows,
        )
def migrate_zerodha(cur):
    """Migrate Zerodha session history and per-user request tokens.

    zerodha_session has no natural key here, so inserts are deduplicated
    with a WHERE NOT EXISTS over every column; the same parameters are
    bound twice — once for the SELECT values and once for the existence
    check. IS NOT DISTINCT FROM makes NULLs compare equal.
    """
    sessions = load_json(BACKEND_STORAGE / "zerodha_sessions.json", [])
    if isinstance(sessions, list):
        for item in sessions:
            if not isinstance(item, dict):
                continue
            user_id = item.get("user_id")
            linked_at = parse_ts(item.get("linked_at"))
            # user_id and linked_at are mandatory; the remaining fields may be NULL.
            if not user_id or linked_at is None:
                continue
            api_key = item.get("api_key")
            access_token = item.get("access_token")
            request_token = item.get("request_token")
            user_name = item.get("user_name")
            broker_user_id = item.get("broker_user_id")
            cur.execute(
                """
                INSERT INTO zerodha_session (
                    user_id, linked_at, api_key, access_token, request_token, user_name, broker_user_id
                )
                SELECT %s, %s, %s, %s, %s, %s, %s
                WHERE NOT EXISTS (
                    SELECT 1 FROM zerodha_session
                    WHERE user_id = %s
                    AND linked_at = %s
                    AND api_key IS NOT DISTINCT FROM %s
                    AND access_token IS NOT DISTINCT FROM %s
                    AND request_token IS NOT DISTINCT FROM %s
                    AND user_name IS NOT DISTINCT FROM %s
                    AND broker_user_id IS NOT DISTINCT FROM %s
                )
                """,
                (
                    user_id,
                    linked_at,
                    api_key,
                    access_token,
                    request_token,
                    user_name,
                    broker_user_id,
                    # Same values repeated for the NOT EXISTS check above.
                    user_id,
                    linked_at,
                    api_key,
                    access_token,
                    request_token,
                    user_name,
                    broker_user_id,
                ),
            )
    tokens = load_json(BACKEND_STORAGE / "zerodha_request_tokens.json", [])
    if isinstance(tokens, list) and tokens:
        rows = []
        for item in tokens:
            if not isinstance(item, dict):
                continue
            user_id = item.get("user_id")
            request_token = item.get("request_token")
            if not user_id or request_token is None:
                continue
            rows.append((user_id, request_token))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO zerodha_request_token (user_id, request_token)
                VALUES %s
                ON CONFLICT (user_id) DO UPDATE
                SET request_token = EXCLUDED.request_token
                """,
                rows,
            )
def migrate_strategy_config(cur):
    """Upsert the single-row strategy configuration (fixed id = 1) from strategy_config.json.

    The positional parameter tuple must stay aligned with the column list in
    the INSERT statement.
    """
    cfg = load_json(BACKEND_STORAGE / "strategy_config.json", {})
    if not isinstance(cfg, dict) or not cfg:
        return
    # sip_frequency is nested as {"value": ..., "unit": ...}; flatten it into two columns.
    sip_frequency = cfg.get("sip_frequency")
    sip_frequency_value = None
    sip_frequency_unit = None
    if isinstance(sip_frequency, dict):
        sip_frequency_value = to_int(sip_frequency.get("value"))
        sip_frequency_unit = sip_frequency.get("unit")
    frequency = cfg.get("frequency")
    # frequency may be a plain string or structured data; non-strings are JSON-encoded.
    frequency_text = normalize_text(frequency)
    frequency_days = to_int(cfg.get("frequency_days"))
    unit = cfg.get("unit")
    next_run = parse_ts(cfg.get("next_run"))
    cur.execute(
        """
        INSERT INTO strategy_config (
            id,
            strategy,
            sip_amount,
            sip_frequency_value,
            sip_frequency_unit,
            mode,
            broker,
            active,
            frequency,
            frequency_days,
            unit,
            next_run
        )
        VALUES (
            1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        )
        ON CONFLICT (id) DO UPDATE
        SET strategy = EXCLUDED.strategy,
            sip_amount = EXCLUDED.sip_amount,
            sip_frequency_value = EXCLUDED.sip_frequency_value,
            sip_frequency_unit = EXCLUDED.sip_frequency_unit,
            mode = EXCLUDED.mode,
            broker = EXCLUDED.broker,
            active = EXCLUDED.active,
            frequency = EXCLUDED.frequency,
            frequency_days = EXCLUDED.frequency_days,
            unit = EXCLUDED.unit,
            next_run = EXCLUDED.next_run
        """,
        (
            cfg.get("strategy"),
            to_number(cfg.get("sip_amount")),
            sip_frequency_value,
            sip_frequency_unit,
            cfg.get("mode"),
            cfg.get("broker"),
            cfg.get("active"),
            frequency_text,
            frequency_days,
            unit,
            next_run,
        ),
    )
def migrate_engine_status(cur):
    """Upsert the single-row engine status snapshot (fixed id = 1)."""
    payload = load_json(BACKEND_STORAGE / "engine_status.json", {})
    if not isinstance(payload, dict) or not payload:
        return
    status_text = payload.get("status")
    updated = parse_ts(payload.get("last_updated"))
    cur.execute(
        """
        INSERT INTO engine_status (id, status, last_updated)
        VALUES (1, %s, %s)
        ON CONFLICT (id) DO UPDATE
        SET status = EXCLUDED.status,
            last_updated = EXCLUDED.last_updated
        """,
        (status_text, updated),
    )
def migrate_strategy_logs(cur):
    """Insert strategy_logs.json entries into strategy_log, skipping malformed rows.

    ON CONFLICT (seq) DO NOTHING makes reruns idempotent. meta is stored as
    jsonb only when it is a dict; other shapes are dropped to NULL (as before).
    """
    logs = load_json(BACKEND_STORAGE / "strategy_logs.json", [])
    if not isinstance(logs, list) or not logs:
        return
    rows = []
    for entry in logs:
        if not isinstance(entry, dict):
            continue
        # Fix: use to_int() instead of a bare int(seq). A malformed seq value
        # (e.g. "abc") previously raised ValueError and aborted the whole
        # migration; now the bad row is simply skipped.
        seq = to_int(entry.get("seq"))
        ts = parse_ts(entry.get("ts"))
        if seq is None or ts is None:
            continue
        meta = entry.get("meta")
        rows.append(
            (
                seq,
                ts,
                entry.get("level"),
                entry.get("category"),
                entry.get("event"),
                entry.get("message"),
                entry.get("run_id"),
                Json(meta) if isinstance(meta, dict) else None,
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO strategy_log (
                seq, ts, level, category, event, message, run_id, meta
            )
            VALUES %s
            ON CONFLICT (seq) DO NOTHING
            """,
            rows,
        )
def migrate_engine_log(cur):
    """Parse JSON-lines engine logs and insert deduplicated rows into engine_event.

    engine_event has no natural key, so dedup uses WHERE NOT EXISTS across
    all columns; the parameter tuple is bound twice (SELECT values, then the
    existence check). Unparseable or non-dict lines are skipped.
    """
    log_paths = [
        BACKEND_STORAGE / "engine.log",
        BACKEND_LOGS / "engine.log",
    ]
    for path in log_paths:
        if not path.exists():
            continue
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except Exception:
                continue
            if not isinstance(payload, dict):
                continue
            ts = parse_ts(payload.get("ts"))
            if ts is None:
                continue
            event = payload.get("event")
            message = payload.get("message")
            data = payload.get("data")
            meta = payload.get("meta")
            # Fix: the original repeated the expression
            #   Json(x) if isinstance(x, dict) else Json(x) if x is not None else None
            # four times; both isinstance arms produced Json(x), so it reduces
            # to a single None check. Wrap once and reuse.
            data_json = Json(data) if data is not None else None
            meta_json = Json(meta) if meta is not None else None
            cur.execute(
                """
                INSERT INTO engine_event (ts, event, data, message, meta)
                SELECT %s, %s, %s, %s, %s
                WHERE NOT EXISTS (
                    SELECT 1 FROM engine_event
                    WHERE ts = %s
                    AND event IS NOT DISTINCT FROM %s
                    AND data IS NOT DISTINCT FROM %s
                    AND message IS NOT DISTINCT FROM %s
                    AND meta IS NOT DISTINCT FROM %s
                )
                """,
                (
                    ts,
                    event,
                    data_json,
                    message,
                    meta_json,
                    ts,
                    event,
                    data_json,
                    message,
                    meta_json,
                ),
            )
def migrate_engine_state(cur):
    """Upsert the paper-mode and live engine state snapshots (single fixed-id rows)."""
    state_paper = load_json(ENGINE_STORAGE / "state_paper.json", {})
    if isinstance(state_paper, dict) and state_paper:
        # sip_frequency is nested as {"value": ..., "unit": ...}; flatten to two columns.
        sip_freq = state_paper.get("sip_frequency")
        sip_value = None
        sip_unit = None
        if isinstance(sip_freq, dict):
            sip_value = to_int(sip_freq.get("value"))
            sip_unit = sip_freq.get("unit")
        cur.execute(
            """
            INSERT INTO engine_state_paper (
                id,
                initial_cash,
                cash,
                total_invested,
                nifty_units,
                gold_units,
                last_sip_ts,
                last_run,
                sip_frequency_value,
                sip_frequency_unit
            )
            VALUES (1, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET initial_cash = EXCLUDED.initial_cash,
                cash = EXCLUDED.cash,
                total_invested = EXCLUDED.total_invested,
                nifty_units = EXCLUDED.nifty_units,
                gold_units = EXCLUDED.gold_units,
                last_sip_ts = EXCLUDED.last_sip_ts,
                last_run = EXCLUDED.last_run,
                sip_frequency_value = EXCLUDED.sip_frequency_value,
                sip_frequency_unit = EXCLUDED.sip_frequency_unit
            """,
            (
                to_number(state_paper.get("initial_cash")),
                to_number(state_paper.get("cash")),
                to_number(state_paper.get("total_invested")),
                to_number(state_paper.get("nifty_units")),
                to_number(state_paper.get("gold_units")),
                parse_ts(state_paper.get("last_sip_ts")),
                parse_ts(state_paper.get("last_run")),
                sip_value,
                sip_unit,
            ),
        )
    # Live-mode state carries fewer fields (no cash bookkeeping).
    state = load_json(ENGINE_STORAGE / "state.json", {})
    if isinstance(state, dict) and state:
        cur.execute(
            """
            INSERT INTO engine_state (
                id,
                total_invested,
                nifty_units,
                gold_units,
                last_sip_ts,
                last_run
            )
            VALUES (1, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET total_invested = EXCLUDED.total_invested,
                nifty_units = EXCLUDED.nifty_units,
                gold_units = EXCLUDED.gold_units,
                last_sip_ts = EXCLUDED.last_sip_ts,
                last_run = EXCLUDED.last_run
            """,
            (
                to_number(state.get("total_invested")),
                to_number(state.get("nifty_units")),
                to_number(state.get("gold_units")),
                parse_ts(state.get("last_sip_ts")),
                parse_ts(state.get("last_run")),
            ),
        )
def migrate_paper_broker(cur):
    """Migrate paper_broker.json: account cash, positions, orders, trades and equity curve.

    Each section upserts independently, so a partially-populated source file
    still migrates whatever it contains.
    """
    broker = load_json(ENGINE_STORAGE / "paper_broker.json", {})
    if not isinstance(broker, dict) or not broker:
        return
    # Account cash: single fixed-id row.
    cash = to_number(broker.get("cash"))
    if cash is not None:
        cur.execute(
            """
            INSERT INTO paper_broker_account (id, cash)
            VALUES (1, %s)
            ON CONFLICT (id) DO UPDATE
            SET cash = EXCLUDED.cash
            """,
            (cash,),
        )
    # Source positions carry no timestamp; stamp them with the migration time.
    now_utc = datetime.now(timezone.utc)
    positions = broker.get("positions")
    if isinstance(positions, dict) and positions:
        rows = []
        for symbol, data in positions.items():
            if not isinstance(data, dict):
                continue
            rows.append(
                (
                    symbol,
                    to_number(data.get("qty")),
                    to_number(data.get("avg_price")),
                    to_number(data.get("last_price")),
                    now_utc,
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_position (
                    symbol, qty, avg_price, last_price, updated_at
                )
                VALUES %s
                ON CONFLICT (symbol) DO UPDATE
                SET qty = EXCLUDED.qty,
                    avg_price = EXCLUDED.avg_price,
                    last_price = EXCLUDED.last_price,
                    updated_at = EXCLUDED.updated_at
                """,
                rows,
            )
    # Orders: upsert by order id; rows without an id are skipped.
    orders = broker.get("orders")
    if isinstance(orders, list) and orders:
        rows = []
        for order in orders:
            if not isinstance(order, dict):
                continue
            order_id = order.get("id")
            if not order_id:
                continue
            rows.append(
                (
                    order_id,
                    order.get("symbol"),
                    order.get("side"),
                    to_number(order.get("qty")),
                    to_number(order.get("price")),
                    order.get("status"),
                    parse_ts(order.get("timestamp")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_order (
                    id, symbol, side, qty, price, status, timestamp
                )
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET symbol = EXCLUDED.symbol,
                    side = EXCLUDED.side,
                    qty = EXCLUDED.qty,
                    price = EXCLUDED.price,
                    status = EXCLUDED.status,
                    timestamp = EXCLUDED.timestamp
                """,
                rows,
            )
    # Trades: upsert by trade id; rows without an id are skipped.
    trades = broker.get("trades")
    if isinstance(trades, list) and trades:
        rows = []
        for trade in trades:
            if not isinstance(trade, dict):
                continue
            trade_id = trade.get("id")
            if not trade_id:
                continue
            rows.append(
                (
                    trade_id,
                    trade.get("order_id"),
                    trade.get("symbol"),
                    trade.get("side"),
                    to_number(trade.get("qty")),
                    to_number(trade.get("price")),
                    parse_ts(trade.get("timestamp")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_trade (
                    id, order_id, symbol, side, qty, price, timestamp
                )
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET order_id = EXCLUDED.order_id,
                    symbol = EXCLUDED.symbol,
                    side = EXCLUDED.side,
                    qty = EXCLUDED.qty,
                    price = EXCLUDED.price,
                    timestamp = EXCLUDED.timestamp
                """,
                rows,
            )
    # Equity curve: upsert by timestamp; points without a parseable timestamp are skipped.
    equity_curve = broker.get("equity_curve")
    if isinstance(equity_curve, list) and equity_curve:
        rows = []
        for point in equity_curve:
            if not isinstance(point, dict):
                continue
            ts = parse_ts(point.get("timestamp"))
            if ts is None:
                continue
            rows.append(
                (
                    ts,
                    to_number(point.get("equity")),
                    to_number(point.get("pnl")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_equity_curve (timestamp, equity, pnl)
                VALUES %s
                ON CONFLICT (timestamp) DO UPDATE
                SET equity = EXCLUDED.equity,
                    pnl = EXCLUDED.pnl
                """,
                rows,
            )
def migrate_mtm_ledger(cur):
path = ENGINE_STORAGE / "mtm_ledger.csv"
if not path.exists():
return
df = pd.read_csv(path)
if df.empty:
return
rows = []
for _, row in df.iterrows():
ts = parse_ts(row.get("timestamp"))
if ts is None:
continue
rows.append(
(
ts,
to_number(row.get("nifty_units")),
to_number(row.get("gold_units")),
to_number(row.get("nifty_price")),
to_number(row.get("gold_price")),
to_number(row.get("nifty_value")),
to_number(row.get("gold_value")),
to_number(row.get("portfolio_value")),
to_number(row.get("total_invested")),
to_number(row.get("pnl")),
)
)
if rows:
execute_values(
cur,
"""
INSERT INTO mtm_ledger (
timestamp,
nifty_units,
gold_units,
nifty_price,
gold_price,
nifty_value,
gold_value,
portfolio_value,
total_invested,
pnl
)
VALUES %s
ON CONFLICT (timestamp) DO NOTHING
""",
rows,
)
def migrate_event_ledger(cur):
    """Load ledger.csv and insert deduplicated event rows into event_ledger.

    The table has no natural key, so dedup uses WHERE NOT EXISTS across all
    columns; the same value tuple is bound for the SELECT and for the
    existence check (IS NOT DISTINCT FROM makes NULLs compare equal).
    """
    path = ENGINE_STORAGE / "ledger.csv"
    if not path.exists():
        return
    df = pd.read_csv(path)
    if df.empty:
        return
    for _, row in df.iterrows():
        ts = parse_ts(row.get("timestamp"))
        if ts is None:
            continue
        # Fix: build the value tuple once. The original recomputed every
        # to_number() call a second time just to fill the duplicated
        # NOT EXISTS parameters.
        values = (
            ts,
            row.get("event"),
            to_number(row.get("nifty_units")),
            to_number(row.get("gold_units")),
            to_number(row.get("nifty_price")),
            to_number(row.get("gold_price")),
            to_number(row.get("amount")),
        )
        cur.execute(
            """
            INSERT INTO event_ledger (
                timestamp, event, nifty_units, gold_units, nifty_price, gold_price, amount
            )
            SELECT %s, %s, %s, %s, %s, %s, %s
            WHERE NOT EXISTS (
                SELECT 1 FROM event_ledger
                WHERE timestamp = %s
                AND event IS NOT DISTINCT FROM %s
                AND nifty_units IS NOT DISTINCT FROM %s
                AND gold_units IS NOT DISTINCT FROM %s
                AND nifty_price IS NOT DISTINCT FROM %s
                AND gold_price IS NOT DISTINCT FROM %s
                AND amount IS NOT DISTINCT FROM %s
            )
            """,
            values + values,
        )
def migrate_market_history(cur):
    """Scan every known history directory for per-symbol CSVs and upsert daily closes."""
    csv_files = set()
    for base in (ENGINE_HISTORY, ENGINE_STORAGE / "history", ROOT_STORAGE / "history"):
        if base.exists():
            csv_files.update(base.glob("*.csv"))
    if not csv_files:
        return
    close_rows = []
    # Sorted for deterministic processing order; the filename stem is the symbol.
    for csv_path in sorted(csv_files):
        symbol = csv_path.stem
        try:
            frame = pd.read_csv(csv_path)
        except Exception:
            continue
        if frame.empty or "Date" not in frame.columns or "Close" not in frame.columns:
            continue
        for _, record in frame.iterrows():
            day = parse_date(record.get("Date"))
            close_px = to_number(record.get("Close"))
            if day is not None and close_px is not None:
                close_rows.append((symbol, day, close_px))
    if close_rows:
        execute_values(
            cur,
            """
            INSERT INTO market_close (symbol, date, close)
            VALUES %s
            ON CONFLICT (symbol, date) DO NOTHING
            """,
            close_rows,
        )
def print_counts(cur):
    """Print a row-count summary line for every migrated table."""
    # Table names come from this fixed tuple, not user input, so the
    # f-string interpolation below is safe.
    for table in (
        "app_user",
        "app_session",
        "user_broker",
        "zerodha_session",
        "zerodha_request_token",
        "strategy_config",
        "engine_status",
        "strategy_log",
        "engine_event",
        "engine_state_paper",
        "engine_state",
        "paper_broker_account",
        "paper_position",
        "paper_order",
        "paper_trade",
        "paper_equity_curve",
        "mtm_ledger",
        "event_ledger",
        "market_close",
    ):
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        print(f"{table}: {cur.fetchone()[0]}")
def main():
    """Open one connection, run each migration domain in its own transaction, then print counts."""
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        domains = (
            ("users_sessions", migrate_users_sessions),
            ("user_brokers", migrate_user_brokers),
            ("zerodha", migrate_zerodha),
            ("strategy_config", migrate_strategy_config),
            ("engine_status", migrate_engine_status),
            ("strategy_logs", migrate_strategy_logs),
            ("engine_log", migrate_engine_log),
            ("engine_state", migrate_engine_state),
            ("paper_broker", migrate_paper_broker),
            ("mtm_ledger", migrate_mtm_ledger),
            ("event_ledger", migrate_event_ledger),
            ("market_history", migrate_market_history),
        )
        for domain_name, migrator in domains:
            run_domain(conn, domain_name, migrator)
        with conn:
            with conn.cursor() as cur:
                print_counts(cur)
    finally:
        conn.close()
# Entry point: run the full migration when executed as a script.
if __name__ == "__main__":
    main()