From cd99fd196280478d593cd141ef6f62c5c0b43f15 Mon Sep 17 00:00:00 2001 From: Thigazhezhilan J Date: Thu, 9 Apr 2026 23:46:13 +0530 Subject: [PATCH] Add broker reconciliation and live equity schema migrations --- db_migrations/20260409_broker_order_state.sql | 40 +++++++++++++++++++ .../20260409_live_equity_positions_value.sql | 6 +++ migrate_to_db.py | 29 ++++++++++---- schema.sql | 40 +++++++++++++++++++ 4 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 db_migrations/20260409_broker_order_state.sql create mode 100644 db_migrations/20260409_live_equity_positions_value.sql diff --git a/db_migrations/20260409_broker_order_state.sql b/db_migrations/20260409_broker_order_state.sql new file mode 100644 index 0000000..54c9fab --- /dev/null +++ b/db_migrations/20260409_broker_order_state.sql @@ -0,0 +1,40 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS broker_order_state ( + local_order_id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE, + run_id TEXT NOT NULL REFERENCES strategy_run(run_id) ON DELETE CASCADE, + logical_time TIMESTAMPTZ NOT NULL, + broker TEXT NOT NULL, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + broker_order_id TEXT, + requested_qty NUMERIC NOT NULL, + filled_qty NUMERIC NOT NULL DEFAULT 0, + accounted_fill_qty NUMERIC NOT NULL DEFAULT 0, + requested_price NUMERIC, + average_price NUMERIC, + status TEXT NOT NULL, + broker_status TEXT, + status_message TEXT, + needs_reconciliation BOOLEAN NOT NULL DEFAULT FALSE, + last_checked_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT chk_broker_order_state_status + CHECK (status IN ('PENDING','PARTIAL','FILLED','REJECTED','CANCELLED','UNKNOWN')) +); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_user_run_status + ON broker_order_state(user_id, run_id, status); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_reconcile + ON broker_order_state(user_id, run_id, needs_reconciliation, last_checked_at); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_broker_order + ON broker_order_state(broker_order_id); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_logical_time + ON broker_order_state(run_id, logical_time); + +COMMIT; diff --git a/db_migrations/20260409_live_equity_positions_value.sql b/db_migrations/20260409_live_equity_positions_value.sql new file mode 100644 index 0000000..bd84700 --- /dev/null +++ b/db_migrations/20260409_live_equity_positions_value.sql @@ -0,0 +1,6 @@ +BEGIN; + +ALTER TABLE live_equity_snapshot + ADD COLUMN IF NOT EXISTS positions_adjustment_value NUMERIC NOT NULL DEFAULT 0; + +COMMIT; diff --git a/migrate_to_db.py b/migrate_to_db.py index e63b53d..2d4b7cc 100644 --- a/migrate_to_db.py +++ b/migrate_to_db.py @@ -1,5 +1,6 @@ import json import math +import os from datetime import datetime, timezone, date from pathlib import Path @@ -8,13 +9,27 @@ import psycopg2 from psycopg2.extras import execute_values, Json -DB_CONFIG = { - "host": "localhost", - "port": 5432, - "dbname": "trading_db", - "user": "trader", - "password": "traderpass", -} +APP_ENV = (os.getenv("APP_ENV") or os.getenv("ENVIRONMENT") or "development").strip().lower() +NON_PROD_ENVS = {"development", "dev", "test", "testing", "local"} + + +def _db_config(): + is_non_prod = APP_ENV in NON_PROD_ENVS + config = { + "host": os.getenv("DB_HOST") or os.getenv("PGHOST") or ("localhost" if is_non_prod else None), + "port": int(os.getenv("DB_PORT") or os.getenv("PGPORT") or "5432"), + "dbname": os.getenv("DB_NAME") or os.getenv("PGDATABASE") or ("trading_db" if is_non_prod else None), + "user": os.getenv("DB_USER") or os.getenv("PGUSER") or ("trader" if is_non_prod else None), + "password": os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD"), + } + if not is_non_prod and (not config["host"] or not config["dbname"] or not config["user"] or not config["password"]): + raise RuntimeError("DB_HOST, DB_NAME, DB_USER, and DB_PASSWORD must be configured") + if is_non_prod and not config["password"]: + config["password"] = "traderpass" + return config + + +DB_CONFIG = _db_config() ROOT = Path(__file__).resolve().parent diff --git a/schema.sql b/schema.sql index 9964de0..9ff159e 100644 --- a/schema.sql +++ b/schema.sql @@ -391,3 +391,43 @@ CREATE INDEX IF NOT EXISTS idx_support_request_audit_endpoint_ip_created CREATE INDEX IF NOT EXISTS idx_support_request_audit_ticket_created ON support_request_audit(ticket_hash, created_at DESC); + +ALTER TABLE live_equity_snapshot + ADD COLUMN IF NOT EXISTS positions_adjustment_value NUMERIC NOT NULL DEFAULT 0; + +CREATE TABLE IF NOT EXISTS broker_order_state ( + local_order_id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE, + run_id TEXT NOT NULL REFERENCES strategy_run(run_id) ON DELETE CASCADE, + logical_time TIMESTAMPTZ NOT NULL, + broker TEXT NOT NULL, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + broker_order_id TEXT, + requested_qty NUMERIC NOT NULL, + filled_qty NUMERIC NOT NULL DEFAULT 0, + accounted_fill_qty NUMERIC NOT NULL DEFAULT 0, + requested_price NUMERIC, + average_price NUMERIC, + status TEXT NOT NULL, + broker_status TEXT, + status_message TEXT, + needs_reconciliation BOOLEAN NOT NULL DEFAULT FALSE, + last_checked_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT chk_broker_order_state_status + CHECK (status IN ('PENDING','PARTIAL','FILLED','REJECTED','CANCELLED','UNKNOWN')) +); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_user_run_status + ON broker_order_state(user_id, run_id, status); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_reconcile + ON broker_order_state(user_id, run_id, needs_reconciliation, last_checked_at); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_broker_order + ON broker_order_state(broker_order_id); + +CREATE INDEX IF NOT EXISTS idx_broker_order_state_logical_time + ON broker_order_state(run_id, logical_time);