diff --git a/backend/app/admin_role_service.py b/backend/app/admin_role_service.py index c1e1995..4b67f4e 100644 --- a/backend/app/admin_role_service.py +++ b/backend/app/admin_role_service.py @@ -68,9 +68,13 @@ def set_user_role(actor_id: str, target_id: str, new_role: str): def bootstrap_super_admin(): + enabled = (os.getenv("ENABLE_SUPER_ADMIN_BOOTSTRAP") or "").strip().lower() in {"1", "true", "yes"} + if not enabled: + return + email = (os.getenv("SUPER_ADMIN_EMAIL") or "").strip() if not email: - return + raise RuntimeError("SUPER_ADMIN_EMAIL must be configured when bootstrap is enabled") existing = get_user_by_username(email) if existing: diff --git a/backend/app/broker_store.py b/backend/app/broker_store.py index ca95ec9..7e55378 100644 --- a/backend/app/broker_store.py +++ b/backend/app/broker_store.py @@ -136,6 +136,27 @@ def clear_user_broker(user_id: str): cur.execute("DELETE FROM user_broker WHERE user_id = %s", (user_id,)) +def disconnect_user_broker(user_id: str): + with db_transaction() as cur: + cur.execute( + """ + INSERT INTO user_broker (user_id, connected, auth_state, access_token, connected_at) + VALUES (%s, FALSE, 'DISCONNECTED', NULL, NULL) + ON CONFLICT (user_id) + DO UPDATE SET + connected = FALSE, + access_token = NULL, + connected_at = NULL, + auth_state = 'DISCONNECTED', + pending_broker = NULL, + pending_api_key = NULL, + pending_api_secret = NULL, + pending_started_at = NULL + """, + (user_id,), + ) + + def expire_user_broker_session(user_id: str): with db_transaction() as cur: cur.execute( diff --git a/backend/app/db_models.py b/backend/app/db_models.py index f40cc25..d5eea37 100644 --- a/backend/app/db_models.py +++ b/backend/app/db_models.py @@ -430,6 +430,7 @@ class LiveEquitySnapshot(Base): captured_at = Column(DateTime(timezone=True), nullable=False) cash_value = Column(Numeric, nullable=False) holdings_value = Column(Numeric, nullable=False) + positions_adjustment_value = Column(Numeric, nullable=False, server_default=text("0")) total_value = Column(Numeric, nullable=False) __table_args__ = ( @@ -443,6 +444,48 @@ class LiveEquitySnapshot(Base): ) +class BrokerOrderState(Base): + __tablename__ = "broker_order_state" + + local_order_id = Column(String, primary_key=True) + user_id = Column(String, ForeignKey("app_user.id", ondelete="CASCADE"), nullable=False) + run_id = Column(String, ForeignKey("strategy_run.run_id", ondelete="CASCADE"), nullable=False) + logical_time = Column(DateTime(timezone=True), nullable=False) + broker = Column(Text, nullable=False) + symbol = Column(Text, nullable=False) + side = Column(Text, nullable=False) + broker_order_id = Column(Text) + requested_qty = Column(Numeric, nullable=False) + filled_qty = Column(Numeric, nullable=False, server_default=text("0")) + accounted_fill_qty = Column(Numeric, nullable=False, server_default=text("0")) + requested_price = Column(Numeric) + average_price = Column(Numeric) + status = Column(Text, nullable=False) + broker_status = Column(Text) + status_message = Column(Text) + needs_reconciliation = Column(Boolean, nullable=False, server_default=text("false")) + last_checked_at = Column(DateTime(timezone=True)) + created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now()) + updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now()) + + __table_args__ = ( + CheckConstraint( + "status IN ('PENDING','PARTIAL','FILLED','REJECTED','CANCELLED','UNKNOWN')", + name="chk_broker_order_state_status", + ), + Index("idx_broker_order_state_user_run_status", "user_id", "run_id", "status"), + Index( + "idx_broker_order_state_reconcile", + "user_id", + "run_id", + "needs_reconciliation", + "last_checked_at", + ), + Index("idx_broker_order_state_broker_order", "broker_order_id"), + Index("idx_broker_order_state_logical_time", "run_id", "logical_time"), + ) + + class SupportTicket(Base): __tablename__ = "support_ticket" diff --git a/backend/app/main.py b/backend/app/main.py index c9c4bec..45105bb 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,4 +1,5 @@ import os +from contextlib import asynccontextmanager from urllib.parse import urlparse from fastapi import FastAPI @@ -85,9 +86,64 @@ def _build_cors_origins() -> list[str]: return deduped +def _validate_runtime_secrets(): + env_name = _environment_name() + if env_name not in PRODUCTION_ENV_NAMES: + return + broker_token_key = (os.getenv("BROKER_TOKEN_KEY") or "").strip() + if not broker_token_key: + raise RuntimeError("BROKER_TOKEN_KEY must be configured in production") + if (os.getenv("ENABLE_SUPER_ADMIN_BOOTSTRAP") or "").strip() in {"1", "true", "yes"}: + if not (os.getenv("SUPER_ADMIN_EMAIL") or "").strip(): + raise RuntimeError("SUPER_ADMIN_EMAIL must be configured when bootstrap is enabled") + if not (os.getenv("SUPER_ADMIN_PASSWORD") or "").strip(): + raise RuntimeError("SUPER_ADMIN_PASSWORD must be configured when bootstrap is enabled") + + +def _initialize_app_state(app: FastAPI): + app.state.startup_complete = False + app.state.startup_error = None + app.state.startup_started = False + app.state.background_warnings = {} + + +def _run_startup_tasks(app: FastAPI): + if os.getenv("DISABLE_STARTUP_TASKS", "0") == "1": + app.state.startup_started = True + app.state.startup_complete = True + app.state.startup_error = None + return + + app.state.startup_started = True + app.state.startup_complete = False + app.state.startup_error = None + try: + init_log_state() + bootstrap_super_admin() + resume_running_runs() + app.state.startup_complete = True + except Exception as exc: + app.state.startup_error = str(exc) + print(f"[STARTUP] critical startup task failed: {exc}", flush=True) + + try: + start_live_equity_snapshot_daemon() + except Exception as exc: + app.state.background_warnings["live_equity_snapshot_daemon"] = str(exc) + print(f"[STARTUP] live equity snapshot daemon failed to start: {exc}", flush=True) + + +@asynccontextmanager +async def _lifespan(app: FastAPI): + _initialize_app_state(app) + _run_startup_tasks(app) + yield + + def create_app() -> FastAPI: _validate_db_config() - app = FastAPI(title="QuantFortune Backend", version="1.0") + _validate_runtime_secrets() + app = FastAPI(title="QuantFortune Backend", version="1.0", lifespan=_lifespan) cors_origins = _build_cors_origins() app.add_middleware( CORSMiddleware, @@ -111,15 +167,6 @@ def create_app() -> FastAPI: app.include_router(support_ticket_router) app.include_router(password_reset_router) - @app.on_event("startup") - def init_app_state(): - if os.getenv("DISABLE_STARTUP_TASKS", "0") == "1": - return - init_log_state() - bootstrap_super_admin() - resume_running_runs() - start_live_equity_snapshot_daemon() - return app diff --git a/backend/app/routers/broker.py b/backend/app/routers/broker.py index 9b63fdb..2d2a2ce 100644 --- a/backend/app/routers/broker.py +++ b/backend/app/routers/broker.py @@ -5,7 +5,7 @@ from fastapi import APIRouter, HTTPException, Query, Request from fastapi.responses import RedirectResponse from app.broker_store import ( - clear_user_broker, + disconnect_user_broker, expire_user_broker_session, get_broker_credentials, get_pending_broker, @@ -34,6 +34,7 @@ from app.services.groww_service import ( ) from app.services.groww_storage import get_session as get_groww_session from app.services.live_equity_service import capture_live_equity_snapshot, get_live_equity_curve +from app.services.strategy_service import block_live_strategy_for_broker_disconnect from app.services.zerodha_service import ( KiteApiError, KiteTokenError, @@ -405,15 +406,19 @@ async def broker_status(request: Request): @router.post("/disconnect") async def disconnect_broker(request: Request): user = _require_user(request) - clear_user_broker(user["id"]) + strategy_update = block_live_strategy_for_broker_disconnect(user["id"], reason="broker_disconnected") + disconnect_user_broker(user["id"]) clear_zerodha_session(user["id"]) - set_broker_auth_state(user["id"], "DISCONNECTED") try: body = "Your broker connection has been disconnected from Quantfortune." send_email_async(user["username"], "Broker disconnected", body) except Exception: pass - return {"connected": False} + return { + "connected": False, + "brokerState": "DISCONNECTED", + "strategy": strategy_update, + } @router.post("/zerodha/login") @@ -749,6 +754,10 @@ async def broker_equity_curve(request: Request, from_: str = Query("", alias="fr normalize_zerodha_holding(item) for item in fetch_zerodha_holdings(session["api_key"], session["access_token"]) ] + positions = [ + normalize_zerodha_position(item) + for item in fetch_zerodha_positions(session["api_key"], session["access_token"]) + ] raw_funds = fetch_zerodha_funds(session["api_key"], session["access_token"]) funds_data = {**(raw_funds.get("equity", {}) or {}), "raw": raw_funds} except KiteApiError as exc: @@ -759,6 +768,7 @@ async def broker_equity_curve(request: Request, from_: str = Query("", alias="fr raise HTTPException(status_code=400, detail="Groww is not connected") try: holdings = _fetch_normalized_groww_holdings(session["access_token"]) + positions = _fetch_normalized_groww_positions(session["access_token"]) funds_data = _normalize_groww_funds(fetch_groww_funds(session["access_token"])) except GrowwApiError as exc: _raise_groww_error(user["id"], exc) @@ -768,6 +778,7 @@ async def broker_equity_curve(request: Request, from_: str = Query("", alias="fr capture_live_equity_snapshot( user["id"], holdings=holdings, + positions=positions, funds_data=funds_data, ) diff --git a/backend/app/routers/health.py b/backend/app/routers/health.py index 9ec315b..9bfeb7c 100644 --- a/backend/app/routers/health.py +++ b/backend/app/routers/health.py @@ -1,12 +1,44 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Request from app.services.db import health_check router = APIRouter() +def _startup_payload(request: Request) -> dict: + return { + "started": bool(getattr(request.app.state, "startup_started", False)), + "complete": bool(getattr(request.app.state, "startup_complete", False)), + "error": getattr(request.app.state, "startup_error", None), + "warnings": dict(getattr(request.app.state, "background_warnings", {}) or {}), + } + + +def _readiness_payload(request: Request) -> tuple[bool, dict]: + db_ok = health_check() + startup = _startup_payload(request) + ready = db_ok and startup["complete"] and not startup["error"] + payload = { + "status": "ok" if ready else "not_ready", + "db": "ok" if db_ok else "unavailable", + "startup": startup, + } + return ready, payload + + +@router.get("/health/live") +def liveness(): + return {"status": "ok"} + + +@router.get("/health/ready") +def readiness(request: Request): + ready, payload = _readiness_payload(request) + if not ready: + raise HTTPException(status_code=503, detail=payload) + return payload + + @router.get("/health") -def health(): - if not health_check(): - raise HTTPException(status_code=503, detail="db_unavailable") - return {"status": "ok", "db": "ok"} +def health(request: Request): + return readiness(request) diff --git a/backend/app/services/live_equity_service.py b/backend/app/services/live_equity_service.py index 806204b..d346c4c 100644 --- a/backend/app/services/live_equity_service.py +++ b/backend/app/services/live_equity_service.py @@ -11,16 +11,20 @@ from app.services.groww_service import ( GrowwApiError, fetch_funds as fetch_groww_funds, fetch_holdings as fetch_groww_holdings, + fetch_positions as fetch_groww_positions, normalize_holding as normalize_groww_holding, + normalize_position as normalize_groww_position, ) from app.services.groww_storage import get_session as get_groww_session from app.services.zerodha_service import ( KiteApiError, fetch_funds as fetch_zerodha_funds, fetch_holdings as fetch_zerodha_holdings, + fetch_positions as fetch_zerodha_positions, holding_effective_quantity, holding_last_price, normalize_holding as normalize_zerodha_holding, + normalize_position as normalize_zerodha_position, ) from app.services.zerodha_storage import get_session as get_zerodha_session @@ -81,6 +85,63 @@ def _extract_holdings_value(holdings: list[dict] | None) -> float: return total +def _position_signed_quantity(item: dict | None) -> float: + entry = item or {} + return _first_numeric( + entry.get("net_quantity"), + entry.get("signed_quantity"), + default=0.0, + ) + + +def _symbol_key(item: dict | None) -> tuple[str, str]: + entry = item or {} + symbol = str( + entry.get("symbol") + or entry.get("tradingsymbol") + or entry.get("trading_symbol") + or "" + ).strip().upper() + exchange = str(entry.get("exchange") or "NSE").strip().upper() + return symbol, exchange + + +def _extract_positions_adjustment_value( + holdings: list[dict] | None, + positions: list[dict] | None, +) -> float: + holdings_map: dict[tuple[str, str], dict] = {} + for item in holdings or []: + key = _symbol_key(item) + holdings_map[key] = { + "settled_qty": _first_numeric(item.get("settled_quantity"), item.get("quantity"), default=0.0), + "t1_qty": _first_numeric(item.get("t1_quantity"), default=0.0), + "effective_qty": holding_effective_quantity(item), + "last_price": holding_last_price(item), + } + + adjustment = 0.0 + for item in positions or []: + signed_qty = _position_signed_quantity(item) + if signed_qty == 0: + continue + key = _symbol_key(item) + holding_entry = holdings_map.get( + key, + {"settled_qty": 0.0, "t1_qty": 0.0, "effective_qty": 0.0, "last_price": holding_last_price(item)}, + ) + effective_qty = float(holding_entry.get("effective_qty") or 0.0) + t1_qty = float(holding_entry.get("t1_qty") or 0.0) + last_price = _first_numeric(item.get("last_price"), item.get("close_price"), holding_entry.get("last_price"), default=0.0) + + positive_qty = max(signed_qty, 0.0) + covered_by_t1 = min(positive_qty, t1_qty) + net_adjustment_qty = (positive_qty - covered_by_t1) + min(signed_qty, 0.0) + net_adjustment_qty = max(net_adjustment_qty, -effective_qty) + adjustment += net_adjustment_qty * last_price + return adjustment + + def _normalize_groww_funds(data: dict | None) -> dict: payload = data if isinstance(data, dict) else {} available = payload.get("available") if isinstance(payload.get("available"), dict) else {} @@ -155,8 +216,9 @@ def _upsert_snapshot( captured_at: datetime, cash_value: float, holdings_value: float, + positions_adjustment_value: float, ): - total_value = cash_value + holdings_value + total_value = cash_value + holdings_value + positions_adjustment_value with db_connection() as conn: with conn: with conn.cursor() as cur: @@ -168,13 +230,15 @@ def _upsert_snapshot( captured_at, cash_value, holdings_value, + positions_adjustment_value, total_value ) - VALUES (%s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (user_id, snapshot_date) DO UPDATE SET captured_at = EXCLUDED.captured_at, cash_value = EXCLUDED.cash_value, holdings_value = EXCLUDED.holdings_value, + positions_adjustment_value = EXCLUDED.positions_adjustment_value, total_value = EXCLUDED.total_value """, ( @@ -183,6 +247,7 @@ def _upsert_snapshot( captured_at, Decimal(str(round(cash_value, 2))), Decimal(str(round(holdings_value, 2))), + Decimal(str(round(positions_adjustment_value, 2))), Decimal(str(round(total_value, 2))), ), ) @@ -191,6 +256,7 @@ def _upsert_snapshot( "capturedAt": captured_at.isoformat(), "cashValue": round(cash_value, 2), "holdingsValue": round(holdings_value, 2), + "positionsAdjustmentValue": round(positions_adjustment_value, 2), "totalValue": round(total_value, 2), } @@ -199,6 +265,7 @@ def capture_live_equity_snapshot( user_id: str, *, holdings: list[dict] | None = None, + positions: list[dict] | None = None, funds_data: dict | None = None, captured_at: datetime | None = None, ): @@ -215,6 +282,10 @@ def capture_live_equity_snapshot( normalize_zerodha_holding(item) for item in fetch_zerodha_holdings(session["api_key"], session["access_token"]) ] + positions = [ + normalize_zerodha_position(item) + for item in fetch_zerodha_positions(session["api_key"], session["access_token"]) + ] elif broker_name == "GROWW": session = get_groww_session(user_id) if not session: @@ -223,8 +294,14 @@ def capture_live_equity_snapshot( normalize_groww_holding(item) for item in fetch_groww_holdings(session["access_token"]) ] + positions = [ + normalize_groww_position(item) + for item in fetch_groww_positions(session["access_token"]) + ] else: return None + elif positions is None: + positions = [] if funds_data is None: if broker_name == "ZERODHA": session = get_zerodha_session(user_id) @@ -243,12 +320,14 @@ def capture_live_equity_snapshot( cash_value = _extract_cash_value(funds_data) holdings_value = _extract_holdings_value(holdings) + positions_adjustment_value = _extract_positions_adjustment_value(holdings, positions) return _upsert_snapshot( user_id=user_id, snapshot_date=_snapshot_day(captured_at), captured_at=captured_at, cash_value=cash_value, holdings_value=holdings_value, + positions_adjustment_value=positions_adjustment_value, ) diff --git a/backend/app/services/strategy_service.py b/backend/app/services/strategy_service.py index a5ae7f8..094f688 100644 --- a/backend/app/services/strategy_service.py +++ b/backend/app/services/strategy_service.py @@ -334,8 +334,16 @@ def _effective_running_run_id(user_id: str): return None engine_row = _get_engine_status_row(user_id, run_id) engine_state = str((engine_row or {}).get("status") or "").strip().upper() - if engine_state not in {"STOPPED", "ERROR"}: + if engine_state not in {"STOPPED", "ERROR", "PAUSED_AUTH_EXPIRED"}: return run_id + if engine_state == "PAUSED_AUTH_EXPIRED": + update_run_status( + user_id, + run_id, + "PAUSED_AUTH_EXPIRED", + meta={"reason": "broker_auth_expired", "engine_state": engine_state}, + ) + return None update_run_status( user_id, run_id, @@ -350,6 +358,95 @@ def _set_run_status_or_raise(user_id: str, run_id: str, status: str, meta: dict if not updated: raise RuntimeError(f"Run {run_id} for user {user_id} no longer exists") + +def _get_run_row(user_id: str, run_id: str | None): + if not user_id or not run_id: + return None + with db_connection() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT status, meta, started_at, stopped_at + FROM strategy_run + WHERE user_id = %s AND run_id = %s + LIMIT 1 + """, + (user_id, run_id), + ) + row = cur.fetchone() + if not row: + return None + return { + "status": row[0], + "meta": row[1] if isinstance(row[1], dict) else {}, + "started_at": row[2], + "stopped_at": row[3], + } + + +def _broker_block_state(user_id: str, cfg: dict | None): + config = cfg or {} + mode = str(config.get("mode") or "").strip().upper() + if mode != "LIVE": + return {"blocked": False, "reason": None, "broker_state": None} + + broker_state = get_user_broker(user_id) or {} + auth_state = str(broker_state.get("auth_state") or "").strip().upper() + connected = bool(broker_state.get("connected")) + broker_name = str(broker_state.get("broker") or "").strip().upper() or None + + if not connected or auth_state == "DISCONNECTED": + return { + "blocked": True, + "reason": "broker_disconnected", + "broker_state": "DISCONNECTED", + "broker": broker_name, + } + if auth_state in {"EXPIRED", "PENDING"}: + return { + "blocked": True, + "reason": "broker_auth_required", + "broker_state": auth_state or "EXPIRED", + "broker": broker_name, + } + return { + "blocked": False, + "reason": None, + "broker_state": auth_state or "VALID", + "broker": broker_name, + } + + +def block_live_strategy_for_broker_disconnect(user_id: str, *, reason: str = "broker_disconnected"): + running_run_id = _effective_running_run_id(user_id) + run_id = running_run_id or get_active_run_id(user_id) + if not run_id: + return None + cfg = _load_config(user_id, run_id) + if str(cfg.get("mode") or "").strip().upper() != "LIVE": + return None + + stop_warning = None + try: + stop_engine(user_id, run_id, timeout=10.0) + except Exception as exc: + stop_warning = str(exc) + print(f"[STRATEGY] stop_engine failed during broker disconnect for {user_id}/{run_id}: {exc}", flush=True) + + deactivate_strategy_config(user_id, run_id) + stop_run(user_id, run_id, reason=reason) + _write_status(user_id, run_id, "STOPPED") + _set_run_status_or_raise( + user_id, + run_id, + "STOPPED", + meta={"reason": reason, "broker_blocked": True}, + ) + result = {"run_id": run_id, "status": "STOPPED", "reason": reason} + if stop_warning: + result["warning"] = stop_warning + return result + def validate_frequency(freq: dict, mode: str): if not isinstance(freq, dict): raise ValueError("Frequency payload is required") @@ -814,7 +911,8 @@ def get_strategy_status(user_id: str): running_run_id = _effective_running_run_id(user_id) run_id = running_run_id or get_active_run_id(user_id) cfg = _load_config(user_id, run_id) if run_id else {} - default_status = "RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE") + run_row = _get_run_row(user_id, run_id) + default_status = (run_row or {}).get("status") or ("RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE")) engine_row = None with db_connection() as conn: with conn.cursor() as cur: @@ -834,6 +932,8 @@ def get_strategy_status(user_id: str): engine_state = str((engine_row or [None])[0] or "").strip().upper() if running_run_id and engine_state in {"STOPPED", "ERROR"}: status["status"] = "STOPPED" + elif engine_state == "PAUSED_AUTH_EXPIRED": + status["status"] = "PAUSED_AUTH_EXPIRED" sip_frequency = cfg.get("sip_frequency") if not isinstance(sip_frequency, dict): frequency = cfg.get("frequency") @@ -866,6 +966,11 @@ def get_strategy_status(user_id: str): parsed_next = parse_persisted_timestamp(next_eligible) if parsed_next and parsed_next > _utc_now(): status["status"] = "WAITING" + broker_block = _broker_block_state(user_id, cfg) + status["broker_state"] = broker_block.get("broker_state") + status["strategy_blocked"] = bool(broker_block.get("blocked")) + status["strategy_block_reason"] = broker_block.get("reason") + status["broker"] = broker_block.get("broker") or cfg.get("broker") status_key = (status.get("status") or "IDLE").upper() resumable = bool(cfg.get("strategy")) and bool(cfg.get("mode")) status["can_resume"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"} @@ -1086,6 +1191,26 @@ def get_strategy_summary(user_id: str): } issue_row = None + block_reason = str(status.get("strategy_block_reason") or "").strip().lower() + if block_reason == "broker_disconnected": + summary.update( + { + "tone": "warning", + "message": "Broker disconnected. Live strategy is stopped until you reconnect.", + "event": "BROKER_DISCONNECTED", + } + ) + return summary + if block_reason == "broker_auth_required" or (status.get("status") or "").upper() == "PAUSED_AUTH_EXPIRED": + summary.update( + { + "tone": "warning", + "message": "Broker session expired. Reconnect broker to resume the strategy.", + "event": "BROKER_AUTH_EXPIRED", + } + ) + return summary + if run_id: with db_connection() as conn: with conn.cursor() as cur: diff --git a/backend/ecosystem.config.js b/backend/ecosystem.config.js index eebd798..2c48ca2 100644 --- a/backend/ecosystem.config.js +++ b/backend/ecosystem.config.js @@ -3,34 +3,29 @@ module.exports = { { name: "Quantfortune - Backend", cwd: "/SERVER_CLIENT/PRODUCTION/SIP_GoldBees_Backend/backend", - script: "/SERVER_CLIENT/PRODUCTION/SIP_GoldBees_Backend/backend/.venv/bin/uvicorn", args: "app.main:app --host 0.0.0.0 --port 3002", - - interpreter: "none", // ✅ IMPORTANT - // exec_mode: "fork", // optional - // instances: 1, // optional - + interpreter: "none", env: { - DB_HOST: "localhost", - DB_PORT: "5432", - DB_NAME: "trading_db", - DB_USER: "trader", - DB_PASSWORD: "traderpass", - DB_SCHEMA: "quant_app", - - BROKER_TOKEN_KEY: "6SuYLz0n7-KM5nB_Bs6ueYgDXZZvbmf-K-WpFbOMbH4=", - - SMTP_HOST: "smtp.gmail.com", - SMTP_PORT: "587", - SMTP_USER: "quantfortune@gmail.com", - SMTP_PASS: "wkbk mwbi aiqo yvwl", - SMTP_FROM_NAME: "Quantfortune Support", - - SUPER_ADMIN_EMAIL: "thigazhezhilanj007@gmail.com", - SUPER_ADMIN_PASSWORD: "AdminPass123!", - RESET_OTP_SECRET: "change_this_secret" - } - } - ] + APP_ENV: process.env.APP_ENV || "production", + DB_HOST: process.env.DB_HOST, + DB_PORT: process.env.DB_PORT, + DB_NAME: process.env.DB_NAME, + DB_USER: process.env.DB_USER, + DB_PASSWORD: process.env.DB_PASSWORD, + DB_SCHEMA: process.env.DB_SCHEMA, + CORS_ORIGINS: process.env.CORS_ORIGINS, + BROKER_TOKEN_KEY: process.env.BROKER_TOKEN_KEY, + SMTP_HOST: process.env.SMTP_HOST, + SMTP_PORT: process.env.SMTP_PORT, + SMTP_USER: process.env.SMTP_USER, + SMTP_PASS: process.env.SMTP_PASS, + SMTP_FROM_NAME: process.env.SMTP_FROM_NAME, + RESET_OTP_SECRET: process.env.RESET_OTP_SECRET, + ENABLE_SUPER_ADMIN_BOOTSTRAP: process.env.ENABLE_SUPER_ADMIN_BOOTSTRAP, + SUPER_ADMIN_EMAIL: process.env.SUPER_ADMIN_EMAIL, + SUPER_ADMIN_PASSWORD: process.env.SUPER_ADMIN_PASSWORD, + }, + }, + ], }; diff --git a/backend/run_backend.ps1 b/backend/run_backend.ps1 index de0097e..dbcd28e 100644 --- a/backend/run_backend.ps1 +++ b/backend/run_backend.ps1 @@ -1,4 +1,6 @@ Set-Location $PSScriptRoot +$appEnv = if ($env:APP_ENV) { $env:APP_ENV } else { 'development' } +if (-not $env:APP_ENV) { $env:APP_ENV = $appEnv } if (-not $env:DB_HOST) { $env:DB_HOST = 'localhost' } if (-not $env:DB_PORT) { $env:DB_PORT = '5432' } if (-not $env:DB_NAME) { $env:DB_NAME = 'trading_db' } @@ -17,13 +19,13 @@ if (Test-Path $frontendUrlFile) { $env:ZERODHA_REDIRECT_URL = "$frontendUrl/login" } } -if (-not $env:BROKER_TOKEN_KEY) { $env:BROKER_TOKEN_KEY = '6SuYLz0n7-KM5nB_Bs6ueYgDXZZvbmf-K-WpFbOMbH4=' } -if (-not $env:SUPER_ADMIN_EMAIL) { $env:SUPER_ADMIN_EMAIL = 'thigazhezhilanj007@gmail.com' } -if (-not $env:SUPER_ADMIN_PASSWORD) { $env:SUPER_ADMIN_PASSWORD = 'AdminPass123!' } if (-not $env:SMTP_HOST) { $env:SMTP_HOST = 'smtp.gmail.com' } if (-not $env:SMTP_PORT) { $env:SMTP_PORT = '587' } -if (-not $env:SMTP_USER) { $env:SMTP_USER = 'quantfortune@gmail.com' } -if (-not $env:SMTP_PASS) { $env:SMTP_PASS = 'wkbk mwbi aiqo yvwl' } if (-not $env:SMTP_FROM_NAME) { $env:SMTP_FROM_NAME = 'Quantfortune Support' } -if (-not $env:RESET_OTP_SECRET) { $env:RESET_OTP_SECRET = 'change_this_secret' } +if (-not $env:BROKER_TOKEN_KEY -and $env:APP_ENV -in @('development', 'dev', 'test', 'testing', 'local')) { + $env:BROKER_TOKEN_KEY = (& .\venv\Scripts\python.exe -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode('utf-8'))").Trim() +} +if (-not $env:RESET_OTP_SECRET -and $env:APP_ENV -in @('development', 'dev', 'test', 'testing', 'local')) { + $env:RESET_OTP_SECRET = 'dev-reset-otp-secret' +} .\venv\Scripts\uvicorn.exe app.main:app --host 0.0.0.0 --port 8000 diff --git a/backend/tests/test_api_semantics_and_utc.py b/backend/tests/test_api_semantics_and_utc.py index 64647ce..26de2f1 100644 --- a/backend/tests/test_api_semantics_and_utc.py +++ b/backend/tests/test_api_semantics_and_utc.py @@ -101,7 +101,10 @@ def test_bootstrap_schema_contains_migrated_core_columns_and_tables(): "ALTER TABLE user_broker", "ADD COLUMN IF NOT EXISTS api_secret TEXT", "ADD COLUMN IF NOT EXISTS auth_state TEXT", + "ALTER TABLE live_equity_snapshot", + "ADD COLUMN IF NOT EXISTS positions_adjustment_value NUMERIC NOT NULL DEFAULT 0", "CREATE TABLE IF NOT EXISTS broker_callback_state", + "CREATE TABLE IF NOT EXISTS broker_order_state", "CREATE TABLE IF NOT EXISTS execution_claim", "CREATE TABLE IF NOT EXISTS run_leases", "CREATE TABLE IF NOT EXISTS support_request_audit", diff --git a/backend/tests/test_broker_disconnect_handling.py b/backend/tests/test_broker_disconnect_handling.py new file mode 100644 index 0000000..aaacafa --- /dev/null +++ b/backend/tests/test_broker_disconnect_handling.py @@ -0,0 +1,252 @@ +import importlib + +from fastapi.testclient import TestClient + + +def _build_app(monkeypatch): + monkeypatch.setenv("APP_ENV", "test") + monkeypatch.setenv("DISABLE_STARTUP_TASKS", "1") + monkeypatch.setenv("DB_HOST", "localhost") + monkeypatch.setenv("DB_NAME", "trading_db") + monkeypatch.setenv("DB_USER", "trader") + monkeypatch.setenv("DB_PASSWORD", "test-password") + monkeypatch.setenv("CORS_ORIGINS", "http://localhost:3000") + monkeypatch.setenv("BROKER_TOKEN_KEY", "test-broker-token-key") + monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret") + + import app.main as app_main + + importlib.reload(app_main) + return app_main.create_app() + + +def test_disconnect_route_stops_live_strategy_and_disconnects_broker(monkeypatch): + app = _build_app(monkeypatch) + client = TestClient(app) + + import app.routers.broker as broker_router + + calls = { + "disconnect": [], + "clear_session": [], + } + + monkeypatch.setattr( + broker_router, + "_require_user", + lambda _request: {"id": "user-1", "username": "user@example.com"}, + ) + monkeypatch.setattr( + broker_router, + "block_live_strategy_for_broker_disconnect", + lambda user_id, reason="broker_disconnected": { + "run_id": "run-1", + "status": "STOPPED", + "reason": reason, + "user_id": user_id, + }, + ) + monkeypatch.setattr( + broker_router, + "disconnect_user_broker", + lambda user_id: calls["disconnect"].append(user_id), + ) + monkeypatch.setattr( + broker_router, + "clear_zerodha_session", + lambda user_id: calls["clear_session"].append(user_id), + ) + monkeypatch.setattr(broker_router, "send_email_async", lambda *_args, **_kwargs: None) + + response = client.post("/api/broker/disconnect", cookies={"session_id": "session-1"}) + + assert response.status_code == 200 + assert response.json() == { + "connected": False, + "brokerState": "DISCONNECTED", + "strategy": { + "run_id": "run-1", + "status": "STOPPED", + "reason": "broker_disconnected", + "user_id": "user-1", + }, + } + assert calls["disconnect"] == ["user-1"] + assert calls["clear_session"] == ["user-1"] + + +def test_block_live_strategy_for_broker_disconnect_auto_stops_active_live_run(monkeypatch): + import app.services.strategy_service as strategy_service + + calls = [] + + monkeypatch.setattr(strategy_service, "_effective_running_run_id", lambda _user_id: "run-1") + monkeypatch.setattr(strategy_service, "_load_config", lambda _user_id, _run_id: {"mode": "LIVE"}) + monkeypatch.setattr( + strategy_service, + "stop_engine", + lambda user_id, run_id, timeout=10.0: calls.append(("stop_engine", user_id, run_id, timeout)), + ) + monkeypatch.setattr( + strategy_service, + "deactivate_strategy_config", + lambda user_id, run_id: calls.append(("deactivate", user_id, run_id)), + ) + monkeypatch.setattr( + strategy_service, + "stop_run", + lambda user_id, run_id, reason="user_request": calls.append(("stop_run", user_id, run_id, reason)), + ) + monkeypatch.setattr( + strategy_service, + "_write_status", + lambda user_id, run_id, status: calls.append(("write_status", user_id, run_id, status)), + ) + monkeypatch.setattr( + strategy_service, + "_set_run_status_or_raise", + lambda user_id, run_id, status, meta=None: calls.append(("set_run_status", user_id, run_id, status, meta)), + ) + + result = strategy_service.block_live_strategy_for_broker_disconnect( + "user-1", + reason="broker_disconnected", + ) + + assert result == { + "run_id": "run-1", + "status": "STOPPED", + "reason": "broker_disconnected", + } + assert ("stop_engine", "user-1", "run-1", 10.0) in calls + assert ("deactivate", "user-1", "run-1") in calls + assert ("stop_run", "user-1", "run-1", "broker_disconnected") in calls + assert ("write_status", "user-1", "run-1", "STOPPED") in calls + + +def test_strategy_status_marks_broker_disconnected_block(monkeypatch): + import app.services.strategy_service as strategy_service + + monkeypatch.setattr(strategy_service, "_effective_running_run_id", lambda _user_id: None) + monkeypatch.setattr(strategy_service, "get_active_run_id", lambda _user_id: "run-1") + monkeypatch.setattr( + strategy_service, + "_load_config", + lambda _user_id, _run_id: { + "strategy": "Golden Nifty", + "mode": "LIVE", + "broker": "ZERODHA", + "active": True, + }, + ) + monkeypatch.setattr( + strategy_service, + "_get_run_row", + lambda _user_id, _run_id: {"status": "STOPPED", "meta": {}, "started_at": None, "stopped_at": None}, + ) + monkeypatch.setattr( + strategy_service, + "_broker_block_state", + lambda _user_id, _cfg: { + "blocked": True, + "reason": "broker_disconnected", + "broker_state": "DISCONNECTED", + "broker": "ZERODHA", + }, + ) + + class FakeCursor: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def execute(self, _sql, _params): + return None + + def fetchone(self): + return None + + class FakeConnection: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def cursor(self): + return FakeCursor() + + monkeypatch.setattr(strategy_service, "db_connection", lambda: FakeConnection()) + + status = strategy_service.get_strategy_status("user-1") + + assert status["status"] == "STOPPED" + assert status["strategy_blocked"] is True + assert status["strategy_block_reason"] == "broker_disconnected" + assert status["broker_state"] == "DISCONNECTED" + assert status["broker"] == "ZERODHA" + + +def test_strategy_status_clears_broker_block_after_reconnect(monkeypatch): + import app.services.strategy_service as strategy_service + + monkeypatch.setattr(strategy_service, "_effective_running_run_id", lambda _user_id: None) + monkeypatch.setattr(strategy_service, "get_active_run_id", lambda _user_id: "run-1") + monkeypatch.setattr( + strategy_service, + "_load_config", + lambda _user_id, _run_id: { + "strategy": "Golden Nifty", + "mode": "LIVE", + "broker": "ZERODHA", + "active": True, + }, + ) + monkeypatch.setattr( + strategy_service, + "_get_run_row", + lambda _user_id, _run_id: {"status": "STOPPED", "meta": {}, "started_at": None, "stopped_at": None}, + ) + monkeypatch.setattr( + strategy_service, + "_broker_block_state", + lambda _user_id, _cfg: { + "blocked": False, + "reason": None, + "broker_state": "VALID", + "broker": "ZERODHA", + }, + ) + + class FakeCursor: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def execute(self, _sql, _params): + return None + + def fetchone(self): + return None + + class FakeConnection: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def cursor(self): + return FakeCursor() + + monkeypatch.setattr(strategy_service, "db_connection", lambda: FakeConnection()) + + status = strategy_service.get_strategy_status("user-1") + + assert status["strategy_blocked"] is False + assert status["strategy_block_reason"] is None + assert status["broker_state"] == "VALID" diff --git a/backend/tests/test_execution_claims.py b/backend/tests/test_execution_claims.py index 7c90599..7651af9 100644 --- a/backend/tests/test_execution_claims.py +++ b/backend/tests/test_execution_claims.py @@ -85,6 +85,11 @@ def test_try_execute_sip_live_emits_one_order_batch_when_claim_is_lost(monkeypat monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False) monkeypatch.setattr(execution, "claim_execution_window", fake_claim_execution_window) monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None) + monkeypatch.setattr( + execution, + "reconcile_live_orders", + lambda **kwargs: {"blocked": False, "orders": [], "summaries": []}, + ) monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None)) monkeypatch.setattr( execution, diff --git a/backend/tests/test_health_readiness.py b/backend/tests/test_health_readiness.py new file mode 100644 index 0000000..36a0a8f --- /dev/null +++ b/backend/tests/test_health_readiness.py @@ -0,0 +1,124 @@ +import importlib + +import pytest +from fastapi.testclient import TestClient + + +def _load_app_main(monkeypatch, *, disable_startup_tasks="1"): + monkeypatch.setenv("APP_ENV", "test") + monkeypatch.setenv("DISABLE_STARTUP_TASKS", disable_startup_tasks) + monkeypatch.setenv("DB_HOST", "localhost") + monkeypatch.setenv("DB_NAME", "trading_db") + monkeypatch.setenv("DB_USER", "trader") + monkeypatch.setenv("DB_PASSWORD", "test-password") + monkeypatch.setenv("CORS_ORIGINS", "http://localhost:3000") + monkeypatch.setenv("BROKER_TOKEN_KEY", "test-broker-token-key") + monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret") + + import app.main as app_main + + importlib.reload(app_main) + return app_main + + +def test_liveness_succeeds_even_if_db_is_unavailable(monkeypatch): + app_main = _load_app_main(monkeypatch) + app = app_main.create_app() + + import app.routers.health as health_router + + monkeypatch.setattr(health_router, "health_check", lambda: False) + + with TestClient(app) as client: + response = client.get("/health/live") + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +def test_readiness_fails_when_db_is_unavailable(monkeypatch): + app_main = _load_app_main(monkeypatch) + app = app_main.create_app() + + import app.routers.health as health_router + + monkeypatch.setattr(health_router, "health_check", lambda: False) + + with TestClient(app) as client: + response = client.get("/health/ready") + + assert response.status_code == 503 + assert response.json()["detail"]["db"] == "unavailable" + + +def test_readiness_succeeds_when_startup_complete_and_db_available(monkeypatch): + app_main = _load_app_main(monkeypatch) + app = app_main.create_app() + + import app.routers.health as health_router + + monkeypatch.setattr(health_router, "health_check", lambda: True) + + with TestClient(app) as client: + response = client.get("/health/ready") + + assert response.status_code == 200 + assert response.json()["status"] == "ok" + assert response.json()["startup"]["complete"] is True + + +def test_startup_failure_keeps_readiness_unhealthy(monkeypatch): + app_main = _load_app_main(monkeypatch, disable_startup_tasks="0") + app = app_main.create_app() + + import app.routers.health as health_router + + monkeypatch.setattr(health_router, "health_check", lambda: True) + + def fake_startup_tasks(app): + app.state.startup_started = True + app.state.startup_complete = False + app.state.startup_error = "resume_failed" + + monkeypatch.setattr(app_main, "_run_startup_tasks", fake_startup_tasks) + + with TestClient(app) as client: + response = client.get("/health") + + assert response.status_code == 503 + assert response.json()["detail"]["startup"]["error"] == "resume_failed" + + +def test_production_boot_fails_without_broker_token_key(monkeypatch): + monkeypatch.setenv("APP_ENV", "production") + monkeypatch.setenv("DB_HOST", "localhost") + monkeypatch.setenv("DB_NAME", "trading_db") + monkeypatch.setenv("DB_USER", "trader") + monkeypatch.setenv("DB_PASSWORD", "test-password") + monkeypatch.setenv("CORS_ORIGINS", "https://app.quantfortune.com") + monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret") + monkeypatch.delenv("BROKER_TOKEN_KEY", raising=False) + + import app.main as app_main + + with pytest.raises(RuntimeError, match="BROKER_TOKEN_KEY must be configured in production"): + importlib.reload(app_main) + + +def test_production_bootstrap_requires_explicit_admin_credentials(monkeypatch): + monkeypatch.setenv("APP_ENV", "production") + monkeypatch.setenv("DB_HOST", "localhost") + monkeypatch.setenv("DB_NAME", "trading_db") + monkeypatch.setenv("DB_USER", "trader") + monkeypatch.setenv("DB_PASSWORD", "test-password") + monkeypatch.setenv("CORS_ORIGINS", "https://app.quantfortune.com") + monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret") + monkeypatch.setenv("BROKER_TOKEN_KEY", "test-broker-token-key") + monkeypatch.setenv("ENABLE_SUPER_ADMIN_BOOTSTRAP", "1") + monkeypatch.delenv("SUPER_ADMIN_EMAIL", raising=False) + monkeypatch.delenv("SUPER_ADMIN_PASSWORD", raising=False) + + import app.main as app_main + + with pytest.raises(RuntimeError, match="SUPER_ADMIN_EMAIL must be configured when bootstrap is enabled"): + importlib.reload(app_main) diff --git a/backend/tests/test_live_equity_positions.py b/backend/tests/test_live_equity_positions.py new file mode 100644 index 0000000..87137a4 --- /dev/null +++ b/backend/tests/test_live_equity_positions.py @@ -0,0 +1,127 @@ +from datetime import datetime, timezone + + +def _holding(symbol: str, *, quantity: float, t1_quantity: float = 0.0, last_price: float = 0.0, exchange: str = "NSE"): + return { + "symbol": symbol, + "exchange": exchange, + "quantity": quantity, + "t1_quantity": t1_quantity, + "last_price": last_price, + } + + +def _position(symbol: str, *, net_quantity: float, last_price: float, exchange: str = "NSE"): + return { + "symbol": symbol, + "exchange": exchange, + "net_quantity": net_quantity, + "last_price": last_price, + } + + +def test_positions_adjustment_is_zero_when_only_holdings_exist(): + from app.services.live_equity_service import _extract_positions_adjustment_value + + adjustment = _extract_positions_adjustment_value( + [_holding("NIFTYBEES", quantity=2, last_price=250.0)], + [], + ) + + assert adjustment == 0.0 + + +def test_positions_only_are_counted_in_live_equity_snapshot(monkeypatch): + import app.services.live_equity_service as live_equity_service + + captured = {} + monkeypatch.setattr(live_equity_service, "get_user_broker", lambda _user_id: {}) + monkeypatch.setattr( + live_equity_service, + "_upsert_snapshot", + lambda **kwargs: captured.update(kwargs) or kwargs, + ) + + result = live_equity_service.capture_live_equity_snapshot( + "user-1", + holdings=[], + positions=[_position("NIFTYBEES", net_quantity=2, last_price=250.0)], + funds_data={"equity": {"available": {"live_balance": 1000.0}}}, + captured_at=datetime(2026, 4, 9, 10, 0, tzinfo=timezone.utc), + ) + + assert captured["cash_value"] == 1000.0 + assert captured["holdings_value"] == 0.0 + assert captured["positions_adjustment_value"] == 500.0 + assert result["positions_adjustment_value"] == 500.0 + + +def test_same_day_position_already_reflected_in_t1_is_not_double_counted(): + from app.services.live_equity_service import _extract_positions_adjustment_value + + adjustment = _extract_positions_adjustment_value( + [_holding("GOLDBEES", quantity=2, t1_quantity=1, last_price=120.0)], + [_position("GOLDBEES", net_quantity=1, last_price=120.0)], + ) + + assert adjustment == 0.0 + + +def test_negative_same_day_position_reduces_exposure_without_going_below_zero(): + from app.services.live_equity_service import _extract_positions_adjustment_value + + adjustment = _extract_positions_adjustment_value( + [_holding("NIFTYBEES", quantity=3, last_price=250.0)], + [_position("NIFTYBEES", net_quantity=-1, last_price=250.0)], + ) + + assert adjustment == -250.0 + + +def test_capture_live_equity_snapshot_avoids_double_counting_when_holdings_and_t1_overlap(monkeypatch): + import app.services.live_equity_service as live_equity_service + + captured = {} + monkeypatch.setattr(live_equity_service, "get_user_broker", lambda _user_id: {}) + monkeypatch.setattr( + live_equity_service, + "_upsert_snapshot", + lambda **kwargs: captured.update(kwargs) or kwargs, + ) + + result = live_equity_service.capture_live_equity_snapshot( + "user-1", + holdings=[_holding("NIFTYBEES", quantity=2, t1_quantity=1, last_price=250.0)], + positions=[_position("NIFTYBEES", net_quantity=1, last_price=250.0)], + funds_data={"equity": {"available": {"live_balance": 1000.0}}}, + captured_at=datetime(2026, 4, 9, 10, 0, tzinfo=timezone.utc), + ) + + assert captured["holdings_value"] == 750.0 + assert captured["positions_adjustment_value"] == 0.0 + assert result["positions_adjustment_value"] == 0.0 + + +def test_capture_live_equity_snapshot_adds_same_day_position_exposure_to_total(monkeypatch): + import app.services.live_equity_service as live_equity_service + + captured = {} + monkeypatch.setattr(live_equity_service, "get_user_broker", lambda _user_id: {}) + monkeypatch.setattr( + live_equity_service, + "_upsert_snapshot", + lambda **kwargs: captured.update(kwargs) or kwargs, + ) + + result = live_equity_service.capture_live_equity_snapshot( + "user-1", + holdings=[_holding("NIFTYBEES", quantity=1, last_price=250.0)], + positions=[_position("NIFTYBEES", net_quantity=1, last_price=250.0)], + funds_data={"equity": {"available": {"live_balance": 1000.0}}}, + captured_at=datetime(2026, 4, 9, 10, 0, tzinfo=timezone.utc), + ) + + assert captured["cash_value"] == 1000.0 + assert captured["holdings_value"] == 250.0 + assert captured["positions_adjustment_value"] == 250.0 + assert result["positions_adjustment_value"] == 250.0 diff --git a/backend/tests/test_order_reconciliation.py b/backend/tests/test_order_reconciliation.py new file mode 100644 index 0000000..4a12677 --- /dev/null +++ b/backend/tests/test_order_reconciliation.py @@ -0,0 +1,337 @@ +from datetime import datetime, timezone + + +def _base_state(): + return { + "nifty_units": 0.0, + "gold_units": 0.0, + "total_invested": 0.0, + "last_run": None, + "last_sip_ts": None, + } + + +class _Broker: + def __init__(self, payload): + self.payload = payload + + def refresh_order_status(self, order): + merged = dict(order) + merged.update(self.payload) + return merged + + +def test_reconciliation_applies_late_fill_once(monkeypatch): + import indian_paper_trading_strategy.engine.execution as execution + + logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc) + checked_at = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc) + state = _base_state() + saved = {} + + pending_orders = [ + { + "local_order_id": "order-1", + "logical_time": logical_time, + "broker": "ZERODHA", + "symbol": "NIFTYBEES.NS", + "side": "BUY", + "requested_qty": 1.0, + "requested_price": 250.0, + "status": "PENDING", + "created_at": checked_at, + } + ] + + cycle_orders = [ + { + "local_order_id": "order-1", + "symbol": "NIFTYBEES.NS", + "side": "BUY", + "requested_qty": 1.0, + "filled_qty": 1.0, + "accounted_fill_qty": 1.0, + "requested_price": 250.0, + "average_price": 251.0, + "status": execution.LOCAL_ORDER_FILLED, + "needs_reconciliation": False, + } + ] + + monkeypatch.setattr(execution, "_normalize_now", lambda value: value) + monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None)) + monkeypatch.setattr(execution, "_list_orders_to_reconcile", lambda cur, checked_before: list(pending_orders)) + monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state) + monkeypatch.setattr(execution, "save_state", lambda snapshot, **kwargs: saved.update(snapshot)) + monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False) + monkeypatch.setattr( + execution, + "_upsert_broker_order_state", + lambda cur, broker_name, logical_time, order, checked_at: { + "local_order_id": "order-1", + "symbol": "NIFTYBEES.NS", + "requested_qty": 1.0, + "filled_qty": 1.0, + "requested_price": 250.0, + "average_price": 251.0, + "status": execution.LOCAL_ORDER_FILLED, + "accounted_fill_qty": 0.0, + "delta_fill_qty": 1.0, + "needs_reconciliation": False, + }, + ) + monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "_load_cycle_order_rows", lambda cur, logical_time: list(cycle_orders)) + + result = execution.reconcile_live_orders( + broker=_Broker({"status": "FILLED", "filled_qty": 1.0, "average_price": 251.0}), + mode="LIVE", + now_ts=checked_at, + ) + + assert result["blocked"] is False + assert state["nifty_units"] == 1.0 + assert state["total_invested"] == 251.0 + assert state["last_sip_ts"] == checked_at.isoformat() + assert saved["last_sip_ts"] == checked_at.isoformat() + + +def test_partial_fill_remains_partial_and_does_not_advance_cycle(monkeypatch): + import indian_paper_trading_strategy.engine.execution as execution + + logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc) + checked_at = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc) + state = _base_state() + + monkeypatch.setattr(execution, "_normalize_now", lambda value: value) + monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None)) + monkeypatch.setattr( + execution, + "_list_orders_to_reconcile", + lambda cur, checked_before: [ + { + "local_order_id": "order-2", + "logical_time": logical_time, + "broker": "ZERODHA", + "symbol": "GOLDBEES.NS", + "side": "BUY", + "requested_qty": 2.0, + "requested_price": 100.0, + "status": "PENDING", + "created_at": checked_at, + } + ], + ) + monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state) + monkeypatch.setattr(execution, "save_state", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False) + monkeypatch.setattr( + execution, + "_upsert_broker_order_state", + lambda cur, broker_name, logical_time, order, checked_at: { + "local_order_id": "order-2", + "symbol": "GOLDBEES.NS", + "requested_qty": 2.0, + "filled_qty": 1.0, + "requested_price": 100.0, + "average_price": 101.0, + "status": execution.LOCAL_ORDER_PARTIAL, + "accounted_fill_qty": 0.0, + "delta_fill_qty": 1.0, + "needs_reconciliation": False, + }, + ) + monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None) + monkeypatch.setattr( + execution, + "_load_cycle_order_rows", + lambda cur, logical_time: [ + { + "local_order_id": "order-2", + "symbol": "GOLDBEES.NS", + "requested_qty": 2.0, + "filled_qty": 1.0, + "requested_price": 100.0, + "average_price": 101.0, + "status": execution.LOCAL_ORDER_PARTIAL, + "needs_reconciliation": False, + } + ], + ) + + result = execution.reconcile_live_orders( + broker=_Broker({"status": "CANCELLED", "filled_qty": 1.0, "average_price": 101.0}), + mode="LIVE", + now_ts=checked_at, + ) + + assert result["blocked"] is False + assert state["gold_units"] == 1.0 + assert state["last_sip_ts"] is None + assert state["last_run"] == checked_at.isoformat() + + +def test_rejected_order_does_not_mark_cycle_executed(monkeypatch): + import indian_paper_trading_strategy.engine.execution as execution + + logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc) + checked_at = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc) + state = _base_state() + + monkeypatch.setattr(execution, "_normalize_now", lambda value: value) + monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None)) + monkeypatch.setattr( + execution, + "_list_orders_to_reconcile", + lambda cur, checked_before: [ + { + "local_order_id": "order-3", + "logical_time": logical_time, + "broker": "ZERODHA", + "symbol": "NIFTYBEES.NS", + "side": "BUY", + "requested_qty": 1.0, + "requested_price": 250.0, + "status": "PENDING", + "created_at": checked_at, + } + ], + ) + monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state) + monkeypatch.setattr(execution, "save_state", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False) + monkeypatch.setattr( + execution, + "_upsert_broker_order_state", + lambda cur, broker_name, logical_time, order, checked_at: { + "local_order_id": "order-3", + "symbol": "NIFTYBEES.NS", + "requested_qty": 1.0, + "filled_qty": 0.0, + "requested_price": 250.0, + "average_price": 0.0, + "status": execution.LOCAL_ORDER_REJECTED, + "accounted_fill_qty": 0.0, + "delta_fill_qty": 0.0, + "needs_reconciliation": False, + }, + ) + monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None) + monkeypatch.setattr( + execution, + "_load_cycle_order_rows", + lambda cur, logical_time: [ + { + "local_order_id": "order-3", + "symbol": "NIFTYBEES.NS", + "requested_qty": 1.0, + "filled_qty": 0.0, + "requested_price": 250.0, + "average_price": 0.0, + "status": execution.LOCAL_ORDER_REJECTED, + "needs_reconciliation": False, + } + ], + ) + + result = execution.reconcile_live_orders( + broker=_Broker({"status": "REJECTED", "filled_qty": 0.0}), + mode="LIVE", + now_ts=checked_at, + ) + + assert result["blocked"] is False + assert state["nifty_units"] == 0.0 + assert state["last_sip_ts"] is None + assert state["last_run"] == checked_at.isoformat() + + +def test_reconciliation_is_repeat_safe(monkeypatch): + import indian_paper_trading_strategy.engine.execution as execution + + logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc) + first_check = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc) + second_check = datetime(2026, 4, 9, 4, 2, tzinfo=timezone.utc) + state = _base_state() + upserts = [ + { + "local_order_id": "order-4", + "symbol": "NIFTYBEES.NS", + "requested_qty": 1.0, + "filled_qty": 1.0, + "requested_price": 250.0, + "average_price": 250.0, + "status": execution.LOCAL_ORDER_FILLED, + "accounted_fill_qty": 0.0, + "delta_fill_qty": 1.0, + "needs_reconciliation": False, + }, + { + "local_order_id": "order-4", + "symbol": "NIFTYBEES.NS", + "requested_qty": 1.0, + "filled_qty": 1.0, + "requested_price": 250.0, + "average_price": 250.0, + "status": execution.LOCAL_ORDER_FILLED, + "accounted_fill_qty": 1.0, + "delta_fill_qty": 0.0, + "needs_reconciliation": False, + }, + ] + + monkeypatch.setattr(execution, "_normalize_now", lambda value: value) + monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None)) + monkeypatch.setattr( + execution, + "_list_orders_to_reconcile", + lambda cur, checked_before: [ + { + "local_order_id": "order-4", + "logical_time": logical_time, + "broker": "ZERODHA", + "symbol": "NIFTYBEES.NS", + "side": "BUY", + "requested_qty": 1.0, + "requested_price": 250.0, + "status": "PENDING", + "created_at": first_check, + } + ], + ) + monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state) + monkeypatch.setattr(execution, "save_state", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False) + monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None) + monkeypatch.setattr(execution, "_load_cycle_order_rows", lambda cur, logical_time: [ + { + "local_order_id": "order-4", + "symbol": "NIFTYBEES.NS", + "requested_qty": 1.0, + "filled_qty": 1.0, + "requested_price": 250.0, + "average_price": 250.0, + "status": execution.LOCAL_ORDER_FILLED, + "needs_reconciliation": False, + } + ]) + monkeypatch.setattr( + execution, + "_upsert_broker_order_state", + lambda cur, broker_name, logical_time, order, checked_at: upserts.pop(0), + ) + + broker = _Broker({"status": "FILLED", "filled_qty": 1.0, "average_price": 250.0}) + execution.reconcile_live_orders(broker=broker, mode="LIVE", now_ts=first_check) + execution.reconcile_live_orders(broker=broker, mode="LIVE", now_ts=second_check) + + assert state["nifty_units"] == 1.0 + assert state["total_invested"] == 250.0 diff --git a/indian_paper_trading_strategy/engine/broker.py b/indian_paper_trading_strategy/engine/broker.py index 32f0f6c..2f20944 100644 --- a/indian_paper_trading_strategy/engine/broker.py +++ b/indian_paper_trading_strategy/engine/broker.py @@ -37,10 +37,13 @@ class Broker(ABC): def get_orders(self): raise NotImplementedError - @abstractmethod + @abstractmethod def get_funds(self): raise NotImplementedError + def refresh_order_status(self, order: dict): + return order + class BrokerError(Exception): pass @@ -296,6 +299,33 @@ class LiveZerodhaBroker(Broker): time.sleep(self.POLL_INTERVAL_SECONDS) + def refresh_order_status(self, order: dict): + from app.services.zerodha_service import KiteTokenError, fetch_order_history + + session = self._session() + order_id = str(order.get("broker_order_id") or order.get("id") or "").strip() + if not order_id: + return dict(order) + try: + history = fetch_order_history( + session["api_key"], + session["access_token"], + order_id, + ) + except KiteTokenError as exc: + self._raise_auth_expired(exc) + + entry = history[-1] if history else None + return self._normalize_order_payload( + order_id=order_id, + symbol=str(order.get("symbol") or ""), + side=str(order.get("side") or ""), + requested_qty=int(order.get("requested_qty") or order.get("qty") or 0), + requested_price=float(order.get("requested_price") or order.get("price") or 0.0), + history_entry=entry, + logical_time=_parse_ts(order.get("logical_time"), assume_local=False), + ) + def get_funds(self, cur=None): from app.services.zerodha_service import KiteTokenError, fetch_funds @@ -693,6 +723,41 @@ class LiveGrowwBroker(Broker): time.sleep(self.POLL_INTERVAL_SECONDS) + def refresh_order_status(self, order: dict): + from app.services.groww_service import GrowwApiError, GrowwTokenError, fetch_order_detail, fetch_order_status + + session = self._session() + order_id = self._extract_order_id(order) + if not order_id: + return dict(order) + segment = self._first_text(order.get("segment"), default="CASH") + merged = {} + try: + detail = fetch_order_detail(session["access_token"], order_id, segment=segment) + status_payload = fetch_order_status(session["access_token"], order_id, segment=segment) + if isinstance(detail, dict): + merged.update(detail) + if isinstance(status_payload, dict): + merged.update(status_payload) + except GrowwTokenError as exc: + self._raise_auth_expired(exc) + except GrowwApiError as exc: + merged = { + "groww_order_id": order_id, + "order_status": "FAILED", + "remark": getattr(exc, "message", str(exc)), + } + + return self._normalize_order_payload( + order_id=order_id, + symbol=str(order.get("symbol") or ""), + side=str(order.get("side") or ""), + requested_qty=int(order.get("requested_qty") or order.get("qty") or 0), + requested_price=float(order.get("requested_price") or order.get("price") or 0.0), + order_entry=merged, + logical_time=_parse_ts(order.get("logical_time"), assume_local=False), + ) + def get_funds(self, cur=None): from app.services.groww_service import GrowwTokenError, fetch_funds diff --git a/indian_paper_trading_strategy/engine/execution.py b/indian_paper_trading_strategy/engine/execution.py index 5068a5a..cc309ed 100644 --- a/indian_paper_trading_strategy/engine/execution.py +++ b/indian_paper_trading_strategy/engine/execution.py @@ -1,12 +1,32 @@ # engine/execution.py -from datetime import datetime, timezone +import os +from datetime import datetime, timedelta, timezone from indian_paper_trading_strategy.engine.state import load_state, save_state from indian_paper_trading_strategy.engine.broker import Broker, BrokerAuthExpired from indian_paper_trading_strategy.engine.ledger import claim_execution_window, log_event, event_exists -from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry +from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry, get_context from indian_paper_trading_strategy.engine.market import market_now from indian_paper_trading_strategy.engine.time_utils import compute_logical_time + +LOCAL_ORDER_PENDING = "PENDING" +LOCAL_ORDER_PARTIAL = "PARTIAL" +LOCAL_ORDER_FILLED = "FILLED" +LOCAL_ORDER_REJECTED = "REJECTED" +LOCAL_ORDER_CANCELLED = "CANCELLED" +LOCAL_ORDER_UNKNOWN = "UNKNOWN" + +_RECONCILEABLE_ORDER_STATES = { + LOCAL_ORDER_PENDING, + LOCAL_ORDER_PARTIAL, + LOCAL_ORDER_UNKNOWN, +} +RECONCILIATION_INTERVAL = timedelta( + seconds=int(os.getenv("ORDER_RECONCILIATION_INTERVAL_SEC", "15")) +) +RECONCILIATION_TIMEOUT = timedelta( + seconds=int(os.getenv("ORDER_RECONCILIATION_TIMEOUT_SEC", "300")) +) def _as_float(value): if hasattr(value, "item"): @@ -88,6 +108,506 @@ def _apply_filled_orders_to_state(state, orders): } +def _utc_now(): + return datetime.now(timezone.utc) + + +def _current_scope(): + return get_context() + + +def _order_status_details(order: dict) -> dict: + payload = dict(order or {}) + requested_qty = float(payload.get("requested_qty") or payload.get("qty") or 0.0) + filled_qty = max(float(payload.get("filled_qty") or 0.0), 0.0) + requested_price = float(payload.get("requested_price") or payload.get("price") or 0.0) + average_price = float(payload.get("average_price") or requested_price or 0.0) + raw_status = str(payload.get("status") or "").strip().upper() + + if requested_qty > 0 and filled_qty >= requested_qty: + local_status = LOCAL_ORDER_FILLED + needs_reconciliation = False + elif filled_qty > 0: + local_status = LOCAL_ORDER_PARTIAL + needs_reconciliation = raw_status not in {LOCAL_ORDER_FILLED, LOCAL_ORDER_REJECTED, LOCAL_ORDER_CANCELLED} + elif raw_status == LOCAL_ORDER_REJECTED: + local_status = LOCAL_ORDER_REJECTED + needs_reconciliation = False + elif raw_status == LOCAL_ORDER_CANCELLED: + local_status = LOCAL_ORDER_CANCELLED + needs_reconciliation = False + elif raw_status == LOCAL_ORDER_PENDING: + local_status = LOCAL_ORDER_PENDING + needs_reconciliation = True + else: + local_status = LOCAL_ORDER_UNKNOWN + needs_reconciliation = True + + return { + "local_order_id": str(payload.get("id") or payload.get("broker_order_id") or ""), + "broker_order_id": str(payload.get("broker_order_id") or payload.get("id") or "") or None, + "symbol": str(payload.get("symbol") or ""), + "side": str(payload.get("side") or "").upper(), + "requested_qty": requested_qty, + "filled_qty": filled_qty, + "requested_price": requested_price, + "average_price": average_price, + "status": local_status, + "broker_status": raw_status or None, + "status_message": payload.get("status_message"), + "needs_reconciliation": needs_reconciliation, + "logical_time": payload.get("logical_time"), + } + + +def _upsert_broker_order_state(cur, *, broker_name: str, logical_time, order: dict, checked_at: datetime): + user_id, run_id = _current_scope() + details = _order_status_details(order) + if not details["local_order_id"]: + raise ValueError("Broker order state requires a stable order id") + + cur.execute( + """ + SELECT status, filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at + FROM broker_order_state + WHERE local_order_id = %s + FOR UPDATE + """, + (details["local_order_id"],), + ) + row = cur.fetchone() + previous = { + "status": row[0], + "filled_qty": float(row[1] or 0.0), + "accounted_fill_qty": float(row[2] or 0.0), + "needs_reconciliation": bool(row[3]), + "last_checked_at": row[4], + } if row else { + "status": None, + "filled_qty": 0.0, + "accounted_fill_qty": 0.0, + "needs_reconciliation": False, + "last_checked_at": None, + } + + logical_ts = logical_time or checked_at + cur.execute( + """ + INSERT INTO broker_order_state ( + local_order_id, + user_id, + run_id, + logical_time, + broker, + symbol, + side, + broker_order_id, + requested_qty, + filled_qty, + accounted_fill_qty, + requested_price, + average_price, + status, + broker_status, + status_message, + needs_reconciliation, + last_checked_at, + created_at, + updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (local_order_id) DO UPDATE + SET broker_order_id = EXCLUDED.broker_order_id, + requested_qty = EXCLUDED.requested_qty, + filled_qty = EXCLUDED.filled_qty, + requested_price = EXCLUDED.requested_price, + average_price = EXCLUDED.average_price, + status = EXCLUDED.status, + broker_status = EXCLUDED.broker_status, + status_message = EXCLUDED.status_message, + needs_reconciliation = EXCLUDED.needs_reconciliation, + last_checked_at = EXCLUDED.last_checked_at, + updated_at = EXCLUDED.updated_at + """, + ( + details["local_order_id"], + user_id, + run_id, + logical_ts, + broker_name, + details["symbol"], + details["side"], + details["broker_order_id"], + details["requested_qty"], + details["filled_qty"], + previous["accounted_fill_qty"], + details["requested_price"], + details["average_price"], + details["status"], + details["broker_status"], + details["status_message"], + details["needs_reconciliation"], + checked_at, + checked_at, + checked_at, + ), + ) + + details["accounted_fill_qty"] = previous["accounted_fill_qty"] + details["delta_fill_qty"] = max(details["filled_qty"] - previous["accounted_fill_qty"], 0.0) + details["previous_status"] = previous["status"] + details["previous_filled_qty"] = previous["filled_qty"] + details["previous_needs_reconciliation"] = previous["needs_reconciliation"] + return details + + +def _mark_accounted_fill(cur, local_order_id: str, accounted_fill_qty: float, checked_at: datetime): + cur.execute( + """ + UPDATE broker_order_state + SET accounted_fill_qty = %s, + updated_at = %s + WHERE local_order_id = %s + """, + (accounted_fill_qty, checked_at, local_order_id), + ) + + +def _apply_fill_delta_to_state(state: dict, order_details: dict): + delta_qty = max(float(order_details.get("delta_fill_qty") or 0.0), 0.0) + average_price = float(order_details.get("average_price") or order_details.get("requested_price") or 0.0) + symbol = str(order_details.get("symbol") or "").upper() + delta_amount = delta_qty * average_price + + if delta_qty <= 0: + return {"nifty_units": 0.0, "gold_units": 0.0, "amount": 0.0} + + nifty_units = 0.0 + gold_units = 0.0 + if symbol.startswith("NIFTYBEES"): + state["nifty_units"] += delta_qty + nifty_units = delta_qty + elif symbol.startswith("GOLDBEES"): + state["gold_units"] += delta_qty + gold_units = delta_qty + state["total_invested"] += delta_amount + return {"nifty_units": nifty_units, "gold_units": gold_units, "amount": delta_amount} + + +def _load_cycle_order_rows(cur, logical_time): + user_id, run_id = _current_scope() + cur.execute( + """ + SELECT local_order_id, symbol, side, requested_qty, filled_qty, accounted_fill_qty, + requested_price, average_price, status, broker_status, status_message, + needs_reconciliation, broker_order_id + FROM broker_order_state + WHERE user_id = %s AND run_id = %s AND logical_time = %s + ORDER BY local_order_id + """, + (user_id, run_id, logical_time), + ) + rows = [] + for row in cur.fetchall(): + rows.append( + { + "local_order_id": row[0], + "symbol": row[1], + "side": row[2], + "requested_qty": float(row[3] or 0.0), + "filled_qty": float(row[4] or 0.0), + "accounted_fill_qty": float(row[5] or 0.0), + "requested_price": float(row[6] or 0.0), + "average_price": float(row[7] or 0.0), + "status": row[8], + "broker_status": row[9], + "status_message": row[10], + "needs_reconciliation": bool(row[11]), + "broker_order_id": row[12], + } + ) + return rows + + +def _aggregate_cycle_orders(orders: list[dict]): + summary = { + "nifty_units": 0.0, + "gold_units": 0.0, + "nifty_price": 0.0, + "gold_price": 0.0, + "amount": 0.0, + "has_any_fill": False, + "all_filled": bool(orders), + "has_unresolved": False, + } + for order in orders: + symbol = str(order.get("symbol") or "").upper() + filled_qty = float(order.get("filled_qty") or 0.0) + requested_qty = float(order.get("requested_qty") or 0.0) + avg_price = float(order.get("average_price") or order.get("requested_price") or 0.0) + status = str(order.get("status") or "") + needs_reconciliation = bool(order.get("needs_reconciliation")) + if status != LOCAL_ORDER_FILLED: + summary["all_filled"] = False + if needs_reconciliation or status in _RECONCILEABLE_ORDER_STATES: + summary["has_unresolved"] = True + if filled_qty <= 0: + continue + summary["has_any_fill"] = True + summary["amount"] += filled_qty * avg_price + if symbol.startswith("NIFTYBEES"): + summary["nifty_units"] += filled_qty + summary["nifty_price"] = avg_price or summary["nifty_price"] + elif symbol.startswith("GOLDBEES"): + summary["gold_units"] += filled_qty + summary["gold_price"] = avg_price or summary["gold_price"] + if requested_qty > 0 and filled_qty < requested_qty: + summary["all_filled"] = False + return summary + + +def _record_reconciliation_event(cur, event: str, *, logical_time, payload: dict, ts: datetime): + if event_exists(event, logical_time, cur=cur): + return + if event in {"SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING"}: + log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time) + return + insert_engine_event(cur, event, data=payload, ts=ts) + + +def _advance_cycle_state(cur, *, state: dict, logical_time, cycle_orders: list[dict], event_ts: datetime, failure_reason: str | None = None): + cycle_summary = _aggregate_cycle_orders(cycle_orders) + if cycle_orders or failure_reason: + state["last_run"] = event_ts.isoformat() + + if cycle_summary["all_filled"]: + state["last_sip_ts"] = event_ts.isoformat() + _record_reconciliation_event( + cur, + "SIP_EXECUTED", + logical_time=logical_time, + payload={ + "nifty_units": cycle_summary["nifty_units"], + "gold_units": cycle_summary["gold_units"], + "nifty_price": cycle_summary["nifty_price"], + "gold_price": cycle_summary["gold_price"], + "amount": cycle_summary["amount"], + }, + ts=event_ts, + ) + return True + + if cycle_summary["has_any_fill"] and not cycle_summary["has_unresolved"]: + _record_reconciliation_event( + cur, + "SIP_PARTIAL", + logical_time=logical_time, + payload={ + "nifty_units": cycle_summary["nifty_units"], + "gold_units": cycle_summary["gold_units"], + "nifty_price": cycle_summary["nifty_price"], + "gold_price": cycle_summary["gold_price"], + "amount": cycle_summary["amount"], + "reason": "partial_fill", + }, + ts=event_ts, + ) + return False + + if cycle_summary["has_unresolved"]: + _record_reconciliation_event( + cur, + "ORDER_RECONCILIATION_PENDING", + logical_time=logical_time, + payload={ + "logical_time": logical_time.isoformat(), + "orders": cycle_orders, + }, + ts=event_ts, + ) + return False + + _record_reconciliation_event( + cur, + "SIP_NO_FILL", + logical_time=logical_time, + payload={ + "reason": failure_reason or "no_fill", + "orders": cycle_orders, + }, + ts=event_ts, + ) + return False + + +def _list_orders_to_reconcile(cur, *, checked_before: datetime): + user_id, run_id = _current_scope() + cur.execute( + """ + SELECT local_order_id, logical_time, broker, symbol, side, broker_order_id, requested_qty, + requested_price, average_price, status, broker_status, status_message, + filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at, created_at + FROM broker_order_state + WHERE user_id = %s + AND run_id = %s + AND needs_reconciliation = TRUE + AND (last_checked_at IS NULL OR last_checked_at <= %s) + ORDER BY logical_time ASC, created_at ASC + """, + (user_id, run_id, checked_before), + ) + rows = [] + for row in cur.fetchall(): + rows.append( + { + "local_order_id": row[0], + "logical_time": row[1], + "broker": row[2], + "symbol": row[3], + "side": row[4], + "broker_order_id": row[5], + "requested_qty": float(row[6] or 0.0), + "requested_price": float(row[7] or 0.0), + "average_price": float(row[8] or 0.0), + "status": row[9], + "broker_status": row[10], + "status_message": row[11], + "filled_qty": float(row[12] or 0.0), + "accounted_fill_qty": float(row[13] or 0.0), + "needs_reconciliation": bool(row[14]), + "last_checked_at": row[15], + "created_at": row[16], + "id": row[0], + } + ) + return rows + + +def reconcile_live_orders(*, broker: Broker, mode: str | None = "LIVE", now_ts: datetime | None = None): + checked_at = _normalize_now(now_ts or _utc_now()) + blocked = False + summaries: list[dict] = [] + + pending_orders = run_with_retry( + lambda cur, _conn: _list_orders_to_reconcile( + cur, + checked_before=checked_at - RECONCILIATION_INTERVAL, + ) + ) + if not pending_orders: + return {"blocked": False, "orders": [], "summaries": []} + + def _op(cur, _conn): + state = load_state(mode=mode, cur=cur, for_update=True) + insert_engine_event( + cur, + "ORDER_RECONCILIATION_STARTED", + data={"count": len(pending_orders)}, + ts=checked_at, + ) + + nonlocal blocked + for pending in pending_orders: + refreshed = broker.refresh_order_status(pending) + refreshed["logical_time"] = pending["logical_time"] + details = _upsert_broker_order_state( + cur, + broker_name=str(pending.get("broker") or ""), + logical_time=pending["logical_time"], + order=refreshed, + checked_at=checked_at, + ) + + if details["status"] != pending.get("status"): + insert_engine_event( + cur, + "ORDER_RECONCILIATION_STATUS_CHANGED", + data={ + "order_id": details["local_order_id"], + "from": pending.get("status"), + "to": details["status"], + }, + ts=checked_at, + ) + + if details["status"] == LOCAL_ORDER_PARTIAL: + insert_engine_event( + cur, + "ORDER_PARTIAL_FILL_DETECTED", + data={ + "order_id": details["local_order_id"], + "filled_qty": details["filled_qty"], + "requested_qty": details["requested_qty"], + }, + ts=checked_at, + ) + + applied = _apply_fill_delta_to_state(state, details) + if details["delta_fill_qty"] > 0: + _mark_accounted_fill( + cur, + details["local_order_id"], + details["accounted_fill_qty"] + details["delta_fill_qty"], + checked_at, + ) + + cycle_orders = _load_cycle_order_rows(cur, pending["logical_time"]) + executed = _advance_cycle_state( + cur, + state=state, + logical_time=pending["logical_time"], + cycle_orders=cycle_orders, + event_ts=checked_at, + ) + + unresolved = any(order["needs_reconciliation"] for order in cycle_orders) + if unresolved: + oldest_ts = min( + [ + pending.get("created_at") or checked_at + ] + + [checked_at] + ) + if checked_at - oldest_ts >= RECONCILIATION_TIMEOUT: + insert_engine_event( + cur, + "ORDER_RECONCILIATION_TIMEOUT", + data={ + "logical_time": pending["logical_time"].isoformat(), + "orders": cycle_orders, + }, + ts=checked_at, + ) + blocked = True + + summaries.append( + { + "logical_time": pending["logical_time"], + "executed": executed, + "applied_amount": applied["amount"], + "unresolved": unresolved, + } + ) + + save_state( + state, + mode=mode, + cur=cur, + emit_event=True, + event_meta={"source": "live_reconciliation"}, + ) + insert_engine_event( + cur, + "ORDER_RECONCILIATION_COMPLETE", + data={"blocked": blocked, "count": len(pending_orders)}, + ts=checked_at, + ) + return {"blocked": blocked} + + run_with_retry(_op) + return {"blocked": blocked, "orders": pending_orders, "summaries": summaries} + + def _record_live_order_events(cur, orders, event_ts): for order in orders: insert_engine_event(cur, "ORDER_PLACED", data=order, ts=event_ts) @@ -272,19 +792,53 @@ def _finalize_live_execution( ): def _op(cur, _conn): state = load_state(mode=mode, cur=cur, for_update=True) - _record_live_order_events(cur, orders, now_ts) - - applied = _apply_filled_orders_to_state(state, orders) - executed = applied["amount"] > 0 + broker_name = "" + for order in orders: + if not broker_name: + broker_name = str(order.get("broker") or "").strip().upper() + order.setdefault("logical_time", logical_time) + _record_live_order_events(cur, [order], now_ts) + details = _upsert_broker_order_state( + cur, + broker_name=broker_name, + logical_time=logical_time, + order=order, + checked_at=now_ts, + ) + if details["status"] == LOCAL_ORDER_PARTIAL: + insert_engine_event( + cur, + "ORDER_PARTIAL_FILL_DETECTED", + data={ + "order_id": details["local_order_id"], + "filled_qty": details["filled_qty"], + "requested_qty": details["requested_qty"], + }, + ts=now_ts, + ) + if details["delta_fill_qty"] > 0: + _apply_fill_delta_to_state(state, details) + _mark_accounted_fill( + cur, + details["local_order_id"], + details["accounted_fill_qty"] + details["delta_fill_qty"], + now_ts, + ) if funds_after is not None: cash_after = funds_after.get("cash") if cash_after is not None: state["cash"] = float(cash_after) - if executed: - state["last_run"] = now_ts.isoformat() - state["last_sip_ts"] = now_ts.isoformat() + cycle_orders = _load_cycle_order_rows(cur, logical_time) + executed = _advance_cycle_state( + cur, + state=state, + logical_time=logical_time, + cycle_orders=cycle_orders, + event_ts=now_ts, + failure_reason=failure_reason or ("broker_auth_expired" if auth_failed else None), + ) save_state( state, @@ -294,31 +848,6 @@ def _finalize_live_execution( event_meta={"source": "sip_live"}, ) - if executed: - log_event( - "SIP_EXECUTED", - { - "nifty_units": applied["nifty_units"], - "gold_units": applied["gold_units"], - "nifty_price": sp_price_val, - "gold_price": gd_price_val, - "amount": applied["amount"], - }, - cur=cur, - ts=now_ts, - logical_time=logical_time, - ) - else: - insert_engine_event( - cur, - "SIP_NO_FILL", - data={ - "reason": failure_reason or ("broker_auth_expired" if auth_failed else "no_fill"), - "orders": orders, - }, - ts=now_ts, - ) - return state, executed return run_with_retry(_op) @@ -340,6 +869,10 @@ def _try_execute_sip_live( if not market_open or broker is None: return load_state(mode=mode), False + reconciliation = reconcile_live_orders(broker=broker, mode=mode, now_ts=now_ts) + if reconciliation.get("blocked"): + return load_state(mode=mode), False + sp_price_val = _as_float(sp_price) gd_price_val = _as_float(gd_price) eq_w_val = _as_float(eq_w) @@ -367,6 +900,7 @@ def _try_execute_sip_live( funds_after = None failure_reason = None auth_failed = False + broker_name = getattr(broker, "__class__", type(broker)).__name__.replace("Live", "").replace("Broker", "").upper() try: funds_before = broker.get_funds() @@ -384,6 +918,8 @@ def _try_execute_sip_live( logical_time=logical_time, ) ) + orders[-1]["broker"] = broker_name + orders[-1]["logical_time"] = logical_time if gold_qty > 0: orders.append( broker.place_order( @@ -394,6 +930,8 @@ def _try_execute_sip_live( logical_time=logical_time, ) ) + orders[-1]["broker"] = broker_name + orders[-1]["logical_time"] = logical_time funds_after = broker.get_funds() except BrokerAuthExpired: auth_failed = True diff --git a/indian_paper_trading_strategy/engine/runner.py b/indian_paper_trading_strategy/engine/runner.py index e2ec579..0e96168 100644 --- a/indian_paper_trading_strategy/engine/runner.py +++ b/indian_paper_trading_strategy/engine/runner.py @@ -12,7 +12,7 @@ from indian_paper_trading_strategy.engine.market import ( market_now, market_session, ) -from indian_paper_trading_strategy.engine.execution import try_execute_sip +from indian_paper_trading_strategy.engine.execution import reconcile_live_orders, try_execute_sip from indian_paper_trading_strategy.engine.broker import ( BrokerAuthExpired, LiveGrowwBroker, @@ -279,7 +279,7 @@ def _pause_for_auth_expiry( cur.execute( """ UPDATE strategy_run - SET status = 'STOPPED', + SET status = 'PAUSED_AUTH_EXPIRED', stopped_at = %s, meta = COALESCE(meta, '{}'::jsonb) || %s WHERE user_id = %s AND run_id = %s @@ -296,10 +296,10 @@ def _pause_for_auth_expiry( _set_state( user_id, run_id, - state="STOPPED", + state="PAUSED_AUTH_EXPIRED", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z", ) - _update_engine_status(user_id, run_id, "STOPPED") + _update_engine_status(user_id, run_id, "PAUSED_AUTH_EXPIRED") log_event( event="BROKER_AUTH_EXPIRED", message="Broker authentication expired", @@ -317,8 +317,8 @@ def _pause_for_auth_expiry( meta={"reason": reason}, ) emit_event_cb( - event="STRATEGY_STOPPED", - message="Strategy stopped", + event="STRATEGY_BLOCKED", + message="Strategy blocked until broker reconnect", meta={"reason": "broker_auth_expired"}, ) @@ -440,12 +440,40 @@ def _engine_loop(config, stop_event: threading.Event): last_run = _last_execution_anchor(state, mode) is_first_run = last_run is None now = market_now() - debug_event( - "ENGINE_LOOP_TICK", - "engine loop tick", - {"now": now.isoformat(), "frequency": frequency_label}, - ) - + debug_event( + "ENGINE_LOOP_TICK", + "engine loop tick", + {"now": now.isoformat(), "frequency": frequency_label}, + ) + + if getattr(broker, "external_orders", False): + try: + reconciliation = reconcile_live_orders( + broker=broker, + mode=mode, + now_ts=now, + ) + if reconciliation.get("blocked"): + debug_event( + "ORDER_RECONCILIATION_BLOCKED", + "Unresolved broker orders are still being reconciled", + {"now": now.isoformat()}, + ) + except BrokerAuthExpired as exc: + _pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb) + exit_reason = "AUTH_EXPIRED" + break + except Exception as exc: + debug_event( + "ORDER_RECONCILIATION_ERROR", + "broker order reconciliation failed", + {"error": str(exc)}, + ) + if not sleep_with_heartbeat(15, stop_event, scope_user, scope_run, owner_id): + exit_reason = "LEASE_LOST" + break + continue + if last_run and not is_first_run: parsed_last_run = _parse_market_timestamp(last_run) next_run = align_to_market_open(parsed_last_run + delta) if parsed_last_run else None diff --git a/start_all.ps1 b/start_all.ps1 index 26ad2d1..77a0655 100644 --- a/start_all.ps1 +++ b/start_all.ps1 @@ -7,6 +7,8 @@ $python = Join-Path $root ".venv\\Scripts\\python.exe" $status = [ordered]@{} $engineExternal = $env:ENGINE_EXTERNAL -and $env:ENGINE_EXTERNAL.ToLower() -in @("1", "true", "yes") +if (-not $env:APP_ENV) { $env:APP_ENV = "development" } + if (-not $env:DB_HOST) { $env:DB_HOST = "localhost" } if (-not $env:DB_PORT) { $env:DB_PORT = "5432" } if (-not $env:DB_NAME) { $env:DB_NAME = "trading_db" } @@ -17,6 +19,12 @@ if (-not $env:PGPORT) { $env:PGPORT = $env:DB_PORT } if (-not $env:PGDATABASE) { $env:PGDATABASE = $env:DB_NAME } if (-not $env:PGUSER) { $env:PGUSER = $env:DB_USER } if (-not $env:PGPASSWORD) { $env:PGPASSWORD = $env:DB_PASSWORD } +if (-not $env:BROKER_TOKEN_KEY -and $env:APP_ENV.ToLower() -in @("development", "dev", "test", "testing", "local")) { + $env:BROKER_TOKEN_KEY = (& $python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode('utf-8'))").Trim() +} +if (-not $env:RESET_OTP_SECRET -and $env:APP_ENV.ToLower() -in @("development", "dev", "test", "testing", "local")) { + $env:RESET_OTP_SECRET = "dev-reset-otp-secret" +} function Write-Status { param( @@ -130,7 +138,7 @@ Write-Status "Frontend" $true "pid $($frontendProc.Id)" } | ConvertTo-Json | Set-Content -Encoding ascii $pidFile Write-Host "Waiting for backend health..." -$healthUrl = "http://localhost:8000/health" +$healthUrl = "http://localhost:8000/health/ready" $deadline = [DateTime]::UtcNow.AddMinutes(2) $healthy = $false while ([DateTime]::UtcNow -lt $deadline) {