Add broker reconciliation and live equity schema migrations

This commit is contained in:
Thigazhezhilan J 2026-04-09 23:46:13 +05:30
parent fd3642827d
commit cd99fd1962
4 changed files with 108 additions and 7 deletions

View File

@ -0,0 +1,40 @@
BEGIN;
CREATE TABLE IF NOT EXISTS broker_order_state (
local_order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE,
run_id TEXT NOT NULL REFERENCES strategy_run(run_id) ON DELETE CASCADE,
logical_time TIMESTAMPTZ NOT NULL,
broker TEXT NOT NULL,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
broker_order_id TEXT,
requested_qty NUMERIC NOT NULL,
filled_qty NUMERIC NOT NULL DEFAULT 0,
accounted_fill_qty NUMERIC NOT NULL DEFAULT 0,
requested_price NUMERIC,
average_price NUMERIC,
status TEXT NOT NULL,
broker_status TEXT,
status_message TEXT,
needs_reconciliation BOOLEAN NOT NULL DEFAULT FALSE,
last_checked_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT chk_broker_order_state_status
CHECK (status IN ('PENDING','PARTIAL','FILLED','REJECTED','CANCELLED','UNKNOWN'))
);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_user_run_status
ON broker_order_state(user_id, run_id, status);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_reconcile
ON broker_order_state(user_id, run_id, needs_reconciliation, last_checked_at);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_broker_order
ON broker_order_state(broker_order_id);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_logical_time
ON broker_order_state(run_id, logical_time);
COMMIT;

View File

@ -0,0 +1,6 @@
BEGIN;
ALTER TABLE live_equity_snapshot
ADD COLUMN IF NOT EXISTS positions_adjustment_value NUMERIC NOT NULL DEFAULT 0;
COMMIT;

View File

@ -1,5 +1,6 @@
import json import json
import math import math
import os
from datetime import datetime, timezone, date from datetime import datetime, timezone, date
from pathlib import Path from pathlib import Path
@ -8,13 +9,27 @@ import psycopg2
from psycopg2.extras import execute_values, Json from psycopg2.extras import execute_values, Json
DB_CONFIG = { APP_ENV = (os.getenv("APP_ENV") or os.getenv("ENVIRONMENT") or "development").strip().lower()
"host": "localhost", NON_PROD_ENVS = {"development", "dev", "test", "testing", "local"}
"port": 5432,
"dbname": "trading_db",
"user": "trader", def _db_config():
"password": "traderpass", is_non_prod = APP_ENV in NON_PROD_ENVS
} config = {
"host": os.getenv("DB_HOST") or os.getenv("PGHOST") or ("localhost" if is_non_prod else None),
"port": int(os.getenv("DB_PORT") or os.getenv("PGPORT") or "5432"),
"dbname": os.getenv("DB_NAME") or os.getenv("PGDATABASE") or ("trading_db" if is_non_prod else None),
"user": os.getenv("DB_USER") or os.getenv("PGUSER") or ("trader" if is_non_prod else None),
"password": os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD"),
}
if not is_non_prod and (not config["host"] or not config["dbname"] or not config["user"] or not config["password"]):
raise RuntimeError("DB_HOST, DB_NAME, DB_USER, and DB_PASSWORD must be configured")
if is_non_prod and not config["password"]:
config["password"] = "traderpass"
return config
DB_CONFIG = _db_config()
ROOT = Path(__file__).resolve().parent ROOT = Path(__file__).resolve().parent

View File

@ -391,3 +391,43 @@ CREATE INDEX IF NOT EXISTS idx_support_request_audit_endpoint_ip_created
CREATE INDEX IF NOT EXISTS idx_support_request_audit_ticket_created CREATE INDEX IF NOT EXISTS idx_support_request_audit_ticket_created
ON support_request_audit(ticket_hash, created_at DESC); ON support_request_audit(ticket_hash, created_at DESC);
ALTER TABLE live_equity_snapshot
ADD COLUMN IF NOT EXISTS positions_adjustment_value NUMERIC NOT NULL DEFAULT 0;
CREATE TABLE IF NOT EXISTS broker_order_state (
local_order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE,
run_id TEXT NOT NULL REFERENCES strategy_run(run_id) ON DELETE CASCADE,
logical_time TIMESTAMPTZ NOT NULL,
broker TEXT NOT NULL,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
broker_order_id TEXT,
requested_qty NUMERIC NOT NULL,
filled_qty NUMERIC NOT NULL DEFAULT 0,
accounted_fill_qty NUMERIC NOT NULL DEFAULT 0,
requested_price NUMERIC,
average_price NUMERIC,
status TEXT NOT NULL,
broker_status TEXT,
status_message TEXT,
needs_reconciliation BOOLEAN NOT NULL DEFAULT FALSE,
last_checked_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT chk_broker_order_state_status
CHECK (status IN ('PENDING','PARTIAL','FILLED','REJECTED','CANCELLED','UNKNOWN'))
);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_user_run_status
ON broker_order_state(user_id, run_id, status);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_reconcile
ON broker_order_state(user_id, run_id, needs_reconciliation, last_checked_at);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_broker_order
ON broker_order_state(broker_order_id);
CREATE INDEX IF NOT EXISTS idx_broker_order_state_logical_time
ON broker_order_state(run_id, logical_time);