fix: surface SIP_NO_FILL warnings and prevent silent fund failures

- execution.py: dual-write SIP_NO_FILL and SIP_PARTIAL to engine_event
  so the strategy summary can surface them to users
- execution.py: emit SIP_NO_FILL event (with cash/required amounts) on
  the paper path instead of silently returning when funds are insufficient
- strategy_service.py: improve insufficient_funds message to show exact
  shortfall and reassure user that next SIP will auto-execute when funded
- strategy_service.py: clear SIP_NO_FILL warning after a successful
  SIP_TRIGGERED so it does not persist after funds are added
- runner.py: always write PRICE_FETCH_ERROR and HISTORY_LOAD_ERROR to
  engine_event regardless of ENGINE_DEBUG flag
- db.py (backend + engine): raise default pool sizes so each pool allows
  up to 50 connections (backend: pool_size 20 + max_overflow 30; engine:
  min 2 / max 50) to handle 100 concurrent users without pool exhaustion

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thigazhezhilan J 2026-06-04 10:04:53 +05:30
parent 98ef7701d1
commit 298d245048
5 changed files with 262 additions and 233 deletions

View File

@ -94,8 +94,8 @@ def get_database_url(cfg: dict[str, str | int] | None = None) -> str:
def _create_engine() -> Engine: def _create_engine() -> Engine:
cfg = _db_config() cfg = _db_config()
pool_size = int(os.getenv("DB_POOL_SIZE", os.getenv("DB_POOL_MIN", "5"))) pool_size = int(os.getenv("DB_POOL_SIZE", os.getenv("DB_POOL_MIN", "20")))
max_overflow = int(os.getenv("DB_POOL_MAX", "10")) max_overflow = int(os.getenv("DB_POOL_MAX", "30"))
pool_timeout = int(os.getenv("DB_POOL_TIMEOUT", "30")) pool_timeout = int(os.getenv("DB_POOL_TIMEOUT", "30"))
engine = create_engine( engine = create_engine(
get_database_url(cfg), get_database_url(cfg),

View File

@ -1093,12 +1093,23 @@ def _issue_message(event: str, message: str | None, data: dict | None, meta: dic
if event == "SIP_NO_FILL": if event == "SIP_NO_FILL":
if reason_key == "insufficient_funds": if reason_key == "insufficient_funds":
return "Insufficient funds for this SIP." cash = payload.get("cash")
required = payload.get("required")
if cash is not None and required is not None:
shortfall = float(required) - float(cash)
return (
f"Insufficient funds — ₹{shortfall:,.0f} short for this SIP. "
"Add funds and the next SIP will execute automatically."
)
return (
"Insufficient funds for this SIP. "
"Add funds and the next SIP will execute automatically."
)
if reason_key == "broker_auth_expired": if reason_key == "broker_auth_expired":
return "Broker session expired. Reconnect broker." return "Broker session expired. Reconnect broker."
if reason_key == "no_fill": if reason_key == "no_fill":
return "Order was not filled." return "Order was not filled. The strategy will retry at the next interval."
return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}." return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}. The strategy will retry automatically."
if event == "BROKER_AUTH_EXPIRED": if event == "BROKER_AUTH_EXPIRED":
return "Broker session expired. Reconnect broker." return "Broker session expired. Reconnect broker."
@ -1238,10 +1249,15 @@ def get_strategy_summary(user_id: str):
'ORDER_REJECTED', 'ORDER_REJECTED',
'ORDER_CANCELLED' 'ORDER_CANCELLED'
) )
AND ts > COALESCE(
(SELECT MAX(ts) FROM engine_event
WHERE user_id = %s AND run_id = %s AND event = 'SIP_TRIGGERED'),
'1900-01-01'::timestamptz
)
ORDER BY ts DESC ORDER BY ts DESC
LIMIT 1 LIMIT 1
""", """,
(user_id, run_id), (user_id, run_id, user_id, run_id),
) )
issue_row = cur.fetchone() issue_row = cur.fetchone()

View File

@ -1,63 +1,63 @@
import os import os
import threading import threading
import time import time
from contextlib import contextmanager from contextlib import contextmanager
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from contextvars import ContextVar from contextvars import ContextVar
from decimal import Decimal from decimal import Decimal
from uuid import UUID from uuid import UUID
import psycopg2 import psycopg2
from psycopg2 import pool from psycopg2 import pool
from psycopg2 import OperationalError, InterfaceError from psycopg2 import OperationalError, InterfaceError
from psycopg2.extras import Json from psycopg2.extras import Json
_POOL = None _POOL = None
_POOL_LOCK = threading.Lock() _POOL_LOCK = threading.Lock()
_DEFAULT_USER_ID = None _DEFAULT_USER_ID = None
_DEFAULT_LOCK = threading.Lock() _DEFAULT_LOCK = threading.Lock()
NON_PROD_ENVIRONMENTS = {"development", "dev", "test", "testing", "local"} NON_PROD_ENVIRONMENTS = {"development", "dev", "test", "testing", "local"}
_USER_ID = ContextVar("engine_user_id", default=None) _USER_ID = ContextVar("engine_user_id", default=None)
_RUN_ID = ContextVar("engine_run_id", default=None) _RUN_ID = ContextVar("engine_run_id", default=None)
def _db_config():
    """Collect database connection settings from environment variables.

    When ``DATABASE_URL`` is set, the result is ``{"dsn": <url>}`` and nothing
    else is consulted. Otherwise each setting is read from its ``DB_*``
    variable, falling back to the matching ``PG*`` variable. Host, database
    name and user additionally fall back to local defaults, but only when the
    environment name (``APP_ENV`` / ``ENVIRONMENT`` / ``FASTAPI_ENV``,
    defaulting to ``development``) is listed in ``NON_PROD_ENVIRONMENTS``.

    Raises:
        RuntimeError: outside the non-production environments, when the
            password, host, database name or user is missing.
    """
    environment = (
        os.getenv("APP_ENV")
        or os.getenv("ENVIRONMENT")
        or os.getenv("FASTAPI_ENV")
        or "development"
    )
    relaxed = environment.strip().lower() in NON_PROD_ENVIRONMENTS

    dsn = os.getenv("DATABASE_URL")
    if dsn:
        return {"dsn": dsn}

    schema = os.getenv("DB_SCHEMA") or os.getenv("PGSCHEMA") or "quant_app"
    password = os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD")
    # Local defaults are only offered in non-production environments.
    host = os.getenv("DB_HOST") or os.getenv("PGHOST") or ("localhost" if relaxed else None)
    dbname = os.getenv("DB_NAME") or os.getenv("PGDATABASE") or ("trading_db" if relaxed else None)
    user = os.getenv("DB_USER") or os.getenv("PGUSER") or ("trader" if relaxed else None)

    if not relaxed:
        if not password:
            raise RuntimeError("DB_PASSWORD or PGPASSWORD must be configured in non-development environments")
        if not (host and dbname and user):
            raise RuntimeError("DB_HOST, DB_NAME, and DB_USER must be configured in non-development environments")

    port = int(os.getenv("DB_PORT") or os.getenv("PGPORT") or "5432")
    connect_timeout = int(os.getenv("DB_CONNECT_TIMEOUT", "5"))
    search_path = f"-csearch_path={schema},public" if schema else None
    return {
        "host": host,
        "port": port,
        "dbname": dbname,
        "user": user,
        "password": password,
        "connect_timeout": connect_timeout,
        "options": search_path,
    }
def _init_pool():
    """Create the threaded psycopg2 connection pool for the engine.

    Pool bounds come from ``DB_POOL_MIN`` (default 2) and ``DB_POOL_MAX``
    (default 50). Settings whose value is ``None`` are dropped before being
    handed to psycopg2; a ``dsn`` entry, when present, is passed on its own.
    """
    settings = {key: val for key, val in _db_config().items() if val is not None}
    lower = int(os.getenv("DB_POOL_MIN", "2"))
    upper = int(os.getenv("DB_POOL_MAX", "50"))
    if "dsn" not in settings:
        return pool.ThreadedConnectionPool(lower, upper, **settings)
    return pool.ThreadedConnectionPool(lower, upper, dsn=settings["dsn"])
def get_pool(): def get_pool():
@ -144,24 +144,24 @@ def db_transaction():
raise raise
def _utc_now(): def _utc_now():
return datetime.utcnow().replace(tzinfo=timezone.utc) return datetime.utcnow().replace(tzinfo=timezone.utc)
def _json_safe(value): def _json_safe(value):
if isinstance(value, datetime): if isinstance(value, datetime):
return value.isoformat() return value.isoformat()
if isinstance(value, date): if isinstance(value, date):
return value.isoformat() return value.isoformat()
if isinstance(value, Decimal): if isinstance(value, Decimal):
return float(value) return float(value)
if isinstance(value, UUID): if isinstance(value, UUID):
return str(value) return str(value)
if isinstance(value, dict): if isinstance(value, dict):
return {str(key): _json_safe(item) for key, item in value.items()} return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple, set)): if isinstance(value, (list, tuple, set)):
return [_json_safe(item) for item in value] return [_json_safe(item) for item in value]
return value return value
def set_context(user_id: str | None, run_id: str | None): def set_context(user_id: str | None, run_id: str | None):
@ -324,7 +324,7 @@ def get_running_runs(user_id: str | None = None):
return run_with_retry(_op) return run_with_retry(_op)
def insert_engine_event( def insert_engine_event(
cur, cur,
event: str, event: str,
data=None, data=None,
@ -336,165 +336,165 @@ def insert_engine_event(
): ):
when = ts or _utc_now() when = ts or _utc_now()
scope_user, scope_run = _resolve_context(user_id, run_id) scope_user, scope_run = _resolve_context(user_id, run_id)
cur.execute( cur.execute(
""" """
INSERT INTO engine_event (user_id, run_id, ts, event, data, message, meta) INSERT INTO engine_event (user_id, run_id, ts, event, data, message, meta)
VALUES (%s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s)
""", """,
( (
scope_user, scope_user,
scope_run, scope_run,
when, when,
event, event,
Json(_json_safe(data)) if data is not None else None, Json(_json_safe(data)) if data is not None else None,
message, message,
Json(_json_safe(meta)) if meta is not None else None, Json(_json_safe(meta)) if meta is not None else None,
), ),
) )
def acquire_run_lease(
    run_id: str,
    owner_id: str,
    *,
    lease_seconds: int = 90,
    now: datetime | None = None,
):
    """Try to take, refresh, or take over the lease row for ``run_id``.

    Args:
        run_id: Run whose ``run_leases`` row is being claimed.
        owner_id: Identifier of the caller that wants to hold the lease.
        lease_seconds: Lifetime of the lease counted from ``now``.
        now: Reference time; defaults to ``_utc_now()``.

    Returns:
        A dict with ``acquired``, ``status``, ``owner_id`` and ``expires_at``.
        ``status`` is ``ACQUIRED`` (new row inserted), ``REFRESHED`` (caller
        already held it), ``REACQUIRED`` (taken over from an expired holder;
        also carries ``previous_owner``) or ``DENIED``. The database work is
        executed through ``run_with_retry``.
    """
    stamp = now or _utc_now()
    lease_end = stamp + timedelta(seconds=lease_seconds)

    def _granted(status):
        return {
            "acquired": True,
            "status": status,
            "owner_id": owner_id,
            "expires_at": lease_end,
        }

    def _denied(holder, holder_expiry):
        return {
            "acquired": False,
            "status": "DENIED",
            "owner_id": holder,
            "expires_at": holder_expiry,
        }

    def _op(cur, _conn):
        # First attempt: create the lease row if none exists yet.
        cur.execute(
            """
            INSERT INTO run_leases (run_id, owner_id, leased_at, expires_at, heartbeat_at)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (run_id) DO NOTHING
            RETURNING run_id
            """,
            (run_id, owner_id, stamp, lease_end, stamp),
        )
        if cur.fetchone():
            return _granted("ACQUIRED")

        # A row already exists: read it with FOR UPDATE before deciding.
        cur.execute(
            """
            SELECT owner_id, expires_at
            FROM run_leases
            WHERE run_id = %s
            FOR UPDATE
            """,
            (run_id,),
        )
        existing = cur.fetchone()
        if not existing:
            return _denied(None, None)

        holder, holder_expiry = existing

        if holder == owner_id:
            # The caller already owns the lease; extend it.
            cur.execute(
                """
                UPDATE run_leases
                SET leased_at = %s,
                    expires_at = %s,
                    heartbeat_at = %s
                WHERE run_id = %s AND owner_id = %s
                RETURNING run_id
                """,
                (stamp, lease_end, stamp, run_id, owner_id),
            )
            cur.fetchone()
            return _granted("REFRESHED")

        if holder_expiry <= stamp:
            # Someone else's lease has run out; try to take it over.
            cur.execute(
                """
                UPDATE run_leases
                SET owner_id = %s,
                    leased_at = %s,
                    expires_at = %s,
                    heartbeat_at = %s
                WHERE run_id = %s AND expires_at <= %s
                RETURNING run_id
                """,
                (owner_id, stamp, lease_end, stamp, run_id, stamp),
            )
            if cur.fetchone():
                return {
                    "acquired": True,
                    "status": "REACQUIRED",
                    "owner_id": owner_id,
                    "previous_owner": holder,
                    "expires_at": lease_end,
                }

        return _denied(holder, holder_expiry)

    return run_with_retry(_op)
def heartbeat_run_lease(
    run_id: str,
    owner_id: str,
    *,
    lease_seconds: int = 90,
    now: datetime | None = None,
):
    """Extend a lease that ``owner_id`` still holds on ``run_id``.

    The row is updated only when it belongs to ``owner_id`` and its
    ``expires_at`` is still later than ``now`` (default ``_utc_now()``).

    Returns:
        ``{"active": True, "expires_at": <new expiry>}`` when a row was
        updated, otherwise ``{"active": False, "expires_at": None}``. The
        statement is executed through ``run_with_retry``.
    """
    beat_at = now or _utc_now()
    extended_until = beat_at + timedelta(seconds=lease_seconds)

    def _op(cur, _conn):
        cur.execute(
            """
            UPDATE run_leases
            SET heartbeat_at = %s,
                expires_at = %s
            WHERE run_id = %s
              AND owner_id = %s
              AND expires_at > %s
            RETURNING run_id, expires_at
            """,
            (beat_at, extended_until, run_id, owner_id, beat_at),
        )
        updated = cur.fetchone()
        if updated:
            return {"active": True, "expires_at": updated[1]}
        return {"active": False, "expires_at": None}

    return run_with_retry(_op)
def release_run_lease(run_id: str, owner_id: str):
    """Delete the lease row for ``run_id`` if it belongs to ``owner_id``.

    Returns:
        ``True`` when a row was deleted, ``False`` otherwise. The statement is
        executed through ``run_with_retry``.
    """

    def _op(cur, _conn):
        cur.execute(
            """
            DELETE FROM run_leases
            WHERE run_id = %s AND owner_id = %s
            RETURNING run_id
            """,
            (run_id, owner_id),
        )
        deleted = cur.fetchone()
        return deleted is not None

    return run_with_retry(_op)

View File

@ -371,6 +371,9 @@ def _record_reconciliation_event(cur, event: str, *, logical_time, payload: dict
return return
if event in {"SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING"}: if event in {"SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING"}:
log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time) log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time)
# Dual-write warning events to engine_event so the strategy summary can surface them
if event in {"SIP_NO_FILL", "SIP_PARTIAL"}:
insert_engine_event(cur, event, data=payload, ts=ts)
return return
insert_engine_event(cur, event, data=payload, ts=ts) insert_engine_event(cur, event, data=payload, ts=ts)
@ -680,6 +683,14 @@ def _try_execute_sip_paper(
funds = broker.get_funds(cur=cur) funds = broker.get_funds(cur=cur)
cash = funds.get("cash") cash = funds.get("cash")
if cash is not None and float(cash) < sip_amount_val: if cash is not None and float(cash) < sip_amount_val:
if not event_exists("SIP_NO_FILL", logical_time, cur=cur):
_payload = {
"reason": "insufficient_funds",
"cash": float(cash),
"required": sip_amount_val,
}
log_event("SIP_NO_FILL", _payload, cur=cur, ts=event_ts, logical_time=logical_time)
insert_engine_event(cur, "SIP_NO_FILL", data=_payload, ts=event_ts)
return state, False return state, False
log_event( log_event(

View File

@ -533,6 +533,7 @@ def _engine_loop(config, stop_event: threading.Event):
_pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb) _pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb)
break break
except Exception as exc: except Exception as exc:
log_event("PRICE_FETCH_ERROR", {"error": str(exc)})
debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)}) debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)})
if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id): if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id):
exit_reason = "LEASE_LOST" exit_reason = "LEASE_LOST"
@ -556,6 +557,7 @@ def _engine_loop(config, stop_event: threading.Event):
_pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb) _pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb)
break break
except Exception as exc: except Exception as exc:
log_event("HISTORY_LOAD_ERROR", {"error": str(exc)})
debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)}) debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)})
if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id): if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id):
exit_reason = "LEASE_LOST" exit_reason = "LEASE_LOST"