"""Migrate file-based JSON/CSV trading storage into the PostgreSQL trading database."""
import json
|
|
import math
|
|
from datetime import datetime, timezone, date
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values, Json
|
|
|
|
|
|
# Connection settings for the target PostgreSQL database.
# NOTE(review): credentials are hard-coded here; consider loading them from
# environment variables or a config file kept out of version control.
DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "trading_db",
    "user": "trader",
    "password": "traderpass",
}
|
|
|
|
|
|
# All source data locations are resolved relative to this script's directory.
ROOT = Path(__file__).resolve().parent
# Backend JSON stores (users, sessions, broker links, config, logs).
BACKEND_STORAGE = ROOT / "backend" / "storage"
# Secondary location for the JSON-lines engine log.
BACKEND_LOGS = ROOT / "backend" / "logs"
# Strategy engine state, ledgers and paper-broker snapshots.
ENGINE_STORAGE = ROOT / "indian_paper_trading_strategy" / "storage"
# Per-symbol market-history CSVs.
ENGINE_HISTORY = ROOT / "indian_paper_trading_strategy" / "history"
# Additional root-level storage that may also hold history CSVs.
ROOT_STORAGE = ROOT / "storage"
|
|
|
|
|
|
def load_json(path, default):
    """Read and parse a JSON file, returning *default* on any problem.

    Missing file, empty/whitespace-only content, read errors, and invalid
    JSON all fall back to *default* — the migration is best-effort.
    """
    if not path.exists():
        return default
    try:
        text = path.read_text(encoding="utf-8").strip()
    except Exception:
        return default
    if not text:
        return default
    try:
        return json.loads(text)
    except Exception:
        return default
|
|
|
|
|
|
def parse_ts(value):
    """Coerce *value* to a timezone-aware UTC datetime, or None if impossible.

    Naive datetimes are assumed to already be UTC; aware ones are converted.
    Plain dates become midnight UTC. Anything else goes through pandas.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)
    if isinstance(value, date):
        # A plain date (datetime was handled above): midnight UTC.
        return datetime(value.year, value.month, value.day, tzinfo=timezone.utc)
    try:
        parsed = pd.to_datetime(value, utc=True, errors="coerce")
    except Exception:
        return None
    return None if pd.isna(parsed) else parsed.to_pydatetime()
|
|
|
|
|
|
def parse_date(value):
    """Coerce *value* to a datetime.date, or None if impossible.

    A plain date passes through unchanged; datetimes and strings are parsed
    via pandas (UTC) and truncated to the date part.
    """
    if value is None:
        return None
    if isinstance(value, date) and not isinstance(value, datetime):
        return value
    try:
        parsed = pd.to_datetime(value, utc=True, errors="coerce")
    except Exception:
        return None
    return None if pd.isna(parsed) else parsed.date()
|
|
|
|
|
|
def to_number(value):
    """Best-effort conversion of *value* to a float suitable for a numeric column.

    Returns None for missing or unparseable input and for non-finite values.
    Fix: the original rejected NaN/Inf only on the float branch; a string
    like "nan" or "inf" still parsed to a non-finite float and leaked
    through. Both branches now reject non-finite results consistently.

    Non-str/number values that are not NA (e.g. Decimal) are returned
    unchanged, as before.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        # math.isfinite(x) is equivalent to not (isnan(x) or isinf(x)).
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return float(value)
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        try:
            number = float(text)
        except Exception:
            return None
        return number if math.isfinite(number) else None
    # pandas NA markers (NaT, numpy nan wrappers) become None.
    if pd.isna(value):
        return None
    return value
|
|
|
|
|
|
def to_int(value):
    """Best-effort conversion of *value* to int; None when impossible.

    Fix: the original guarded only NaN on the float branch, so an infinite
    float reached int(value) and raised OverflowError, aborting the whole
    migration. Non-finite floats now return None.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        # bool is an int subclass; handle it explicitly (True -> 1).
        return int(value)
    if isinstance(value, (int, float)):
        # int(inf) raises OverflowError and int(nan) raises ValueError.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return int(value)
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        try:
            # float() first so "2.9" truncates to 2 instead of failing.
            return int(float(text))
        except Exception:
            # Covers non-numeric text and non-finite strings like "inf".
            return None
    return None
|
|
|
|
|
|
def normalize_text(value):
    """Return *value* as-is when it is None or already a string; otherwise JSON-encode it."""
    if value is None or isinstance(value, str):
        return value
    return json.dumps(value)
|
|
|
|
|
|
def run_domain(conn, name, func):
    """Run *func(cursor)* inside a single transaction on *conn*.

    *name* is a label for the migration domain; it is currently unused
    (kept for call-site readability / future logging).
    """
    # `with conn` commits on success and rolls back on exception (psycopg2).
    with conn, conn.cursor() as cur:
        func(cur)
|
|
|
|
|
|
def migrate_users_sessions(cur):
    """Upsert users.json into app_user and sessions.json into app_session.

    Entries missing required fields are skipped silently. Upserts
    (ON CONFLICT ... DO UPDATE) make the migration safely re-runnable.
    """
    users = load_json(BACKEND_STORAGE / "users.json", [])
    if isinstance(users, list) and users:
        rows = []
        for user in users:
            if not isinstance(user, dict):
                continue
            user_id = user.get("id")
            username = user.get("username")
            # NOTE(review): the source field is named "password" but is stored
            # as password_hash — presumably already hashed; confirm in backend.
            password_hash = user.get("password")
            if not user_id or not username or password_hash is None:
                continue
            rows.append((user_id, username, password_hash))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO app_user (id, username, password_hash)
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET username = EXCLUDED.username,
                    password_hash = EXCLUDED.password_hash
                """,
                rows,
            )

    sessions = load_json(BACKEND_STORAGE / "sessions.json", [])
    if isinstance(sessions, list) and sessions:
        rows = []
        for session in sessions:
            if not isinstance(session, dict):
                continue
            session_id = session.get("id")
            user_id = session.get("user_id")
            created_at = parse_ts(session.get("created_at"))
            last_seen_at = parse_ts(session.get("last_seen_at"))
            expires_at = parse_ts(session.get("expires_at"))
            # created_at and expires_at are required; last_seen_at may be NULL.
            if not session_id or not user_id or created_at is None or expires_at is None:
                continue
            rows.append((session_id, user_id, created_at, last_seen_at, expires_at))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO app_session (id, user_id, created_at, last_seen_at, expires_at)
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET user_id = EXCLUDED.user_id,
                    created_at = EXCLUDED.created_at,
                    last_seen_at = EXCLUDED.last_seen_at,
                    expires_at = EXCLUDED.expires_at
                """,
                rows,
            )
|
|
|
|
|
|
def migrate_user_brokers(cur):
    """Upsert user_brokers.json (a dict keyed by user id) into user_broker.

    Each entry may carry a nested "pending" dict describing an in-progress
    broker link attempt; its fields are flattened into pending_* columns.
    """
    brokers = load_json(BACKEND_STORAGE / "user_brokers.json", {})
    if not isinstance(brokers, dict) or not brokers:
        return
    rows = []
    for user_id, entry in brokers.items():
        if not isinstance(entry, dict):
            continue
        # Fall back to an empty dict so pending.get(...) yields NULL columns.
        pending = entry.get("pending") if isinstance(entry.get("pending"), dict) else {}
        rows.append(
            (
                user_id,
                entry.get("broker"),
                # Explicit None check: only a present value is coerced to bool.
                bool(entry.get("connected")) if entry.get("connected") is not None else False,
                entry.get("access_token"),
                parse_ts(entry.get("connected_at")),
                entry.get("api_key"),
                entry.get("user_name"),
                entry.get("broker_user_id"),
                pending.get("broker"),
                pending.get("api_key"),
                pending.get("api_secret"),
                parse_ts(pending.get("started_at")),
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO user_broker (
                user_id,
                broker,
                connected,
                access_token,
                connected_at,
                api_key,
                user_name,
                broker_user_id,
                pending_broker,
                pending_api_key,
                pending_api_secret,
                pending_started_at
            )
            VALUES %s
            ON CONFLICT (user_id) DO UPDATE
            SET broker = EXCLUDED.broker,
                connected = EXCLUDED.connected,
                access_token = EXCLUDED.access_token,
                connected_at = EXCLUDED.connected_at,
                api_key = EXCLUDED.api_key,
                user_name = EXCLUDED.user_name,
                broker_user_id = EXCLUDED.broker_user_id,
                pending_broker = EXCLUDED.pending_broker,
                pending_api_key = EXCLUDED.pending_api_key,
                pending_api_secret = EXCLUDED.pending_api_secret,
                pending_started_at = EXCLUDED.pending_started_at
            """,
            rows,
        )
|
|
|
|
|
|
def migrate_zerodha(cur):
    """Migrate Zerodha link history and pending request tokens.

    Session rows are append-only (no natural key), so duplicates are avoided
    with a NOT EXISTS probe comparing every column; IS NOT DISTINCT FROM
    makes NULLs compare equal. Request tokens are a one-per-user upsert.
    """
    sessions = load_json(BACKEND_STORAGE / "zerodha_sessions.json", [])
    if isinstance(sessions, list):
        for item in sessions:
            if not isinstance(item, dict):
                continue
            user_id = item.get("user_id")
            linked_at = parse_ts(item.get("linked_at"))
            # user_id and linked_at are required to identify a link event.
            if not user_id or linked_at is None:
                continue
            api_key = item.get("api_key")
            access_token = item.get("access_token")
            request_token = item.get("request_token")
            user_name = item.get("user_name")
            broker_user_id = item.get("broker_user_id")
            cur.execute(
                """
                INSERT INTO zerodha_session (
                    user_id, linked_at, api_key, access_token, request_token, user_name, broker_user_id
                )
                SELECT %s, %s, %s, %s, %s, %s, %s
                WHERE NOT EXISTS (
                    SELECT 1 FROM zerodha_session
                    WHERE user_id = %s
                      AND linked_at = %s
                      AND api_key IS NOT DISTINCT FROM %s
                      AND access_token IS NOT DISTINCT FROM %s
                      AND request_token IS NOT DISTINCT FROM %s
                      AND user_name IS NOT DISTINCT FROM %s
                      AND broker_user_id IS NOT DISTINCT FROM %s
                )
                """,
                # Same values twice: once for the INSERT, once for the probe.
                (
                    user_id,
                    linked_at,
                    api_key,
                    access_token,
                    request_token,
                    user_name,
                    broker_user_id,
                    user_id,
                    linked_at,
                    api_key,
                    access_token,
                    request_token,
                    user_name,
                    broker_user_id,
                ),
            )

    tokens = load_json(BACKEND_STORAGE / "zerodha_request_tokens.json", [])
    if isinstance(tokens, list) and tokens:
        rows = []
        for item in tokens:
            if not isinstance(item, dict):
                continue
            user_id = item.get("user_id")
            request_token = item.get("request_token")
            if not user_id or request_token is None:
                continue
            rows.append((user_id, request_token))
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO zerodha_request_token (user_id, request_token)
                VALUES %s
                ON CONFLICT (user_id) DO UPDATE
                SET request_token = EXCLUDED.request_token
                """,
                rows,
            )
|
|
|
|
|
|
def migrate_strategy_config(cur):
    """Upsert the singleton strategy_config row (fixed id = 1).

    Flattens the nested "sip_frequency" {value, unit} object into two
    columns; "frequency" may be a string or structured data, so it is
    normalized to text via normalize_text first.
    """
    cfg = load_json(BACKEND_STORAGE / "strategy_config.json", {})
    if not isinstance(cfg, dict) or not cfg:
        return

    sip_frequency = cfg.get("sip_frequency")
    sip_frequency_value = None
    sip_frequency_unit = None
    if isinstance(sip_frequency, dict):
        sip_frequency_value = to_int(sip_frequency.get("value"))
        sip_frequency_unit = sip_frequency.get("unit")

    frequency = cfg.get("frequency")
    frequency_text = normalize_text(frequency)
    frequency_days = to_int(cfg.get("frequency_days"))
    unit = cfg.get("unit")
    next_run = parse_ts(cfg.get("next_run"))

    cur.execute(
        """
        INSERT INTO strategy_config (
            id,
            strategy,
            sip_amount,
            sip_frequency_value,
            sip_frequency_unit,
            mode,
            broker,
            active,
            frequency,
            frequency_days,
            unit,
            next_run
        )
        VALUES (
            1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        )
        ON CONFLICT (id) DO UPDATE
        SET strategy = EXCLUDED.strategy,
            sip_amount = EXCLUDED.sip_amount,
            sip_frequency_value = EXCLUDED.sip_frequency_value,
            sip_frequency_unit = EXCLUDED.sip_frequency_unit,
            mode = EXCLUDED.mode,
            broker = EXCLUDED.broker,
            active = EXCLUDED.active,
            frequency = EXCLUDED.frequency,
            frequency_days = EXCLUDED.frequency_days,
            unit = EXCLUDED.unit,
            next_run = EXCLUDED.next_run
        """,
        (
            cfg.get("strategy"),
            to_number(cfg.get("sip_amount")),
            sip_frequency_value,
            sip_frequency_unit,
            cfg.get("mode"),
            cfg.get("broker"),
            cfg.get("active"),
            frequency_text,
            frequency_days,
            unit,
            next_run,
        ),
    )
|
|
|
|
|
|
def migrate_engine_status(cur):
    """Upsert the singleton engine_status row (fixed id = 1) from engine_status.json."""
    payload = load_json(BACKEND_STORAGE / "engine_status.json", {})
    if not (isinstance(payload, dict) and payload):
        return
    params = (payload.get("status"), parse_ts(payload.get("last_updated")))
    cur.execute(
        """
        INSERT INTO engine_status (id, status, last_updated)
        VALUES (1, %s, %s)
        ON CONFLICT (id) DO UPDATE
        SET status = EXCLUDED.status,
            last_updated = EXCLUDED.last_updated
        """,
        params,
    )
|
|
|
|
|
|
def migrate_strategy_logs(cur):
    """Bulk-insert strategy_logs.json into strategy_log (seq is the key; dupes skipped).

    Fix: the original called bare int(seq), so a single malformed "seq"
    value (e.g. "abc") raised and aborted the whole domain. Using the
    module's to_int helper skips such entries instead, consistent with how
    every other field in this file is coerced.
    """
    logs = load_json(BACKEND_STORAGE / "strategy_logs.json", [])
    if not isinstance(logs, list) or not logs:
        return
    rows = []
    for entry in logs:
        if not isinstance(entry, dict):
            continue
        seq = to_int(entry.get("seq"))
        ts = parse_ts(entry.get("ts"))
        if seq is None or ts is None:
            continue
        meta = entry.get("meta")
        rows.append(
            (
                seq,
                ts,
                entry.get("level"),
                entry.get("category"),
                entry.get("event"),
                entry.get("message"),
                entry.get("run_id"),
                # Only dict metadata is stored as jsonb; anything else -> NULL.
                Json(meta) if isinstance(meta, dict) else None,
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO strategy_log (
                seq, ts, level, category, event, message, run_id, meta
            )
            VALUES %s
            ON CONFLICT (seq) DO NOTHING
            """,
            rows,
        )
|
|
|
|
|
|
def migrate_engine_log(cur):
    """Replay JSON-lines engine logs into engine_event, skipping exact duplicates.

    Reads both known engine.log locations; malformed lines and entries
    without a parseable "ts" are ignored. Rows have no natural key, so
    duplicates are filtered with a NOT EXISTS probe over every column
    (IS NOT DISTINCT FROM makes NULLs compare equal).

    Fix: the original built each Json adapter with a redundant chained
    conditional (`Json(x) if isinstance(x, dict) else Json(x) if x is not
    None else None`, which reduces to `Json(x) if x is not None else None`)
    and repeated it four times per row; the parameters are now built once.
    """
    log_paths = [
        BACKEND_STORAGE / "engine.log",
        BACKEND_LOGS / "engine.log",
    ]
    for path in log_paths:
        if not path.exists():
            continue
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except Exception:
                continue
            if not isinstance(payload, dict):
                continue
            ts = parse_ts(payload.get("ts"))
            if ts is None:
                continue
            event = payload.get("event")
            message = payload.get("message")
            data = payload.get("data")
            meta = payload.get("meta")
            data_param = Json(data) if data is not None else None
            meta_param = Json(meta) if meta is not None else None
            # Same values twice: once for the INSERT, once for the probe.
            params = (ts, event, data_param, message, meta_param)
            cur.execute(
                """
                INSERT INTO engine_event (ts, event, data, message, meta)
                SELECT %s, %s, %s, %s, %s
                WHERE NOT EXISTS (
                    SELECT 1 FROM engine_event
                    WHERE ts = %s
                      AND event IS NOT DISTINCT FROM %s
                      AND data IS NOT DISTINCT FROM %s
                      AND message IS NOT DISTINCT FROM %s
                      AND meta IS NOT DISTINCT FROM %s
                )
                """,
                params + params,
            )
|
|
|
|
|
|
def migrate_engine_state(cur):
    """Upsert the two singleton engine-state rows (paper and live, id = 1 each)."""
    state_paper = load_json(ENGINE_STORAGE / "state_paper.json", {})
    if isinstance(state_paper, dict) and state_paper:
        # Flatten the nested sip_frequency {value, unit} into two columns.
        sip_freq = state_paper.get("sip_frequency")
        sip_value = None
        sip_unit = None
        if isinstance(sip_freq, dict):
            sip_value = to_int(sip_freq.get("value"))
            sip_unit = sip_freq.get("unit")
        cur.execute(
            """
            INSERT INTO engine_state_paper (
                id,
                initial_cash,
                cash,
                total_invested,
                nifty_units,
                gold_units,
                last_sip_ts,
                last_run,
                sip_frequency_value,
                sip_frequency_unit
            )
            VALUES (1, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET initial_cash = EXCLUDED.initial_cash,
                cash = EXCLUDED.cash,
                total_invested = EXCLUDED.total_invested,
                nifty_units = EXCLUDED.nifty_units,
                gold_units = EXCLUDED.gold_units,
                last_sip_ts = EXCLUDED.last_sip_ts,
                last_run = EXCLUDED.last_run,
                sip_frequency_value = EXCLUDED.sip_frequency_value,
                sip_frequency_unit = EXCLUDED.sip_frequency_unit
            """,
            (
                to_number(state_paper.get("initial_cash")),
                to_number(state_paper.get("cash")),
                to_number(state_paper.get("total_invested")),
                to_number(state_paper.get("nifty_units")),
                to_number(state_paper.get("gold_units")),
                parse_ts(state_paper.get("last_sip_ts")),
                parse_ts(state_paper.get("last_run")),
                sip_value,
                sip_unit,
            ),
        )

    # The live-state file has a smaller schema (no cash or SIP frequency).
    state = load_json(ENGINE_STORAGE / "state.json", {})
    if isinstance(state, dict) and state:
        cur.execute(
            """
            INSERT INTO engine_state (
                id,
                total_invested,
                nifty_units,
                gold_units,
                last_sip_ts,
                last_run
            )
            VALUES (1, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET total_invested = EXCLUDED.total_invested,
                nifty_units = EXCLUDED.nifty_units,
                gold_units = EXCLUDED.gold_units,
                last_sip_ts = EXCLUDED.last_sip_ts,
                last_run = EXCLUDED.last_run
            """,
            (
                to_number(state.get("total_invested")),
                to_number(state.get("nifty_units")),
                to_number(state.get("gold_units")),
                parse_ts(state.get("last_sip_ts")),
                parse_ts(state.get("last_run")),
            ),
        )
|
|
|
|
|
|
def migrate_paper_broker(cur):
    """Migrate the paper_broker.json snapshot into its five tables.

    Sections: account cash (singleton), positions (keyed by symbol),
    orders and trades (keyed by id), and the equity curve (keyed by
    timestamp). All are upserts, so re-running is safe.
    """
    broker = load_json(ENGINE_STORAGE / "paper_broker.json", {})
    if not isinstance(broker, dict) or not broker:
        return

    # --- account cash (singleton row, id = 1) ---
    cash = to_number(broker.get("cash"))
    if cash is not None:
        cur.execute(
            """
            INSERT INTO paper_broker_account (id, cash)
            VALUES (1, %s)
            ON CONFLICT (id) DO UPDATE
            SET cash = EXCLUDED.cash
            """,
            (cash,),
        )

    # The snapshot carries no per-position timestamp, so stamp all
    # positions with the migration time.
    now_utc = datetime.now(timezone.utc)
    positions = broker.get("positions")
    if isinstance(positions, dict) and positions:
        rows = []
        for symbol, data in positions.items():
            if not isinstance(data, dict):
                continue
            rows.append(
                (
                    symbol,
                    to_number(data.get("qty")),
                    to_number(data.get("avg_price")),
                    to_number(data.get("last_price")),
                    now_utc,
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_position (
                    symbol, qty, avg_price, last_price, updated_at
                )
                VALUES %s
                ON CONFLICT (symbol) DO UPDATE
                SET qty = EXCLUDED.qty,
                    avg_price = EXCLUDED.avg_price,
                    last_price = EXCLUDED.last_price,
                    updated_at = EXCLUDED.updated_at
                """,
                rows,
            )

    # --- orders (id is required; rows without one are skipped) ---
    orders = broker.get("orders")
    if isinstance(orders, list) and orders:
        rows = []
        for order in orders:
            if not isinstance(order, dict):
                continue
            order_id = order.get("id")
            if not order_id:
                continue
            rows.append(
                (
                    order_id,
                    order.get("symbol"),
                    order.get("side"),
                    to_number(order.get("qty")),
                    to_number(order.get("price")),
                    order.get("status"),
                    parse_ts(order.get("timestamp")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_order (
                    id, symbol, side, qty, price, status, timestamp
                )
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET symbol = EXCLUDED.symbol,
                    side = EXCLUDED.side,
                    qty = EXCLUDED.qty,
                    price = EXCLUDED.price,
                    status = EXCLUDED.status,
                    timestamp = EXCLUDED.timestamp
                """,
                rows,
            )

    # --- trades (same shape as orders plus order_id back-reference) ---
    trades = broker.get("trades")
    if isinstance(trades, list) and trades:
        rows = []
        for trade in trades:
            if not isinstance(trade, dict):
                continue
            trade_id = trade.get("id")
            if not trade_id:
                continue
            rows.append(
                (
                    trade_id,
                    trade.get("order_id"),
                    trade.get("symbol"),
                    trade.get("side"),
                    to_number(trade.get("qty")),
                    to_number(trade.get("price")),
                    parse_ts(trade.get("timestamp")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_trade (
                    id, order_id, symbol, side, qty, price, timestamp
                )
                VALUES %s
                ON CONFLICT (id) DO UPDATE
                SET order_id = EXCLUDED.order_id,
                    symbol = EXCLUDED.symbol,
                    side = EXCLUDED.side,
                    qty = EXCLUDED.qty,
                    price = EXCLUDED.price,
                    timestamp = EXCLUDED.timestamp
                """,
                rows,
            )

    # --- equity curve (keyed by timestamp; unparseable timestamps skipped) ---
    equity_curve = broker.get("equity_curve")
    if isinstance(equity_curve, list) and equity_curve:
        rows = []
        for point in equity_curve:
            if not isinstance(point, dict):
                continue
            ts = parse_ts(point.get("timestamp"))
            if ts is None:
                continue
            rows.append(
                (
                    ts,
                    to_number(point.get("equity")),
                    to_number(point.get("pnl")),
                )
            )
        if rows:
            execute_values(
                cur,
                """
                INSERT INTO paper_equity_curve (timestamp, equity, pnl)
                VALUES %s
                ON CONFLICT (timestamp) DO UPDATE
                SET equity = EXCLUDED.equity,
                    pnl = EXCLUDED.pnl
                """,
                rows,
            )
|
|
|
|
|
|
def migrate_mtm_ledger(cur):
    """Bulk-insert mtm_ledger.csv rows into mtm_ledger, keyed by timestamp.

    Rows with an unparseable timestamp are skipped; existing timestamps are
    left untouched (DO NOTHING), so re-running never rewrites history.
    """
    path = ENGINE_STORAGE / "mtm_ledger.csv"
    if not path.exists():
        return
    df = pd.read_csv(path)
    if df.empty:
        return
    rows = []
    for _, row in df.iterrows():
        ts = parse_ts(row.get("timestamp"))
        if ts is None:
            continue
        rows.append(
            (
                ts,
                to_number(row.get("nifty_units")),
                to_number(row.get("gold_units")),
                to_number(row.get("nifty_price")),
                to_number(row.get("gold_price")),
                to_number(row.get("nifty_value")),
                to_number(row.get("gold_value")),
                to_number(row.get("portfolio_value")),
                to_number(row.get("total_invested")),
                to_number(row.get("pnl")),
            )
        )
    if rows:
        execute_values(
            cur,
            """
            INSERT INTO mtm_ledger (
                timestamp,
                nifty_units,
                gold_units,
                nifty_price,
                gold_price,
                nifty_value,
                gold_value,
                portfolio_value,
                total_invested,
                pnl
            )
            VALUES %s
            ON CONFLICT (timestamp) DO NOTHING
            """,
            rows,
        )
|
|
|
|
|
|
def migrate_event_ledger(cur):
    """Append ledger.csv rows into event_ledger, skipping exact duplicates.

    There is no natural key, so deduplication is a NOT EXISTS probe over
    every column; IS NOT DISTINCT FROM makes NULLs compare equal.

    Improvement: the original evaluated every to_number(row.get(...)) twice
    per row (once for the INSERT values, once for the probe). The converted
    values are now built into one tuple and passed twice.
    """
    path = ENGINE_STORAGE / "ledger.csv"
    if not path.exists():
        return
    df = pd.read_csv(path)
    if df.empty:
        return
    for _, row in df.iterrows():
        ts = parse_ts(row.get("timestamp"))
        if ts is None:
            continue
        # Same values twice: once for the INSERT, once for the probe.
        params = (
            ts,
            row.get("event"),
            to_number(row.get("nifty_units")),
            to_number(row.get("gold_units")),
            to_number(row.get("nifty_price")),
            to_number(row.get("gold_price")),
            to_number(row.get("amount")),
        )
        cur.execute(
            """
            INSERT INTO event_ledger (
                timestamp, event, nifty_units, gold_units, nifty_price, gold_price, amount
            )
            SELECT %s, %s, %s, %s, %s, %s, %s
            WHERE NOT EXISTS (
                SELECT 1 FROM event_ledger
                WHERE timestamp = %s
                  AND event IS NOT DISTINCT FROM %s
                  AND nifty_units IS NOT DISTINCT FROM %s
                  AND gold_units IS NOT DISTINCT FROM %s
                  AND nifty_price IS NOT DISTINCT FROM %s
                  AND gold_price IS NOT DISTINCT FROM %s
                  AND amount IS NOT DISTINCT FROM %s
            )
            """,
            params + params,
        )
|
|
|
|
|
|
def migrate_market_history(cur):
    """Load per-symbol closing prices from history CSVs into market_close.

    CSVs are gathered from three candidate directories (deduplicated via a
    set); the file stem is used as the symbol. Files must have "Date" and
    "Close" columns; unreadable files and unparseable rows are skipped.
    Existing (symbol, date) pairs are never overwritten.
    """
    history_paths = set()
    for base in (ENGINE_HISTORY, ENGINE_STORAGE / "history", ROOT_STORAGE / "history"):
        if base.exists():
            history_paths.update(base.glob("*.csv"))
    if not history_paths:
        return

    rows = []
    # sorted() makes the processing order deterministic across runs.
    for path in sorted(history_paths):
        symbol = path.stem
        try:
            df = pd.read_csv(path)
        except Exception:
            continue
        if df.empty or "Date" not in df.columns or "Close" not in df.columns:
            continue
        for _, row in df.iterrows():
            dt = parse_date(row.get("Date"))
            close = to_number(row.get("Close"))
            if dt is None or close is None:
                continue
            rows.append((symbol, dt, close))

    if rows:
        execute_values(
            cur,
            """
            INSERT INTO market_close (symbol, date, close)
            VALUES %s
            ON CONFLICT (symbol, date) DO NOTHING
            """,
            rows,
        )
|
|
|
|
|
|
def print_counts(cur):
    """Print a row count for every table populated by this migration."""
    tables = (
        "app_user",
        "app_session",
        "user_broker",
        "zerodha_session",
        "zerodha_request_token",
        "strategy_config",
        "engine_status",
        "strategy_log",
        "engine_event",
        "engine_state_paper",
        "engine_state",
        "paper_broker_account",
        "paper_position",
        "paper_order",
        "paper_trade",
        "paper_equity_curve",
        "mtm_ledger",
        "event_ledger",
        "market_close",
    )
    for table in tables:
        # Table names come from the fixed tuple above, so the f-string is safe.
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        print(f"{table}: {cur.fetchone()[0]}")
|
|
|
|
|
|
def main():
    """Open one connection, run every migration domain in order, then report counts.

    Each domain runs in its own transaction via run_domain; the connection
    is always closed, even if a domain fails.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        domains = (
            ("users_sessions", migrate_users_sessions),
            ("user_brokers", migrate_user_brokers),
            ("zerodha", migrate_zerodha),
            ("strategy_config", migrate_strategy_config),
            ("engine_status", migrate_engine_status),
            ("strategy_logs", migrate_strategy_logs),
            ("engine_log", migrate_engine_log),
            ("engine_state", migrate_engine_state),
            ("paper_broker", migrate_paper_broker),
            ("mtm_ledger", migrate_mtm_ledger),
            ("event_ledger", migrate_event_ledger),
            ("market_history", migrate_market_history),
        )
        for label, migrator in domains:
            run_domain(conn, label, migrator)

        with conn:
            with conn.cursor() as cur:
                print_counts(cur)
    finally:
        conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the full migration when executed directly.
    main()
|