diff --git a/backend/app/services/db.py b/backend/app/services/db.py index d3821fe..709fd9f 100644 --- a/backend/app/services/db.py +++ b/backend/app/services/db.py @@ -94,8 +94,8 @@ def get_database_url(cfg: dict[str, str | int] | None = None) -> str: def _create_engine() -> Engine: cfg = _db_config() - pool_size = int(os.getenv("DB_POOL_SIZE", os.getenv("DB_POOL_MIN", "5"))) - max_overflow = int(os.getenv("DB_POOL_MAX", "10")) + pool_size = int(os.getenv("DB_POOL_SIZE", os.getenv("DB_POOL_MIN", "20"))) + max_overflow = int(os.getenv("DB_POOL_MAX", "30")) pool_timeout = int(os.getenv("DB_POOL_TIMEOUT", "30")) engine = create_engine( get_database_url(cfg), diff --git a/backend/app/services/strategy_service.py b/backend/app/services/strategy_service.py index fab9bc5..db558fb 100644 --- a/backend/app/services/strategy_service.py +++ b/backend/app/services/strategy_service.py @@ -1093,12 +1093,23 @@ def _issue_message(event: str, message: str | None, data: dict | None, meta: dic if event == "SIP_NO_FILL": if reason_key == "insufficient_funds": - return "Insufficient funds for this SIP." + cash = payload.get("cash") + required = payload.get("required") + if cash is not None and required is not None: + shortfall = float(required) - float(cash) + return ( + f"Insufficient funds — ₹{shortfall:,.0f} short for this SIP. " + "Add funds and the next SIP will execute automatically." + ) + return ( + "Insufficient funds for this SIP. " + "Add funds and the next SIP will execute automatically." + ) if reason_key == "broker_auth_expired": return "Broker session expired. Reconnect broker." if reason_key == "no_fill": - return "Order was not filled." - return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}." + return "Order was not filled. The strategy will retry at the next interval." + return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}. The strategy will retry automatically." if event == "BROKER_AUTH_EXPIRED": return "Broker session expired. Reconnect broker." @@ -1238,10 +1249,15 @@ def get_strategy_summary(user_id: str): 'ORDER_REJECTED', 'ORDER_CANCELLED' ) + AND ts > COALESCE( + (SELECT MAX(ts) FROM engine_event + WHERE user_id = %s AND run_id = %s AND event = 'SIP_TRIGGERED'), + '1900-01-01'::timestamptz + ) ORDER BY ts DESC LIMIT 1 """, - (user_id, run_id), + (user_id, run_id, user_id, run_id), ) issue_row = cur.fetchone() diff --git a/indian_paper_trading_strategy/engine/db.py b/indian_paper_trading_strategy/engine/db.py index 20e7d73..5f64cc8 100644 --- a/indian_paper_trading_strategy/engine/db.py +++ b/indian_paper_trading_strategy/engine/db.py @@ -1,63 +1,63 @@ -import os -import threading -import time -from contextlib import contextmanager -from datetime import date, datetime, timedelta, timezone -from contextvars import ContextVar -from decimal import Decimal -from uuid import UUID +import os +import threading +import time +from contextlib import contextmanager +from datetime import date, datetime, timedelta, timezone +from contextvars import ContextVar +from decimal import Decimal +from uuid import UUID import psycopg2 from psycopg2 import pool from psycopg2 import OperationalError, InterfaceError from psycopg2.extras import Json -_POOL = None -_POOL_LOCK = threading.Lock() -_DEFAULT_USER_ID = None -_DEFAULT_LOCK = threading.Lock() -NON_PROD_ENVIRONMENTS = {"development", "dev", "test", "testing", "local"} +_POOL = None +_POOL_LOCK = threading.Lock() +_DEFAULT_USER_ID = None +_DEFAULT_LOCK = threading.Lock() +NON_PROD_ENVIRONMENTS = {"development", "dev", "test", "testing", "local"} _USER_ID = ContextVar("engine_user_id", default=None) _RUN_ID = ContextVar("engine_run_id", default=None) -def _db_config(): - env_name = (os.getenv("APP_ENV") or os.getenv("ENVIRONMENT") or os.getenv("FASTAPI_ENV") or "development").strip().lower() - is_non_prod = env_name in NON_PROD_ENVIRONMENTS - url = os.getenv("DATABASE_URL") - if url: - return {"dsn": url} - - schema = os.getenv("DB_SCHEMA") or os.getenv("PGSCHEMA") or "quant_app" - password = os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD") - host = os.getenv("DB_HOST") or os.getenv("PGHOST") or ("localhost" if is_non_prod else None) - dbname = os.getenv("DB_NAME") or os.getenv("PGDATABASE") or ("trading_db" if is_non_prod else None) - user = os.getenv("DB_USER") or os.getenv("PGUSER") or ("trader" if is_non_prod else None) - if not is_non_prod and not password: - raise RuntimeError("DB_PASSWORD or PGPASSWORD must be configured in non-development environments") - if not is_non_prod and (not host or not dbname or not user): - raise RuntimeError("DB_HOST, DB_NAME, and DB_USER must be configured in non-development environments") - - return { - "host": host, - "port": int(os.getenv("DB_PORT") or os.getenv("PGPORT") or "5432"), - "dbname": dbname, - "user": user, - "password": password, - "connect_timeout": int(os.getenv("DB_CONNECT_TIMEOUT", "5")), - "options": f"-csearch_path={schema},public" if schema else None, - } +def _db_config(): + env_name = (os.getenv("APP_ENV") or os.getenv("ENVIRONMENT") or os.getenv("FASTAPI_ENV") or "development").strip().lower() + is_non_prod = env_name in NON_PROD_ENVIRONMENTS + url = os.getenv("DATABASE_URL") + if url: + return {"dsn": url} + + schema = os.getenv("DB_SCHEMA") or os.getenv("PGSCHEMA") or "quant_app" + password = os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD") + host = os.getenv("DB_HOST") or os.getenv("PGHOST") or ("localhost" if is_non_prod else None) + dbname = os.getenv("DB_NAME") or os.getenv("PGDATABASE") or ("trading_db" if is_non_prod else None) + user = os.getenv("DB_USER") or os.getenv("PGUSER") or ("trader" if is_non_prod else None) + if not is_non_prod and not password: + raise RuntimeError("DB_PASSWORD or PGPASSWORD must be configured in non-development environments") + if not is_non_prod and (not host or not dbname or not user): + raise RuntimeError("DB_HOST, DB_NAME, and DB_USER must be configured in non-development environments") + + return { + "host": host, + "port": int(os.getenv("DB_PORT") or os.getenv("PGPORT") or "5432"), + "dbname": dbname, + "user": user, + "password": password, + "connect_timeout": int(os.getenv("DB_CONNECT_TIMEOUT", "5")), + "options": f"-csearch_path={schema},public" if schema else None, + } -def _init_pool(): - config = _db_config() - config = {k: v for k, v in config.items() if v is not None} - minconn = int(os.getenv("DB_POOL_MIN", "1")) - maxconn = int(os.getenv("DB_POOL_MAX", "10")) - if "dsn" in config: - return pool.ThreadedConnectionPool(minconn, maxconn, dsn=config["dsn"]) - return pool.ThreadedConnectionPool(minconn, maxconn, **config) +def _init_pool(): + config = _db_config() + config = {k: v for k, v in config.items() if v is not None} + minconn = int(os.getenv("DB_POOL_MIN", "2")) + maxconn = int(os.getenv("DB_POOL_MAX", "50")) + if "dsn" in config: + return pool.ThreadedConnectionPool(minconn, maxconn, dsn=config["dsn"]) + return pool.ThreadedConnectionPool(minconn, maxconn, **config) def get_pool(): @@ -144,24 +144,24 @@ def db_transaction(): raise -def _utc_now(): - return datetime.utcnow().replace(tzinfo=timezone.utc) - - -def _json_safe(value): - if isinstance(value, datetime): - return value.isoformat() - if isinstance(value, date): - return value.isoformat() - if isinstance(value, Decimal): - return float(value) - if isinstance(value, UUID): - return str(value) - if isinstance(value, dict): - return {str(key): _json_safe(item) for key, item in value.items()} - if isinstance(value, (list, tuple, set)): - return [_json_safe(item) for item in value] - return value +def _utc_now(): + return datetime.utcnow().replace(tzinfo=timezone.utc) + + +def _json_safe(value): + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, date): + return value.isoformat() + if isinstance(value, Decimal): + return float(value) + if isinstance(value, UUID): + return str(value) + if isinstance(value, dict): + return {str(key): _json_safe(item) for key, item in value.items()} + if isinstance(value, (list, tuple, set)): + return [_json_safe(item) for item in value] + return value def set_context(user_id: str | None, run_id: str | None): @@ -324,7 +324,7 @@ def get_running_runs(user_id: str | None = None): return run_with_retry(_op) -def insert_engine_event( +def insert_engine_event( cur, event: str, data=None, @@ -336,165 +336,165 @@ def insert_engine_event( ): when = ts or _utc_now() scope_user, scope_run = _resolve_context(user_id, run_id) - cur.execute( - """ - INSERT INTO engine_event (user_id, run_id, ts, event, data, message, meta) - VALUES (%s, %s, %s, %s, %s, %s, %s) + cur.execute( + """ + INSERT INTO engine_event (user_id, run_id, ts, event, data, message, meta) + VALUES (%s, %s, %s, %s, %s, %s, %s) """, - ( - scope_user, - scope_run, - when, - event, - Json(_json_safe(data)) if data is not None else None, - message, - Json(_json_safe(meta)) if meta is not None else None, - ), - ) - - -def acquire_run_lease( - run_id: str, - owner_id: str, - *, - lease_seconds: int = 90, - now: datetime | None = None, -): - current_time = now or _utc_now() - expires_at = current_time + timedelta(seconds=lease_seconds) - - def _op(cur, _conn): - cur.execute( - """ - INSERT INTO run_leases (run_id, owner_id, leased_at, expires_at, heartbeat_at) - VALUES (%s, %s, %s, %s, %s) - ON CONFLICT (run_id) DO NOTHING - RETURNING run_id - """, - (run_id, owner_id, current_time, expires_at, current_time), - ) - inserted = cur.fetchone() - if inserted: - return { - "acquired": True, - "status": "ACQUIRED", - "owner_id": owner_id, - "expires_at": expires_at, - } - - cur.execute( - """ - SELECT owner_id, expires_at - FROM run_leases - WHERE run_id = %s - FOR UPDATE - """, - (run_id,), - ) - row = cur.fetchone() - if not row: - return { - "acquired": False, - "status": "DENIED", - "owner_id": None, - "expires_at": None, - } - - current_owner, current_expiry = row - if current_owner == owner_id: - cur.execute( - """ - UPDATE run_leases - SET leased_at = %s, - expires_at = %s, - heartbeat_at = %s - WHERE run_id = %s AND owner_id = %s - RETURNING run_id - """, - (current_time, expires_at, current_time, run_id, owner_id), - ) - cur.fetchone() - return { - "acquired": True, - "status": "REFRESHED", - "owner_id": owner_id, - "expires_at": expires_at, - } - - if current_expiry <= current_time: - cur.execute( - """ - UPDATE run_leases - SET owner_id = %s, - leased_at = %s, - expires_at = %s, - heartbeat_at = %s - WHERE run_id = %s AND expires_at <= %s - RETURNING run_id - """, - (owner_id, current_time, expires_at, current_time, run_id, current_time), - ) - replaced = cur.fetchone() - if replaced: - return { - "acquired": True, - "status": "REACQUIRED", - "owner_id": owner_id, - "previous_owner": current_owner, - "expires_at": expires_at, - } - - return { - "acquired": False, - "status": "DENIED", - "owner_id": current_owner, - "expires_at": current_expiry, - } - - return run_with_retry(_op) - - -def heartbeat_run_lease( - run_id: str, - owner_id: str, - *, - lease_seconds: int = 90, - now: datetime | None = None, -): - current_time = now or _utc_now() - expires_at = current_time + timedelta(seconds=lease_seconds) - - def _op(cur, _conn): - cur.execute( - """ - UPDATE run_leases - SET heartbeat_at = %s, - expires_at = %s - WHERE run_id = %s - AND owner_id = %s - AND expires_at > %s - RETURNING run_id, expires_at - """, - (current_time, expires_at, run_id, owner_id, current_time), - ) - row = cur.fetchone() - if not row: - return {"active": False, "expires_at": None} - return {"active": True, "expires_at": row[1]} - - return run_with_retry(_op) - - -def release_run_lease(run_id: str, owner_id: str): - def _op(cur, _conn): - cur.execute( - """ - DELETE FROM run_leases - WHERE run_id = %s AND owner_id = %s - RETURNING run_id - """, - (run_id, owner_id), - ) - return cur.fetchone() is not None - - return run_with_retry(_op) + ( + scope_user, + scope_run, + when, + event, + Json(_json_safe(data)) if data is not None else None, + message, + Json(_json_safe(meta)) if meta is not None else None, + ), + ) + + +def acquire_run_lease( + run_id: str, + owner_id: str, + *, + lease_seconds: int = 90, + now: datetime | None = None, +): + current_time = now or _utc_now() + expires_at = current_time + timedelta(seconds=lease_seconds) + + def _op(cur, _conn): + cur.execute( + """ + INSERT INTO run_leases (run_id, owner_id, leased_at, expires_at, heartbeat_at) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (run_id) DO NOTHING + RETURNING run_id + """, + (run_id, owner_id, current_time, expires_at, current_time), + ) + inserted = cur.fetchone() + if inserted: + return { + "acquired": True, + "status": "ACQUIRED", + "owner_id": owner_id, + "expires_at": expires_at, + } + + cur.execute( + """ + SELECT owner_id, expires_at + FROM run_leases + WHERE run_id = %s + FOR UPDATE + """, + (run_id,), + ) + row = cur.fetchone() + if not row: + return { + "acquired": False, + "status": "DENIED", + "owner_id": None, + "expires_at": None, + } + + current_owner, current_expiry = row + if current_owner == owner_id: + cur.execute( + """ + UPDATE run_leases + SET leased_at = %s, + expires_at = %s, + heartbeat_at = %s + WHERE run_id = %s AND owner_id = %s + RETURNING run_id + """, + (current_time, expires_at, current_time, run_id, owner_id), + ) + cur.fetchone() + return { + "acquired": True, + "status": "REFRESHED", + "owner_id": owner_id, + "expires_at": expires_at, + } + + if current_expiry <= current_time: + cur.execute( + """ + UPDATE run_leases + SET owner_id = %s, + leased_at = %s, + expires_at = %s, + heartbeat_at = %s + WHERE run_id = %s AND expires_at <= %s + RETURNING run_id + """, + (owner_id, current_time, expires_at, current_time, run_id, current_time), + ) + replaced = cur.fetchone() + if replaced: + return { + "acquired": True, + "status": "REACQUIRED", + "owner_id": owner_id, + "previous_owner": current_owner, + "expires_at": expires_at, + } + + return { + "acquired": False, + "status": "DENIED", + "owner_id": current_owner, + "expires_at": current_expiry, + } + + return run_with_retry(_op) + + +def heartbeat_run_lease( + run_id: str, + owner_id: str, + *, + lease_seconds: int = 90, + now: datetime | None = None, +): + current_time = now or _utc_now() + expires_at = current_time + timedelta(seconds=lease_seconds) + + def _op(cur, _conn): + cur.execute( + """ + UPDATE run_leases + SET heartbeat_at = %s, + expires_at = %s + WHERE run_id = %s + AND owner_id = %s + AND expires_at > %s + RETURNING run_id, expires_at + """, + (current_time, expires_at, run_id, owner_id, current_time), + ) + row = cur.fetchone() + if not row: + return {"active": False, "expires_at": None} + return {"active": True, "expires_at": row[1]} + + return run_with_retry(_op) + + +def release_run_lease(run_id: str, owner_id: str): + def _op(cur, _conn): + cur.execute( + """ + DELETE FROM run_leases + WHERE run_id = %s AND owner_id = %s + RETURNING run_id + """, + (run_id, owner_id), + ) + return cur.fetchone() is not None + + return run_with_retry(_op) diff --git a/indian_paper_trading_strategy/engine/execution.py b/indian_paper_trading_strategy/engine/execution.py index c41fd93..f12c289 100644 --- a/indian_paper_trading_strategy/engine/execution.py +++ b/indian_paper_trading_strategy/engine/execution.py @@ -371,6 +371,9 @@ def _record_reconciliation_event(cur, event: str, *, logical_time, payload: dict return if event in {"SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING"}: log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time) + # Dual-write warning events to engine_event so the strategy summary can surface them + if event in {"SIP_NO_FILL", "SIP_PARTIAL"}: + insert_engine_event(cur, event, data=payload, ts=ts) return insert_engine_event(cur, event, data=payload, ts=ts) @@ -680,6 +683,14 @@ def _try_execute_sip_paper( funds = broker.get_funds(cur=cur) cash = funds.get("cash") if cash is not None and float(cash) < sip_amount_val: + if not event_exists("SIP_NO_FILL", logical_time, cur=cur): + _payload = { + "reason": "insufficient_funds", + "cash": float(cash), + "required": sip_amount_val, + } + log_event("SIP_NO_FILL", _payload, cur=cur, ts=event_ts, logical_time=logical_time) + insert_engine_event(cur, "SIP_NO_FILL", data=_payload, ts=event_ts) return state, False log_event( diff --git a/indian_paper_trading_strategy/engine/runner.py b/indian_paper_trading_strategy/engine/runner.py index 0efeee5..326bbe1 100644 --- a/indian_paper_trading_strategy/engine/runner.py +++ b/indian_paper_trading_strategy/engine/runner.py @@ -533,6 +533,7 @@ def _engine_loop(config, stop_event: threading.Event): _pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb) break except Exception as exc: + log_event("PRICE_FETCH_ERROR", {"error": str(exc)}) debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)}) if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id): exit_reason = "LEASE_LOST" @@ -556,6 +557,7 @@ def _engine_loop(config, stop_event: threading.Event): _pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb) break except Exception as exc: + log_event("HISTORY_LOAD_ERROR", {"error": str(exc)}) debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)}) if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id): exit_reason = "LEASE_LOST"