Fix broker reconciliation, health readiness, and live equity safety
This commit is contained in:
parent
519addd78f
commit
9c171ba799
@ -68,9 +68,13 @@ def set_user_role(actor_id: str, target_id: str, new_role: str):
|
||||
|
||||
|
||||
def bootstrap_super_admin():
|
||||
enabled = (os.getenv("ENABLE_SUPER_ADMIN_BOOTSTRAP") or "").strip().lower() in {"1", "true", "yes"}
|
||||
if not enabled:
|
||||
return
|
||||
|
||||
email = (os.getenv("SUPER_ADMIN_EMAIL") or "").strip()
|
||||
if not email:
|
||||
return
|
||||
raise RuntimeError("SUPER_ADMIN_EMAIL must be configured when bootstrap is enabled")
|
||||
|
||||
existing = get_user_by_username(email)
|
||||
if existing:
|
||||
|
||||
@ -136,6 +136,27 @@ def clear_user_broker(user_id: str):
|
||||
cur.execute("DELETE FROM user_broker WHERE user_id = %s", (user_id,))
|
||||
|
||||
|
||||
def disconnect_user_broker(user_id: str):
|
||||
with db_transaction() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO user_broker (user_id, connected, auth_state, access_token, connected_at)
|
||||
VALUES (%s, FALSE, 'DISCONNECTED', NULL, NULL)
|
||||
ON CONFLICT (user_id)
|
||||
DO UPDATE SET
|
||||
connected = FALSE,
|
||||
access_token = NULL,
|
||||
connected_at = NULL,
|
||||
auth_state = 'DISCONNECTED',
|
||||
pending_broker = NULL,
|
||||
pending_api_key = NULL,
|
||||
pending_api_secret = NULL,
|
||||
pending_started_at = NULL
|
||||
""",
|
||||
(user_id,),
|
||||
)
|
||||
|
||||
|
||||
def expire_user_broker_session(user_id: str):
|
||||
with db_transaction() as cur:
|
||||
cur.execute(
|
||||
|
||||
@ -430,6 +430,7 @@ class LiveEquitySnapshot(Base):
|
||||
captured_at = Column(DateTime(timezone=True), nullable=False)
|
||||
cash_value = Column(Numeric, nullable=False)
|
||||
holdings_value = Column(Numeric, nullable=False)
|
||||
positions_adjustment_value = Column(Numeric, nullable=False, server_default=text("0"))
|
||||
total_value = Column(Numeric, nullable=False)
|
||||
|
||||
__table_args__ = (
|
||||
@ -443,6 +444,48 @@ class LiveEquitySnapshot(Base):
|
||||
)
|
||||
|
||||
|
||||
class BrokerOrderState(Base):
|
||||
__tablename__ = "broker_order_state"
|
||||
|
||||
local_order_id = Column(String, primary_key=True)
|
||||
user_id = Column(String, ForeignKey("app_user.id", ondelete="CASCADE"), nullable=False)
|
||||
run_id = Column(String, ForeignKey("strategy_run.run_id", ondelete="CASCADE"), nullable=False)
|
||||
logical_time = Column(DateTime(timezone=True), nullable=False)
|
||||
broker = Column(Text, nullable=False)
|
||||
symbol = Column(Text, nullable=False)
|
||||
side = Column(Text, nullable=False)
|
||||
broker_order_id = Column(Text)
|
||||
requested_qty = Column(Numeric, nullable=False)
|
||||
filled_qty = Column(Numeric, nullable=False, server_default=text("0"))
|
||||
accounted_fill_qty = Column(Numeric, nullable=False, server_default=text("0"))
|
||||
requested_price = Column(Numeric)
|
||||
average_price = Column(Numeric)
|
||||
status = Column(Text, nullable=False)
|
||||
broker_status = Column(Text)
|
||||
status_message = Column(Text)
|
||||
needs_reconciliation = Column(Boolean, nullable=False, server_default=text("false"))
|
||||
last_checked_at = Column(DateTime(timezone=True))
|
||||
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
|
||||
updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
|
||||
|
||||
__table_args__ = (
|
||||
CheckConstraint(
|
||||
"status IN ('PENDING','PARTIAL','FILLED','REJECTED','CANCELLED','UNKNOWN')",
|
||||
name="chk_broker_order_state_status",
|
||||
),
|
||||
Index("idx_broker_order_state_user_run_status", "user_id", "run_id", "status"),
|
||||
Index(
|
||||
"idx_broker_order_state_reconcile",
|
||||
"user_id",
|
||||
"run_id",
|
||||
"needs_reconciliation",
|
||||
"last_checked_at",
|
||||
),
|
||||
Index("idx_broker_order_state_broker_order", "broker_order_id"),
|
||||
Index("idx_broker_order_state_logical_time", "run_id", "logical_time"),
|
||||
)
|
||||
|
||||
|
||||
class SupportTicket(Base):
|
||||
__tablename__ = "support_ticket"
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from fastapi import FastAPI
|
||||
@ -85,9 +86,64 @@ def _build_cors_origins() -> list[str]:
|
||||
return deduped
|
||||
|
||||
|
||||
def _validate_runtime_secrets():
|
||||
env_name = _environment_name()
|
||||
if env_name not in PRODUCTION_ENV_NAMES:
|
||||
return
|
||||
broker_token_key = (os.getenv("BROKER_TOKEN_KEY") or "").strip()
|
||||
if not broker_token_key:
|
||||
raise RuntimeError("BROKER_TOKEN_KEY must be configured in production")
|
||||
if (os.getenv("ENABLE_SUPER_ADMIN_BOOTSTRAP") or "").strip() in {"1", "true", "yes"}:
|
||||
if not (os.getenv("SUPER_ADMIN_EMAIL") or "").strip():
|
||||
raise RuntimeError("SUPER_ADMIN_EMAIL must be configured when bootstrap is enabled")
|
||||
if not (os.getenv("SUPER_ADMIN_PASSWORD") or "").strip():
|
||||
raise RuntimeError("SUPER_ADMIN_PASSWORD must be configured when bootstrap is enabled")
|
||||
|
||||
|
||||
def _initialize_app_state(app: FastAPI):
|
||||
app.state.startup_complete = False
|
||||
app.state.startup_error = None
|
||||
app.state.startup_started = False
|
||||
app.state.background_warnings = {}
|
||||
|
||||
|
||||
def _run_startup_tasks(app: FastAPI):
|
||||
if os.getenv("DISABLE_STARTUP_TASKS", "0") == "1":
|
||||
app.state.startup_started = True
|
||||
app.state.startup_complete = True
|
||||
app.state.startup_error = None
|
||||
return
|
||||
|
||||
app.state.startup_started = True
|
||||
app.state.startup_complete = False
|
||||
app.state.startup_error = None
|
||||
try:
|
||||
init_log_state()
|
||||
bootstrap_super_admin()
|
||||
resume_running_runs()
|
||||
app.state.startup_complete = True
|
||||
except Exception as exc:
|
||||
app.state.startup_error = str(exc)
|
||||
print(f"[STARTUP] critical startup task failed: {exc}", flush=True)
|
||||
|
||||
try:
|
||||
start_live_equity_snapshot_daemon()
|
||||
except Exception as exc:
|
||||
app.state.background_warnings["live_equity_snapshot_daemon"] = str(exc)
|
||||
print(f"[STARTUP] live equity snapshot daemon failed to start: {exc}", flush=True)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def _lifespan(app: FastAPI):
|
||||
_initialize_app_state(app)
|
||||
_run_startup_tasks(app)
|
||||
yield
|
||||
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
_validate_db_config()
|
||||
app = FastAPI(title="QuantFortune Backend", version="1.0")
|
||||
_validate_runtime_secrets()
|
||||
app = FastAPI(title="QuantFortune Backend", version="1.0", lifespan=_lifespan)
|
||||
cors_origins = _build_cors_origins()
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
@ -111,15 +167,6 @@ def create_app() -> FastAPI:
|
||||
app.include_router(support_ticket_router)
|
||||
app.include_router(password_reset_router)
|
||||
|
||||
@app.on_event("startup")
|
||||
def init_app_state():
|
||||
if os.getenv("DISABLE_STARTUP_TASKS", "0") == "1":
|
||||
return
|
||||
init_log_state()
|
||||
bootstrap_super_admin()
|
||||
resume_running_runs()
|
||||
start_live_equity_snapshot_daemon()
|
||||
|
||||
return app
|
||||
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@ from fastapi import APIRouter, HTTPException, Query, Request
|
||||
from fastapi.responses import RedirectResponse
|
||||
|
||||
from app.broker_store import (
|
||||
clear_user_broker,
|
||||
disconnect_user_broker,
|
||||
expire_user_broker_session,
|
||||
get_broker_credentials,
|
||||
get_pending_broker,
|
||||
@ -34,6 +34,7 @@ from app.services.groww_service import (
|
||||
)
|
||||
from app.services.groww_storage import get_session as get_groww_session
|
||||
from app.services.live_equity_service import capture_live_equity_snapshot, get_live_equity_curve
|
||||
from app.services.strategy_service import block_live_strategy_for_broker_disconnect
|
||||
from app.services.zerodha_service import (
|
||||
KiteApiError,
|
||||
KiteTokenError,
|
||||
@ -405,15 +406,19 @@ async def broker_status(request: Request):
|
||||
@router.post("/disconnect")
|
||||
async def disconnect_broker(request: Request):
|
||||
user = _require_user(request)
|
||||
clear_user_broker(user["id"])
|
||||
strategy_update = block_live_strategy_for_broker_disconnect(user["id"], reason="broker_disconnected")
|
||||
disconnect_user_broker(user["id"])
|
||||
clear_zerodha_session(user["id"])
|
||||
set_broker_auth_state(user["id"], "DISCONNECTED")
|
||||
try:
|
||||
body = "Your broker connection has been disconnected from Quantfortune."
|
||||
send_email_async(user["username"], "Broker disconnected", body)
|
||||
except Exception:
|
||||
pass
|
||||
return {"connected": False}
|
||||
return {
|
||||
"connected": False,
|
||||
"brokerState": "DISCONNECTED",
|
||||
"strategy": strategy_update,
|
||||
}
|
||||
|
||||
|
||||
@router.post("/zerodha/login")
|
||||
@ -749,6 +754,10 @@ async def broker_equity_curve(request: Request, from_: str = Query("", alias="fr
|
||||
normalize_zerodha_holding(item)
|
||||
for item in fetch_zerodha_holdings(session["api_key"], session["access_token"])
|
||||
]
|
||||
positions = [
|
||||
normalize_zerodha_position(item)
|
||||
for item in fetch_zerodha_positions(session["api_key"], session["access_token"])
|
||||
]
|
||||
raw_funds = fetch_zerodha_funds(session["api_key"], session["access_token"])
|
||||
funds_data = {**(raw_funds.get("equity", {}) or {}), "raw": raw_funds}
|
||||
except KiteApiError as exc:
|
||||
@ -759,6 +768,7 @@ async def broker_equity_curve(request: Request, from_: str = Query("", alias="fr
|
||||
raise HTTPException(status_code=400, detail="Groww is not connected")
|
||||
try:
|
||||
holdings = _fetch_normalized_groww_holdings(session["access_token"])
|
||||
positions = _fetch_normalized_groww_positions(session["access_token"])
|
||||
funds_data = _normalize_groww_funds(fetch_groww_funds(session["access_token"]))
|
||||
except GrowwApiError as exc:
|
||||
_raise_groww_error(user["id"], exc)
|
||||
@ -768,6 +778,7 @@ async def broker_equity_curve(request: Request, from_: str = Query("", alias="fr
|
||||
capture_live_equity_snapshot(
|
||||
user["id"],
|
||||
holdings=holdings,
|
||||
positions=positions,
|
||||
funds_data=funds_data,
|
||||
)
|
||||
|
||||
|
||||
@ -1,12 +1,44 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
|
||||
from app.services.db import health_check
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def _startup_payload(request: Request) -> dict:
|
||||
return {
|
||||
"started": bool(getattr(request.app.state, "startup_started", False)),
|
||||
"complete": bool(getattr(request.app.state, "startup_complete", False)),
|
||||
"error": getattr(request.app.state, "startup_error", None),
|
||||
"warnings": dict(getattr(request.app.state, "background_warnings", {}) or {}),
|
||||
}
|
||||
|
||||
|
||||
def _readiness_payload(request: Request) -> tuple[bool, dict]:
|
||||
db_ok = health_check()
|
||||
startup = _startup_payload(request)
|
||||
ready = db_ok and startup["complete"] and not startup["error"]
|
||||
payload = {
|
||||
"status": "ok" if ready else "not_ready",
|
||||
"db": "ok" if db_ok else "unavailable",
|
||||
"startup": startup,
|
||||
}
|
||||
return ready, payload
|
||||
|
||||
|
||||
@router.get("/health/live")
|
||||
def liveness():
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@router.get("/health/ready")
|
||||
def readiness(request: Request):
|
||||
ready, payload = _readiness_payload(request)
|
||||
if not ready:
|
||||
raise HTTPException(status_code=503, detail=payload)
|
||||
return payload
|
||||
|
||||
|
||||
@router.get("/health")
|
||||
def health():
|
||||
if not health_check():
|
||||
raise HTTPException(status_code=503, detail="db_unavailable")
|
||||
return {"status": "ok", "db": "ok"}
|
||||
def health(request: Request):
|
||||
return readiness(request)
|
||||
|
||||
@ -11,16 +11,20 @@ from app.services.groww_service import (
|
||||
GrowwApiError,
|
||||
fetch_funds as fetch_groww_funds,
|
||||
fetch_holdings as fetch_groww_holdings,
|
||||
fetch_positions as fetch_groww_positions,
|
||||
normalize_holding as normalize_groww_holding,
|
||||
normalize_position as normalize_groww_position,
|
||||
)
|
||||
from app.services.groww_storage import get_session as get_groww_session
|
||||
from app.services.zerodha_service import (
|
||||
KiteApiError,
|
||||
fetch_funds as fetch_zerodha_funds,
|
||||
fetch_holdings as fetch_zerodha_holdings,
|
||||
fetch_positions as fetch_zerodha_positions,
|
||||
holding_effective_quantity,
|
||||
holding_last_price,
|
||||
normalize_holding as normalize_zerodha_holding,
|
||||
normalize_position as normalize_zerodha_position,
|
||||
)
|
||||
from app.services.zerodha_storage import get_session as get_zerodha_session
|
||||
|
||||
@ -81,6 +85,63 @@ def _extract_holdings_value(holdings: list[dict] | None) -> float:
|
||||
return total
|
||||
|
||||
|
||||
def _position_signed_quantity(item: dict | None) -> float:
|
||||
entry = item or {}
|
||||
return _first_numeric(
|
||||
entry.get("net_quantity"),
|
||||
entry.get("signed_quantity"),
|
||||
default=0.0,
|
||||
)
|
||||
|
||||
|
||||
def _symbol_key(item: dict | None) -> tuple[str, str]:
|
||||
entry = item or {}
|
||||
symbol = str(
|
||||
entry.get("symbol")
|
||||
or entry.get("tradingsymbol")
|
||||
or entry.get("trading_symbol")
|
||||
or ""
|
||||
).strip().upper()
|
||||
exchange = str(entry.get("exchange") or "NSE").strip().upper()
|
||||
return symbol, exchange
|
||||
|
||||
|
||||
def _extract_positions_adjustment_value(
|
||||
holdings: list[dict] | None,
|
||||
positions: list[dict] | None,
|
||||
) -> float:
|
||||
holdings_map: dict[tuple[str, str], dict] = {}
|
||||
for item in holdings or []:
|
||||
key = _symbol_key(item)
|
||||
holdings_map[key] = {
|
||||
"settled_qty": _first_numeric(item.get("settled_quantity"), item.get("quantity"), default=0.0),
|
||||
"t1_qty": _first_numeric(item.get("t1_quantity"), default=0.0),
|
||||
"effective_qty": holding_effective_quantity(item),
|
||||
"last_price": holding_last_price(item),
|
||||
}
|
||||
|
||||
adjustment = 0.0
|
||||
for item in positions or []:
|
||||
signed_qty = _position_signed_quantity(item)
|
||||
if signed_qty == 0:
|
||||
continue
|
||||
key = _symbol_key(item)
|
||||
holding_entry = holdings_map.get(
|
||||
key,
|
||||
{"settled_qty": 0.0, "t1_qty": 0.0, "effective_qty": 0.0, "last_price": holding_last_price(item)},
|
||||
)
|
||||
effective_qty = float(holding_entry.get("effective_qty") or 0.0)
|
||||
t1_qty = float(holding_entry.get("t1_qty") or 0.0)
|
||||
last_price = _first_numeric(item.get("last_price"), item.get("close_price"), holding_entry.get("last_price"), default=0.0)
|
||||
|
||||
positive_qty = max(signed_qty, 0.0)
|
||||
covered_by_t1 = min(positive_qty, t1_qty)
|
||||
net_adjustment_qty = (positive_qty - covered_by_t1) + min(signed_qty, 0.0)
|
||||
net_adjustment_qty = max(net_adjustment_qty, -effective_qty)
|
||||
adjustment += net_adjustment_qty * last_price
|
||||
return adjustment
|
||||
|
||||
|
||||
def _normalize_groww_funds(data: dict | None) -> dict:
|
||||
payload = data if isinstance(data, dict) else {}
|
||||
available = payload.get("available") if isinstance(payload.get("available"), dict) else {}
|
||||
@ -155,8 +216,9 @@ def _upsert_snapshot(
|
||||
captured_at: datetime,
|
||||
cash_value: float,
|
||||
holdings_value: float,
|
||||
positions_adjustment_value: float,
|
||||
):
|
||||
total_value = cash_value + holdings_value
|
||||
total_value = cash_value + holdings_value + positions_adjustment_value
|
||||
with db_connection() as conn:
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
@ -168,13 +230,15 @@ def _upsert_snapshot(
|
||||
captured_at,
|
||||
cash_value,
|
||||
holdings_value,
|
||||
positions_adjustment_value,
|
||||
total_value
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (user_id, snapshot_date) DO UPDATE
|
||||
SET captured_at = EXCLUDED.captured_at,
|
||||
cash_value = EXCLUDED.cash_value,
|
||||
holdings_value = EXCLUDED.holdings_value,
|
||||
positions_adjustment_value = EXCLUDED.positions_adjustment_value,
|
||||
total_value = EXCLUDED.total_value
|
||||
""",
|
||||
(
|
||||
@ -183,6 +247,7 @@ def _upsert_snapshot(
|
||||
captured_at,
|
||||
Decimal(str(round(cash_value, 2))),
|
||||
Decimal(str(round(holdings_value, 2))),
|
||||
Decimal(str(round(positions_adjustment_value, 2))),
|
||||
Decimal(str(round(total_value, 2))),
|
||||
),
|
||||
)
|
||||
@ -191,6 +256,7 @@ def _upsert_snapshot(
|
||||
"capturedAt": captured_at.isoformat(),
|
||||
"cashValue": round(cash_value, 2),
|
||||
"holdingsValue": round(holdings_value, 2),
|
||||
"positionsAdjustmentValue": round(positions_adjustment_value, 2),
|
||||
"totalValue": round(total_value, 2),
|
||||
}
|
||||
|
||||
@ -199,6 +265,7 @@ def capture_live_equity_snapshot(
|
||||
user_id: str,
|
||||
*,
|
||||
holdings: list[dict] | None = None,
|
||||
positions: list[dict] | None = None,
|
||||
funds_data: dict | None = None,
|
||||
captured_at: datetime | None = None,
|
||||
):
|
||||
@ -215,6 +282,10 @@ def capture_live_equity_snapshot(
|
||||
normalize_zerodha_holding(item)
|
||||
for item in fetch_zerodha_holdings(session["api_key"], session["access_token"])
|
||||
]
|
||||
positions = [
|
||||
normalize_zerodha_position(item)
|
||||
for item in fetch_zerodha_positions(session["api_key"], session["access_token"])
|
||||
]
|
||||
elif broker_name == "GROWW":
|
||||
session = get_groww_session(user_id)
|
||||
if not session:
|
||||
@ -223,8 +294,14 @@ def capture_live_equity_snapshot(
|
||||
normalize_groww_holding(item)
|
||||
for item in fetch_groww_holdings(session["access_token"])
|
||||
]
|
||||
positions = [
|
||||
normalize_groww_position(item)
|
||||
for item in fetch_groww_positions(session["access_token"])
|
||||
]
|
||||
else:
|
||||
return None
|
||||
elif positions is None:
|
||||
positions = []
|
||||
if funds_data is None:
|
||||
if broker_name == "ZERODHA":
|
||||
session = get_zerodha_session(user_id)
|
||||
@ -243,12 +320,14 @@ def capture_live_equity_snapshot(
|
||||
|
||||
cash_value = _extract_cash_value(funds_data)
|
||||
holdings_value = _extract_holdings_value(holdings)
|
||||
positions_adjustment_value = _extract_positions_adjustment_value(holdings, positions)
|
||||
return _upsert_snapshot(
|
||||
user_id=user_id,
|
||||
snapshot_date=_snapshot_day(captured_at),
|
||||
captured_at=captured_at,
|
||||
cash_value=cash_value,
|
||||
holdings_value=holdings_value,
|
||||
positions_adjustment_value=positions_adjustment_value,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -334,8 +334,16 @@ def _effective_running_run_id(user_id: str):
|
||||
return None
|
||||
engine_row = _get_engine_status_row(user_id, run_id)
|
||||
engine_state = str((engine_row or {}).get("status") or "").strip().upper()
|
||||
if engine_state not in {"STOPPED", "ERROR"}:
|
||||
if engine_state not in {"STOPPED", "ERROR", "PAUSED_AUTH_EXPIRED"}:
|
||||
return run_id
|
||||
if engine_state == "PAUSED_AUTH_EXPIRED":
|
||||
update_run_status(
|
||||
user_id,
|
||||
run_id,
|
||||
"PAUSED_AUTH_EXPIRED",
|
||||
meta={"reason": "broker_auth_expired", "engine_state": engine_state},
|
||||
)
|
||||
return None
|
||||
update_run_status(
|
||||
user_id,
|
||||
run_id,
|
||||
@ -350,6 +358,95 @@ def _set_run_status_or_raise(user_id: str, run_id: str, status: str, meta: dict
|
||||
if not updated:
|
||||
raise RuntimeError(f"Run {run_id} for user {user_id} no longer exists")
|
||||
|
||||
|
||||
def _get_run_row(user_id: str, run_id: str | None):
|
||||
if not user_id or not run_id:
|
||||
return None
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT status, meta, started_at, stopped_at
|
||||
FROM strategy_run
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
LIMIT 1
|
||||
""",
|
||||
(user_id, run_id),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return {
|
||||
"status": row[0],
|
||||
"meta": row[1] if isinstance(row[1], dict) else {},
|
||||
"started_at": row[2],
|
||||
"stopped_at": row[3],
|
||||
}
|
||||
|
||||
|
||||
def _broker_block_state(user_id: str, cfg: dict | None):
|
||||
config = cfg or {}
|
||||
mode = str(config.get("mode") or "").strip().upper()
|
||||
if mode != "LIVE":
|
||||
return {"blocked": False, "reason": None, "broker_state": None}
|
||||
|
||||
broker_state = get_user_broker(user_id) or {}
|
||||
auth_state = str(broker_state.get("auth_state") or "").strip().upper()
|
||||
connected = bool(broker_state.get("connected"))
|
||||
broker_name = str(broker_state.get("broker") or "").strip().upper() or None
|
||||
|
||||
if not connected or auth_state == "DISCONNECTED":
|
||||
return {
|
||||
"blocked": True,
|
||||
"reason": "broker_disconnected",
|
||||
"broker_state": "DISCONNECTED",
|
||||
"broker": broker_name,
|
||||
}
|
||||
if auth_state in {"EXPIRED", "PENDING"}:
|
||||
return {
|
||||
"blocked": True,
|
||||
"reason": "broker_auth_required",
|
||||
"broker_state": auth_state or "EXPIRED",
|
||||
"broker": broker_name,
|
||||
}
|
||||
return {
|
||||
"blocked": False,
|
||||
"reason": None,
|
||||
"broker_state": auth_state or "VALID",
|
||||
"broker": broker_name,
|
||||
}
|
||||
|
||||
|
||||
def block_live_strategy_for_broker_disconnect(user_id: str, *, reason: str = "broker_disconnected"):
|
||||
running_run_id = _effective_running_run_id(user_id)
|
||||
run_id = running_run_id or get_active_run_id(user_id)
|
||||
if not run_id:
|
||||
return None
|
||||
cfg = _load_config(user_id, run_id)
|
||||
if str(cfg.get("mode") or "").strip().upper() != "LIVE":
|
||||
return None
|
||||
|
||||
stop_warning = None
|
||||
try:
|
||||
stop_engine(user_id, run_id, timeout=10.0)
|
||||
except Exception as exc:
|
||||
stop_warning = str(exc)
|
||||
print(f"[STRATEGY] stop_engine failed during broker disconnect for {user_id}/{run_id}: {exc}", flush=True)
|
||||
|
||||
deactivate_strategy_config(user_id, run_id)
|
||||
stop_run(user_id, run_id, reason=reason)
|
||||
_write_status(user_id, run_id, "STOPPED")
|
||||
_set_run_status_or_raise(
|
||||
user_id,
|
||||
run_id,
|
||||
"STOPPED",
|
||||
meta={"reason": reason, "broker_blocked": True},
|
||||
)
|
||||
result = {"run_id": run_id, "status": "STOPPED", "reason": reason}
|
||||
if stop_warning:
|
||||
result["warning"] = stop_warning
|
||||
return result
|
||||
|
||||
def validate_frequency(freq: dict, mode: str):
|
||||
if not isinstance(freq, dict):
|
||||
raise ValueError("Frequency payload is required")
|
||||
@ -814,7 +911,8 @@ def get_strategy_status(user_id: str):
|
||||
running_run_id = _effective_running_run_id(user_id)
|
||||
run_id = running_run_id or get_active_run_id(user_id)
|
||||
cfg = _load_config(user_id, run_id) if run_id else {}
|
||||
default_status = "RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE")
|
||||
run_row = _get_run_row(user_id, run_id)
|
||||
default_status = (run_row or {}).get("status") or ("RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE"))
|
||||
engine_row = None
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
@ -834,6 +932,8 @@ def get_strategy_status(user_id: str):
|
||||
engine_state = str((engine_row or [None])[0] or "").strip().upper()
|
||||
if running_run_id and engine_state in {"STOPPED", "ERROR"}:
|
||||
status["status"] = "STOPPED"
|
||||
elif engine_state == "PAUSED_AUTH_EXPIRED":
|
||||
status["status"] = "PAUSED_AUTH_EXPIRED"
|
||||
sip_frequency = cfg.get("sip_frequency")
|
||||
if not isinstance(sip_frequency, dict):
|
||||
frequency = cfg.get("frequency")
|
||||
@ -866,6 +966,11 @@ def get_strategy_status(user_id: str):
|
||||
parsed_next = parse_persisted_timestamp(next_eligible)
|
||||
if parsed_next and parsed_next > _utc_now():
|
||||
status["status"] = "WAITING"
|
||||
broker_block = _broker_block_state(user_id, cfg)
|
||||
status["broker_state"] = broker_block.get("broker_state")
|
||||
status["strategy_blocked"] = bool(broker_block.get("blocked"))
|
||||
status["strategy_block_reason"] = broker_block.get("reason")
|
||||
status["broker"] = broker_block.get("broker") or cfg.get("broker")
|
||||
status_key = (status.get("status") or "IDLE").upper()
|
||||
resumable = bool(cfg.get("strategy")) and bool(cfg.get("mode"))
|
||||
status["can_resume"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
|
||||
@ -1086,6 +1191,26 @@ def get_strategy_summary(user_id: str):
|
||||
}
|
||||
|
||||
issue_row = None
|
||||
block_reason = str(status.get("strategy_block_reason") or "").strip().lower()
|
||||
if block_reason == "broker_disconnected":
|
||||
summary.update(
|
||||
{
|
||||
"tone": "warning",
|
||||
"message": "Broker disconnected. Live strategy is stopped until you reconnect.",
|
||||
"event": "BROKER_DISCONNECTED",
|
||||
}
|
||||
)
|
||||
return summary
|
||||
if block_reason == "broker_auth_required" or (status.get("status") or "").upper() == "PAUSED_AUTH_EXPIRED":
|
||||
summary.update(
|
||||
{
|
||||
"tone": "warning",
|
||||
"message": "Broker session expired. Reconnect broker to resume the strategy.",
|
||||
"event": "BROKER_AUTH_EXPIRED",
|
||||
}
|
||||
)
|
||||
return summary
|
||||
|
||||
if run_id:
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
|
||||
@ -3,34 +3,29 @@ module.exports = {
|
||||
{
|
||||
name: "Quantfortune - Backend",
|
||||
cwd: "/SERVER_CLIENT/PRODUCTION/SIP_GoldBees_Backend/backend",
|
||||
|
||||
script: "/SERVER_CLIENT/PRODUCTION/SIP_GoldBees_Backend/backend/.venv/bin/uvicorn",
|
||||
args: "app.main:app --host 0.0.0.0 --port 3002",
|
||||
|
||||
interpreter: "none", // ✅ IMPORTANT
|
||||
// exec_mode: "fork", // optional
|
||||
// instances: 1, // optional
|
||||
|
||||
interpreter: "none",
|
||||
env: {
|
||||
DB_HOST: "localhost",
|
||||
DB_PORT: "5432",
|
||||
DB_NAME: "trading_db",
|
||||
DB_USER: "trader",
|
||||
DB_PASSWORD: "traderpass",
|
||||
DB_SCHEMA: "quant_app",
|
||||
|
||||
BROKER_TOKEN_KEY: "6SuYLz0n7-KM5nB_Bs6ueYgDXZZvbmf-K-WpFbOMbH4=",
|
||||
|
||||
SMTP_HOST: "smtp.gmail.com",
|
||||
SMTP_PORT: "587",
|
||||
SMTP_USER: "quantfortune@gmail.com",
|
||||
SMTP_PASS: "wkbk mwbi aiqo yvwl",
|
||||
SMTP_FROM_NAME: "Quantfortune Support",
|
||||
|
||||
SUPER_ADMIN_EMAIL: "thigazhezhilanj007@gmail.com",
|
||||
SUPER_ADMIN_PASSWORD: "AdminPass123!",
|
||||
RESET_OTP_SECRET: "change_this_secret"
|
||||
}
|
||||
}
|
||||
]
|
||||
APP_ENV: process.env.APP_ENV || "production",
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_PORT: process.env.DB_PORT,
|
||||
DB_NAME: process.env.DB_NAME,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
DB_SCHEMA: process.env.DB_SCHEMA,
|
||||
CORS_ORIGINS: process.env.CORS_ORIGINS,
|
||||
BROKER_TOKEN_KEY: process.env.BROKER_TOKEN_KEY,
|
||||
SMTP_HOST: process.env.SMTP_HOST,
|
||||
SMTP_PORT: process.env.SMTP_PORT,
|
||||
SMTP_USER: process.env.SMTP_USER,
|
||||
SMTP_PASS: process.env.SMTP_PASS,
|
||||
SMTP_FROM_NAME: process.env.SMTP_FROM_NAME,
|
||||
RESET_OTP_SECRET: process.env.RESET_OTP_SECRET,
|
||||
ENABLE_SUPER_ADMIN_BOOTSTRAP: process.env.ENABLE_SUPER_ADMIN_BOOTSTRAP,
|
||||
SUPER_ADMIN_EMAIL: process.env.SUPER_ADMIN_EMAIL,
|
||||
SUPER_ADMIN_PASSWORD: process.env.SUPER_ADMIN_PASSWORD,
|
||||
},
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
Set-Location $PSScriptRoot
|
||||
$appEnv = if ($env:APP_ENV) { $env:APP_ENV } else { 'development' }
|
||||
if (-not $env:APP_ENV) { $env:APP_ENV = $appEnv }
|
||||
if (-not $env:DB_HOST) { $env:DB_HOST = 'localhost' }
|
||||
if (-not $env:DB_PORT) { $env:DB_PORT = '5432' }
|
||||
if (-not $env:DB_NAME) { $env:DB_NAME = 'trading_db' }
|
||||
@ -17,13 +19,13 @@ if (Test-Path $frontendUrlFile) {
|
||||
$env:ZERODHA_REDIRECT_URL = "$frontendUrl/login"
|
||||
}
|
||||
}
|
||||
if (-not $env:BROKER_TOKEN_KEY) { $env:BROKER_TOKEN_KEY = '6SuYLz0n7-KM5nB_Bs6ueYgDXZZvbmf-K-WpFbOMbH4=' }
|
||||
if (-not $env:SUPER_ADMIN_EMAIL) { $env:SUPER_ADMIN_EMAIL = 'thigazhezhilanj007@gmail.com' }
|
||||
if (-not $env:SUPER_ADMIN_PASSWORD) { $env:SUPER_ADMIN_PASSWORD = 'AdminPass123!' }
|
||||
if (-not $env:SMTP_HOST) { $env:SMTP_HOST = 'smtp.gmail.com' }
|
||||
if (-not $env:SMTP_PORT) { $env:SMTP_PORT = '587' }
|
||||
if (-not $env:SMTP_USER) { $env:SMTP_USER = 'quantfortune@gmail.com' }
|
||||
if (-not $env:SMTP_PASS) { $env:SMTP_PASS = 'wkbk mwbi aiqo yvwl' }
|
||||
if (-not $env:SMTP_FROM_NAME) { $env:SMTP_FROM_NAME = 'Quantfortune Support' }
|
||||
if (-not $env:RESET_OTP_SECRET) { $env:RESET_OTP_SECRET = 'change_this_secret' }
|
||||
if (-not $env:BROKER_TOKEN_KEY -and $env:APP_ENV -in @('development', 'dev', 'test', 'testing', 'local')) {
|
||||
$env:BROKER_TOKEN_KEY = (& .\venv\Scripts\python.exe -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode('utf-8'))").Trim()
|
||||
}
|
||||
if (-not $env:RESET_OTP_SECRET -and $env:APP_ENV -in @('development', 'dev', 'test', 'testing', 'local')) {
|
||||
$env:RESET_OTP_SECRET = 'dev-reset-otp-secret'
|
||||
}
|
||||
.\venv\Scripts\uvicorn.exe app.main:app --host 0.0.0.0 --port 8000
|
||||
|
||||
@ -101,7 +101,10 @@ def test_bootstrap_schema_contains_migrated_core_columns_and_tables():
|
||||
"ALTER TABLE user_broker",
|
||||
"ADD COLUMN IF NOT EXISTS api_secret TEXT",
|
||||
"ADD COLUMN IF NOT EXISTS auth_state TEXT",
|
||||
"ALTER TABLE live_equity_snapshot",
|
||||
"ADD COLUMN IF NOT EXISTS positions_adjustment_value NUMERIC NOT NULL DEFAULT 0",
|
||||
"CREATE TABLE IF NOT EXISTS broker_callback_state",
|
||||
"CREATE TABLE IF NOT EXISTS broker_order_state",
|
||||
"CREATE TABLE IF NOT EXISTS execution_claim",
|
||||
"CREATE TABLE IF NOT EXISTS run_leases",
|
||||
"CREATE TABLE IF NOT EXISTS support_request_audit",
|
||||
|
||||
252
backend/tests/test_broker_disconnect_handling.py
Normal file
252
backend/tests/test_broker_disconnect_handling.py
Normal file
@ -0,0 +1,252 @@
|
||||
import importlib
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
def _build_app(monkeypatch):
|
||||
monkeypatch.setenv("APP_ENV", "test")
|
||||
monkeypatch.setenv("DISABLE_STARTUP_TASKS", "1")
|
||||
monkeypatch.setenv("DB_HOST", "localhost")
|
||||
monkeypatch.setenv("DB_NAME", "trading_db")
|
||||
monkeypatch.setenv("DB_USER", "trader")
|
||||
monkeypatch.setenv("DB_PASSWORD", "test-password")
|
||||
monkeypatch.setenv("CORS_ORIGINS", "http://localhost:3000")
|
||||
monkeypatch.setenv("BROKER_TOKEN_KEY", "test-broker-token-key")
|
||||
monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret")
|
||||
|
||||
import app.main as app_main
|
||||
|
||||
importlib.reload(app_main)
|
||||
return app_main.create_app()
|
||||
|
||||
|
||||
def test_disconnect_route_stops_live_strategy_and_disconnects_broker(monkeypatch):
|
||||
app = _build_app(monkeypatch)
|
||||
client = TestClient(app)
|
||||
|
||||
import app.routers.broker as broker_router
|
||||
|
||||
calls = {
|
||||
"disconnect": [],
|
||||
"clear_session": [],
|
||||
}
|
||||
|
||||
monkeypatch.setattr(
|
||||
broker_router,
|
||||
"_require_user",
|
||||
lambda _request: {"id": "user-1", "username": "user@example.com"},
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
broker_router,
|
||||
"block_live_strategy_for_broker_disconnect",
|
||||
lambda user_id, reason="broker_disconnected": {
|
||||
"run_id": "run-1",
|
||||
"status": "STOPPED",
|
||||
"reason": reason,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
broker_router,
|
||||
"disconnect_user_broker",
|
||||
lambda user_id: calls["disconnect"].append(user_id),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
broker_router,
|
||||
"clear_zerodha_session",
|
||||
lambda user_id: calls["clear_session"].append(user_id),
|
||||
)
|
||||
monkeypatch.setattr(broker_router, "send_email_async", lambda *_args, **_kwargs: None)
|
||||
|
||||
response = client.post("/api/broker/disconnect", cookies={"session_id": "session-1"})
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"connected": False,
|
||||
"brokerState": "DISCONNECTED",
|
||||
"strategy": {
|
||||
"run_id": "run-1",
|
||||
"status": "STOPPED",
|
||||
"reason": "broker_disconnected",
|
||||
"user_id": "user-1",
|
||||
},
|
||||
}
|
||||
assert calls["disconnect"] == ["user-1"]
|
||||
assert calls["clear_session"] == ["user-1"]
|
||||
|
||||
|
||||
def test_block_live_strategy_for_broker_disconnect_auto_stops_active_live_run(monkeypatch):
    """A broker disconnect while a LIVE run is active must stop the engine,
    deactivate the config, stop the run with the disconnect reason, and
    write a STOPPED status."""
    import app.services.strategy_service as strategy_service

    # Collects (name, *args) tuples for every lifecycle hook that fires.
    calls = []

    monkeypatch.setattr(strategy_service, "_effective_running_run_id", lambda _user_id: "run-1")
    monkeypatch.setattr(strategy_service, "_load_config", lambda _user_id, _run_id: {"mode": "LIVE"})
    monkeypatch.setattr(
        strategy_service,
        "stop_engine",
        lambda user_id, run_id, timeout=10.0: calls.append(("stop_engine", user_id, run_id, timeout)),
    )
    monkeypatch.setattr(
        strategy_service,
        "deactivate_strategy_config",
        lambda user_id, run_id: calls.append(("deactivate", user_id, run_id)),
    )
    monkeypatch.setattr(
        strategy_service,
        "stop_run",
        lambda user_id, run_id, reason="user_request": calls.append(("stop_run", user_id, run_id, reason)),
    )
    monkeypatch.setattr(
        strategy_service,
        "_write_status",
        lambda user_id, run_id, status: calls.append(("write_status", user_id, run_id, status)),
    )
    monkeypatch.setattr(
        strategy_service,
        "_set_run_status_or_raise",
        lambda user_id, run_id, status, meta=None: calls.append(("set_run_status", user_id, run_id, status, meta)),
    )

    result = strategy_service.block_live_strategy_for_broker_disconnect(
        "user-1",
        reason="broker_disconnected",
    )

    assert result == {
        "run_id": "run-1",
        "status": "STOPPED",
        "reason": "broker_disconnected",
    }
    # Order of hooks is not pinned here, only that each one fired with these args.
    assert ("stop_engine", "user-1", "run-1", 10.0) in calls
    assert ("deactivate", "user-1", "run-1") in calls
    assert ("stop_run", "user-1", "run-1", "broker_disconnected") in calls
    assert ("write_status", "user-1", "run-1", "STOPPED") in calls
|
||||
def test_strategy_status_marks_broker_disconnected_block(monkeypatch):
    """get_strategy_status must surface a broker-disconnect block: blocked flag,
    reason, broker state and broker name all come from _broker_block_state."""
    import app.services.strategy_service as strategy_service

    # No engine is running; the active (configured) run is run-1.
    monkeypatch.setattr(strategy_service, "_effective_running_run_id", lambda _user_id: None)
    monkeypatch.setattr(strategy_service, "get_active_run_id", lambda _user_id: "run-1")
    monkeypatch.setattr(
        strategy_service,
        "_load_config",
        lambda _user_id, _run_id: {
            "strategy": "Golden Nifty",
            "mode": "LIVE",
            "broker": "ZERODHA",
            "active": True,
        },
    )
    monkeypatch.setattr(
        strategy_service,
        "_get_run_row",
        lambda _user_id, _run_id: {"status": "STOPPED", "meta": {}, "started_at": None, "stopped_at": None},
    )
    # Simulate a disconnected broker block.
    monkeypatch.setattr(
        strategy_service,
        "_broker_block_state",
        lambda _user_id, _cfg: {
            "blocked": True,
            "reason": "broker_disconnected",
            "broker_state": "DISCONNECTED",
            "broker": "ZERODHA",
        },
    )

    # Minimal DB stand-ins: context managers whose queries always return nothing.
    class FakeCursor:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def execute(self, _sql, _params):
            return None

        def fetchone(self):
            return None

    class FakeConnection:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def cursor(self):
            return FakeCursor()

    monkeypatch.setattr(strategy_service, "db_connection", lambda: FakeConnection())

    status = strategy_service.get_strategy_status("user-1")

    assert status["status"] == "STOPPED"
    assert status["strategy_blocked"] is True
    assert status["strategy_block_reason"] == "broker_disconnected"
    assert status["broker_state"] == "DISCONNECTED"
    assert status["broker"] == "ZERODHA"
||||
|
||||
def test_strategy_status_clears_broker_block_after_reconnect(monkeypatch):
    """Once the broker session is VALID again, get_strategy_status must report
    no block (blocked False, reason None) for the same configured run."""
    import app.services.strategy_service as strategy_service

    monkeypatch.setattr(strategy_service, "_effective_running_run_id", lambda _user_id: None)
    monkeypatch.setattr(strategy_service, "get_active_run_id", lambda _user_id: "run-1")
    monkeypatch.setattr(
        strategy_service,
        "_load_config",
        lambda _user_id, _run_id: {
            "strategy": "Golden Nifty",
            "mode": "LIVE",
            "broker": "ZERODHA",
            "active": True,
        },
    )
    monkeypatch.setattr(
        strategy_service,
        "_get_run_row",
        lambda _user_id, _run_id: {"status": "STOPPED", "meta": {}, "started_at": None, "stopped_at": None},
    )
    # Broker is reconnected: no block is reported.
    monkeypatch.setattr(
        strategy_service,
        "_broker_block_state",
        lambda _user_id, _cfg: {
            "blocked": False,
            "reason": None,
            "broker_state": "VALID",
            "broker": "ZERODHA",
        },
    )

    # Minimal DB stand-ins: context managers whose queries always return nothing.
    class FakeCursor:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def execute(self, _sql, _params):
            return None

        def fetchone(self):
            return None

    class FakeConnection:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def cursor(self):
            return FakeCursor()

    monkeypatch.setattr(strategy_service, "db_connection", lambda: FakeConnection())

    status = strategy_service.get_strategy_status("user-1")

    assert status["strategy_blocked"] is False
    assert status["strategy_block_reason"] is None
    assert status["broker_state"] == "VALID"
@ -85,6 +85,11 @@ def test_try_execute_sip_live_emits_one_order_batch_when_claim_is_lost(monkeypat
|
||||
monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False)
|
||||
monkeypatch.setattr(execution, "claim_execution_window", fake_claim_execution_window)
|
||||
monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None)
|
||||
monkeypatch.setattr(
|
||||
execution,
|
||||
"reconcile_live_orders",
|
||||
lambda **kwargs: {"blocked": False, "orders": [], "summaries": []},
|
||||
)
|
||||
monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None))
|
||||
monkeypatch.setattr(
|
||||
execution,
|
||||
|
||||
124
backend/tests/test_health_readiness.py
Normal file
124
backend/tests/test_health_readiness.py
Normal file
@ -0,0 +1,124 @@
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
def _load_app_main(monkeypatch, *, disable_startup_tasks="1"):
    """Set the minimal env the backend requires and return a freshly reloaded
    app.main module.

    disable_startup_tasks: value for DISABLE_STARTUP_TASKS; "1" (default)
    skips startup tasks, "0" lets them run so startup failures can be tested.
    """
    monkeypatch.setenv("APP_ENV", "test")
    monkeypatch.setenv("DISABLE_STARTUP_TASKS", disable_startup_tasks)
    monkeypatch.setenv("DB_HOST", "localhost")
    monkeypatch.setenv("DB_NAME", "trading_db")
    monkeypatch.setenv("DB_USER", "trader")
    monkeypatch.setenv("DB_PASSWORD", "test-password")
    monkeypatch.setenv("CORS_ORIGINS", "http://localhost:3000")
    monkeypatch.setenv("BROKER_TOKEN_KEY", "test-broker-token-key")
    monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret")

    import app.main as app_main

    # Reload so module-level config is re-evaluated under the patched env.
    importlib.reload(app_main)
    return app_main
||||
|
||||
def test_liveness_succeeds_even_if_db_is_unavailable(monkeypatch):
    """/health/live is a pure liveness probe: it must stay 200 even when the
    DB health check reports failure."""
    app_main = _load_app_main(monkeypatch)
    app = app_main.create_app()

    import app.routers.health as health_router

    # Force the DB check to fail.
    monkeypatch.setattr(health_router, "health_check", lambda: False)

    with TestClient(app) as client:
        response = client.get("/health/live")

    assert response.status_code == 200
    assert response.json() == {"status": "ok"}
|
||||
def test_readiness_fails_when_db_is_unavailable(monkeypatch):
    """/health/ready must return 503 with detail.db == "unavailable" when the
    DB health check fails."""
    app_main = _load_app_main(monkeypatch)
    app = app_main.create_app()

    import app.routers.health as health_router

    monkeypatch.setattr(health_router, "health_check", lambda: False)

    with TestClient(app) as client:
        response = client.get("/health/ready")

    assert response.status_code == 503
    assert response.json()["detail"]["db"] == "unavailable"
|
||||
def test_readiness_succeeds_when_startup_complete_and_db_available(monkeypatch):
    """/health/ready must return 200 with startup.complete True when the DB is
    reachable and startup finished (startup tasks are disabled here)."""
    app_main = _load_app_main(monkeypatch)
    app = app_main.create_app()

    import app.routers.health as health_router

    monkeypatch.setattr(health_router, "health_check", lambda: True)

    with TestClient(app) as client:
        response = client.get("/health/ready")

    assert response.status_code == 200
    assert response.json()["status"] == "ok"
    assert response.json()["startup"]["complete"] is True
||||
def test_startup_failure_keeps_readiness_unhealthy(monkeypatch):
    """If startup tasks ran but did not complete, /health must stay 503 and
    report the startup error even though the DB itself is healthy."""
    # Startup tasks enabled so the patched task runner below is exercised.
    app_main = _load_app_main(monkeypatch, disable_startup_tasks="0")
    app = app_main.create_app()

    import app.routers.health as health_router

    monkeypatch.setattr(health_router, "health_check", lambda: True)

    def fake_startup_tasks(app):
        # Simulate a startup that began but failed part-way through.
        app.state.startup_started = True
        app.state.startup_complete = False
        app.state.startup_error = "resume_failed"

    monkeypatch.setattr(app_main, "_run_startup_tasks", fake_startup_tasks)

    with TestClient(app) as client:
        response = client.get("/health")

    assert response.status_code == 503
    assert response.json()["detail"]["startup"]["error"] == "resume_failed"
||||
|
||||
def test_production_boot_fails_without_broker_token_key(monkeypatch):
    """In production, importing app.main without BROKER_TOKEN_KEY must fail
    fast with a RuntimeError instead of booting with broker tokens unprotected."""
    monkeypatch.setenv("APP_ENV", "production")
    monkeypatch.setenv("DB_HOST", "localhost")
    monkeypatch.setenv("DB_NAME", "trading_db")
    monkeypatch.setenv("DB_USER", "trader")
    monkeypatch.setenv("DB_PASSWORD", "test-password")
    monkeypatch.setenv("CORS_ORIGINS", "https://app.quantfortune.com")
    monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret")
    # The key under test is deliberately absent.
    monkeypatch.delenv("BROKER_TOKEN_KEY", raising=False)

    import app.main as app_main

    with pytest.raises(RuntimeError, match="BROKER_TOKEN_KEY must be configured in production"):
        importlib.reload(app_main)
|
||||
def test_production_bootstrap_requires_explicit_admin_credentials(monkeypatch):
    """With super-admin bootstrap enabled in production, a missing
    SUPER_ADMIN_EMAIL must abort boot with a RuntimeError."""
    monkeypatch.setenv("APP_ENV", "production")
    monkeypatch.setenv("DB_HOST", "localhost")
    monkeypatch.setenv("DB_NAME", "trading_db")
    monkeypatch.setenv("DB_USER", "trader")
    monkeypatch.setenv("DB_PASSWORD", "test-password")
    monkeypatch.setenv("CORS_ORIGINS", "https://app.quantfortune.com")
    monkeypatch.setenv("RESET_OTP_SECRET", "test-reset-secret")
    monkeypatch.setenv("BROKER_TOKEN_KEY", "test-broker-token-key")
    # Bootstrap explicitly enabled, but without admin credentials.
    monkeypatch.setenv("ENABLE_SUPER_ADMIN_BOOTSTRAP", "1")
    monkeypatch.delenv("SUPER_ADMIN_EMAIL", raising=False)
    monkeypatch.delenv("SUPER_ADMIN_PASSWORD", raising=False)

    import app.main as app_main

    with pytest.raises(RuntimeError, match="SUPER_ADMIN_EMAIL must be configured when bootstrap is enabled"):
        importlib.reload(app_main)
||||
127
backend/tests/test_live_equity_positions.py
Normal file
127
backend/tests/test_live_equity_positions.py
Normal file
@ -0,0 +1,127 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def _holding(symbol: str, *, quantity: float, t1_quantity: float = 0.0, last_price: float = 0.0, exchange: str = "NSE"):
|
||||
return {
|
||||
"symbol": symbol,
|
||||
"exchange": exchange,
|
||||
"quantity": quantity,
|
||||
"t1_quantity": t1_quantity,
|
||||
"last_price": last_price,
|
||||
}
|
||||
|
||||
|
||||
def _position(symbol: str, *, net_quantity: float, last_price: float, exchange: str = "NSE"):
|
||||
return {
|
||||
"symbol": symbol,
|
||||
"exchange": exchange,
|
||||
"net_quantity": net_quantity,
|
||||
"last_price": last_price,
|
||||
}
|
||||
|
||||
|
||||
def test_positions_adjustment_is_zero_when_only_holdings_exist():
    """With holdings but no same-day positions, the positions adjustment is 0."""
    from app.services.live_equity_service import _extract_positions_adjustment_value

    adjustment = _extract_positions_adjustment_value(
        [_holding("NIFTYBEES", quantity=2, last_price=250.0)],
        [],
    )

    assert adjustment == 0.0
|
||||
def test_positions_only_are_counted_in_live_equity_snapshot(monkeypatch):
    """A same-day position with no matching holding must show up as a positive
    positions adjustment (2 * 250 = 500) in the captured snapshot."""
    import app.services.live_equity_service as live_equity_service

    # Capture the kwargs the snapshot upsert would have written.
    captured = {}
    monkeypatch.setattr(live_equity_service, "get_user_broker", lambda _user_id: {})
    monkeypatch.setattr(
        live_equity_service,
        "_upsert_snapshot",
        lambda **kwargs: captured.update(kwargs) or kwargs,
    )

    result = live_equity_service.capture_live_equity_snapshot(
        "user-1",
        holdings=[],
        positions=[_position("NIFTYBEES", net_quantity=2, last_price=250.0)],
        funds_data={"equity": {"available": {"live_balance": 1000.0}}},
        captured_at=datetime(2026, 4, 9, 10, 0, tzinfo=timezone.utc),
    )

    assert captured["cash_value"] == 1000.0
    assert captured["holdings_value"] == 0.0
    assert captured["positions_adjustment_value"] == 500.0
    assert result["positions_adjustment_value"] == 500.0
|
||||
def test_same_day_position_already_reflected_in_t1_is_not_double_counted():
    """A same-day position already present as T1 quantity in holdings must not
    add to the adjustment (it is already inside holdings_value)."""
    from app.services.live_equity_service import _extract_positions_adjustment_value

    adjustment = _extract_positions_adjustment_value(
        [_holding("GOLDBEES", quantity=2, t1_quantity=1, last_price=120.0)],
        [_position("GOLDBEES", net_quantity=1, last_price=120.0)],
    )

    assert adjustment == 0.0
|
||||
def test_negative_same_day_position_reduces_exposure_without_going_below_zero():
    """A same-day sell (negative net quantity) against an existing holding must
    yield a negative adjustment of exactly one unit's value here (-250)."""
    from app.services.live_equity_service import _extract_positions_adjustment_value

    adjustment = _extract_positions_adjustment_value(
        [_holding("NIFTYBEES", quantity=3, last_price=250.0)],
        [_position("NIFTYBEES", net_quantity=-1, last_price=250.0)],
    )

    assert adjustment == -250.0
|
||||
def test_capture_live_equity_snapshot_avoids_double_counting_when_holdings_and_t1_overlap(monkeypatch):
    """When a same-day position is already counted via T1 in holdings, the
    snapshot keeps the full holdings value (incl. T1) and a zero adjustment."""
    import app.services.live_equity_service as live_equity_service

    captured = {}
    monkeypatch.setattr(live_equity_service, "get_user_broker", lambda _user_id: {})
    monkeypatch.setattr(
        live_equity_service,
        "_upsert_snapshot",
        lambda **kwargs: captured.update(kwargs) or kwargs,
    )

    result = live_equity_service.capture_live_equity_snapshot(
        "user-1",
        holdings=[_holding("NIFTYBEES", quantity=2, t1_quantity=1, last_price=250.0)],
        positions=[_position("NIFTYBEES", net_quantity=1, last_price=250.0)],
        funds_data={"equity": {"available": {"live_balance": 1000.0}}},
        captured_at=datetime(2026, 4, 9, 10, 0, tzinfo=timezone.utc),
    )

    # 2 settled + 1 T1 units... here quantity=2 incl. T1=1 -> value per SOURCE is 750.
    assert captured["holdings_value"] == 750.0
    assert captured["positions_adjustment_value"] == 0.0
    assert result["positions_adjustment_value"] == 0.0
|
||||
def test_capture_live_equity_snapshot_adds_same_day_position_exposure_to_total(monkeypatch):
    """A same-day position with no T1 overlap must be added on top of the
    holdings value as a positive adjustment (1 * 250)."""
    import app.services.live_equity_service as live_equity_service

    captured = {}
    monkeypatch.setattr(live_equity_service, "get_user_broker", lambda _user_id: {})
    monkeypatch.setattr(
        live_equity_service,
        "_upsert_snapshot",
        lambda **kwargs: captured.update(kwargs) or kwargs,
    )

    result = live_equity_service.capture_live_equity_snapshot(
        "user-1",
        holdings=[_holding("NIFTYBEES", quantity=1, last_price=250.0)],
        positions=[_position("NIFTYBEES", net_quantity=1, last_price=250.0)],
        funds_data={"equity": {"available": {"live_balance": 1000.0}}},
        captured_at=datetime(2026, 4, 9, 10, 0, tzinfo=timezone.utc),
    )

    assert captured["cash_value"] == 1000.0
    assert captured["holdings_value"] == 250.0
    assert captured["positions_adjustment_value"] == 250.0
    assert result["positions_adjustment_value"] == 250.0
||||
337
backend/tests/test_order_reconciliation.py
Normal file
337
backend/tests/test_order_reconciliation.py
Normal file
@ -0,0 +1,337 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def _base_state():
|
||||
return {
|
||||
"nifty_units": 0.0,
|
||||
"gold_units": 0.0,
|
||||
"total_invested": 0.0,
|
||||
"last_run": None,
|
||||
"last_sip_ts": None,
|
||||
}
|
||||
|
||||
|
||||
class _Broker:
|
||||
def __init__(self, payload):
|
||||
self.payload = payload
|
||||
|
||||
def refresh_order_status(self, order):
|
||||
merged = dict(order)
|
||||
merged.update(self.payload)
|
||||
return merged
|
||||
|
||||
|
||||
def test_reconciliation_applies_late_fill_once(monkeypatch):
    """A late FILLED report for a pending BUY must be applied to state exactly
    once: units and invested amount increase, and last_sip_ts advances."""
    import indian_paper_trading_strategy.engine.execution as execution

    logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc)
    checked_at = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc)
    state = _base_state()
    # Collects the snapshot handed to save_state.
    saved = {}

    # One locally pending order awaiting broker confirmation.
    pending_orders = [
        {
            "local_order_id": "order-1",
            "logical_time": logical_time,
            "broker": "ZERODHA",
            "symbol": "NIFTYBEES.NS",
            "side": "BUY",
            "requested_qty": 1.0,
            "requested_price": 250.0,
            "status": "PENDING",
            "created_at": checked_at,
        }
    ]

    # What the cycle's order rows look like after the upsert: fully filled.
    cycle_orders = [
        {
            "local_order_id": "order-1",
            "symbol": "NIFTYBEES.NS",
            "side": "BUY",
            "requested_qty": 1.0,
            "filled_qty": 1.0,
            "accounted_fill_qty": 1.0,
            "requested_price": 250.0,
            "average_price": 251.0,
            "status": execution.LOCAL_ORDER_FILLED,
            "needs_reconciliation": False,
        }
    ]

    monkeypatch.setattr(execution, "_normalize_now", lambda value: value)
    monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None))
    monkeypatch.setattr(execution, "_list_orders_to_reconcile", lambda cur, checked_before: list(pending_orders))
    monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state)
    monkeypatch.setattr(execution, "save_state", lambda snapshot, **kwargs: saved.update(snapshot))
    monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False)
    # Upsert reports a fresh fill of 1 unit (delta 1.0, nothing accounted yet).
    monkeypatch.setattr(
        execution,
        "_upsert_broker_order_state",
        lambda cur, broker_name, logical_time, order, checked_at: {
            "local_order_id": "order-1",
            "symbol": "NIFTYBEES.NS",
            "requested_qty": 1.0,
            "filled_qty": 1.0,
            "requested_price": 250.0,
            "average_price": 251.0,
            "status": execution.LOCAL_ORDER_FILLED,
            "accounted_fill_qty": 0.0,
            "delta_fill_qty": 1.0,
            "needs_reconciliation": False,
        },
    )
    monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "_load_cycle_order_rows", lambda cur, logical_time: list(cycle_orders))

    result = execution.reconcile_live_orders(
        broker=_Broker({"status": "FILLED", "filled_qty": 1.0, "average_price": 251.0}),
        mode="LIVE",
        now_ts=checked_at,
    )

    assert result["blocked"] is False
    # Fill applied once: 1 unit at average price 251.
    assert state["nifty_units"] == 1.0
    assert state["total_invested"] == 251.0
    assert state["last_sip_ts"] == checked_at.isoformat()
    assert saved["last_sip_ts"] == checked_at.isoformat()
|
||||
def test_partial_fill_remains_partial_and_does_not_advance_cycle(monkeypatch):
    """A cancelled order that filled only 1 of 2 units must book the partial
    fill into state but must NOT advance last_sip_ts (cycle not executed)."""
    import indian_paper_trading_strategy.engine.execution as execution

    logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc)
    checked_at = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc)
    state = _base_state()

    monkeypatch.setattr(execution, "_normalize_now", lambda value: value)
    monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None))
    monkeypatch.setattr(
        execution,
        "_list_orders_to_reconcile",
        lambda cur, checked_before: [
            {
                "local_order_id": "order-2",
                "logical_time": logical_time,
                "broker": "ZERODHA",
                "symbol": "GOLDBEES.NS",
                "side": "BUY",
                "requested_qty": 2.0,
                "requested_price": 100.0,
                "status": "PENDING",
                "created_at": checked_at,
            }
        ],
    )
    monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state)
    monkeypatch.setattr(execution, "save_state", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False)
    # Upsert reports a partial fill: 1 of 2 units, delta 1.0.
    monkeypatch.setattr(
        execution,
        "_upsert_broker_order_state",
        lambda cur, broker_name, logical_time, order, checked_at: {
            "local_order_id": "order-2",
            "symbol": "GOLDBEES.NS",
            "requested_qty": 2.0,
            "filled_qty": 1.0,
            "requested_price": 100.0,
            "average_price": 101.0,
            "status": execution.LOCAL_ORDER_PARTIAL,
            "accounted_fill_qty": 0.0,
            "delta_fill_qty": 1.0,
            "needs_reconciliation": False,
        },
    )
    monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None)
    monkeypatch.setattr(
        execution,
        "_load_cycle_order_rows",
        lambda cur, logical_time: [
            {
                "local_order_id": "order-2",
                "symbol": "GOLDBEES.NS",
                "requested_qty": 2.0,
                "filled_qty": 1.0,
                "requested_price": 100.0,
                "average_price": 101.0,
                "status": execution.LOCAL_ORDER_PARTIAL,
                "needs_reconciliation": False,
            }
        ],
    )

    result = execution.reconcile_live_orders(
        broker=_Broker({"status": "CANCELLED", "filled_qty": 1.0, "average_price": 101.0}),
        mode="LIVE",
        now_ts=checked_at,
    )

    assert result["blocked"] is False
    # Partial fill is booked, but the SIP cycle is not considered executed.
    assert state["gold_units"] == 1.0
    assert state["last_sip_ts"] is None
    assert state["last_run"] == checked_at.isoformat()
|
||||
def test_rejected_order_does_not_mark_cycle_executed(monkeypatch):
    """A REJECTED order with zero fill must leave units untouched and must not
    advance last_sip_ts; only last_run reflects that reconciliation ran."""
    import indian_paper_trading_strategy.engine.execution as execution

    logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc)
    checked_at = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc)
    state = _base_state()

    monkeypatch.setattr(execution, "_normalize_now", lambda value: value)
    monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None))
    monkeypatch.setattr(
        execution,
        "_list_orders_to_reconcile",
        lambda cur, checked_before: [
            {
                "local_order_id": "order-3",
                "logical_time": logical_time,
                "broker": "ZERODHA",
                "symbol": "NIFTYBEES.NS",
                "side": "BUY",
                "requested_qty": 1.0,
                "requested_price": 250.0,
                "status": "PENDING",
                "created_at": checked_at,
            }
        ],
    )
    monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state)
    monkeypatch.setattr(execution, "save_state", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False)
    # Upsert reports a full rejection: nothing filled, no delta to book.
    monkeypatch.setattr(
        execution,
        "_upsert_broker_order_state",
        lambda cur, broker_name, logical_time, order, checked_at: {
            "local_order_id": "order-3",
            "symbol": "NIFTYBEES.NS",
            "requested_qty": 1.0,
            "filled_qty": 0.0,
            "requested_price": 250.0,
            "average_price": 0.0,
            "status": execution.LOCAL_ORDER_REJECTED,
            "accounted_fill_qty": 0.0,
            "delta_fill_qty": 0.0,
            "needs_reconciliation": False,
        },
    )
    monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None)
    monkeypatch.setattr(
        execution,
        "_load_cycle_order_rows",
        lambda cur, logical_time: [
            {
                "local_order_id": "order-3",
                "symbol": "NIFTYBEES.NS",
                "requested_qty": 1.0,
                "filled_qty": 0.0,
                "requested_price": 250.0,
                "average_price": 0.0,
                "status": execution.LOCAL_ORDER_REJECTED,
                "needs_reconciliation": False,
            }
        ],
    )

    result = execution.reconcile_live_orders(
        broker=_Broker({"status": "REJECTED", "filled_qty": 0.0}),
        mode="LIVE",
        now_ts=checked_at,
    )

    assert result["blocked"] is False
    assert state["nifty_units"] == 0.0
    assert state["last_sip_ts"] is None
    assert state["last_run"] == checked_at.isoformat()
|
||||
def test_reconciliation_is_repeat_safe(monkeypatch):
    """Running reconciliation twice over the same fill must apply it only once:
    the second pass sees delta_fill_qty 0 and state does not double-book."""
    import indian_paper_trading_strategy.engine.execution as execution

    logical_time = datetime(2026, 4, 9, 4, 0, tzinfo=timezone.utc)
    first_check = datetime(2026, 4, 9, 4, 1, tzinfo=timezone.utc)
    second_check = datetime(2026, 4, 9, 4, 2, tzinfo=timezone.utc)
    state = _base_state()
    # Upsert results in call order: first pass books delta 1.0, second pass
    # finds the fill already accounted (delta 0.0).
    upserts = [
        {
            "local_order_id": "order-4",
            "symbol": "NIFTYBEES.NS",
            "requested_qty": 1.0,
            "filled_qty": 1.0,
            "requested_price": 250.0,
            "average_price": 250.0,
            "status": execution.LOCAL_ORDER_FILLED,
            "accounted_fill_qty": 0.0,
            "delta_fill_qty": 1.0,
            "needs_reconciliation": False,
        },
        {
            "local_order_id": "order-4",
            "symbol": "NIFTYBEES.NS",
            "requested_qty": 1.0,
            "filled_qty": 1.0,
            "requested_price": 250.0,
            "average_price": 250.0,
            "status": execution.LOCAL_ORDER_FILLED,
            "accounted_fill_qty": 1.0,
            "delta_fill_qty": 0.0,
            "needs_reconciliation": False,
        },
    ]

    monkeypatch.setattr(execution, "_normalize_now", lambda value: value)
    monkeypatch.setattr(execution, "run_with_retry", lambda op, retries=None, delay=None: op(object(), None))
    monkeypatch.setattr(
        execution,
        "_list_orders_to_reconcile",
        lambda cur, checked_before: [
            {
                "local_order_id": "order-4",
                "logical_time": logical_time,
                "broker": "ZERODHA",
                "symbol": "NIFTYBEES.NS",
                "side": "BUY",
                "requested_qty": 1.0,
                "requested_price": 250.0,
                "status": "PENDING",
                "created_at": first_check,
            }
        ],
    )
    monkeypatch.setattr(execution, "load_state", lambda mode=None, cur=None, for_update=False: state)
    monkeypatch.setattr(execution, "save_state", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "insert_engine_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "log_event", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "event_exists", lambda *args, **kwargs: False)
    monkeypatch.setattr(execution, "_mark_accounted_fill", lambda *args, **kwargs: None)
    monkeypatch.setattr(execution, "_load_cycle_order_rows", lambda cur, logical_time: [
        {
            "local_order_id": "order-4",
            "symbol": "NIFTYBEES.NS",
            "requested_qty": 1.0,
            "filled_qty": 1.0,
            "requested_price": 250.0,
            "average_price": 250.0,
            "status": execution.LOCAL_ORDER_FILLED,
            "needs_reconciliation": False,
        }
    ])
    monkeypatch.setattr(
        execution,
        "_upsert_broker_order_state",
        lambda cur, broker_name, logical_time, order, checked_at: upserts.pop(0),
    )

    broker = _Broker({"status": "FILLED", "filled_qty": 1.0, "average_price": 250.0})
    execution.reconcile_live_orders(broker=broker, mode="LIVE", now_ts=first_check)
    execution.reconcile_live_orders(broker=broker, mode="LIVE", now_ts=second_check)

    # Exactly one unit and one purchase booked despite two reconciliation runs.
    assert state["nifty_units"] == 1.0
    assert state["total_invested"] == 250.0
||||
@ -37,10 +37,13 @@ class Broker(ABC):
|
||||
def get_orders(self):
    """Return the broker's orders; abstract — concrete brokers must implement."""
    raise NotImplementedError
||||
@abstractmethod
def get_funds(self):
    """Return available funds from the broker; abstract — concrete brokers
    must implement. (The decorator was previously duplicated; one
    @abstractmethod is sufficient and equivalent.)"""
    raise NotImplementedError
||||
|
||||
def refresh_order_status(self, order: dict):
    # Default no-op for brokers without status polling: echo the order back.
    # NOTE(review): returns the same dict object (not a copy), unlike the live
    # broker overrides which build fresh dicts — confirm callers don't mutate it.
    return order
||||
|
||||
|
||||
class BrokerError(Exception):
    """Base exception for broker-level failures."""
||||
@ -296,6 +299,33 @@ class LiveZerodhaBroker(Broker):
|
||||
|
||||
time.sleep(self.POLL_INTERVAL_SECONDS)
|
||||
|
||||
def refresh_order_status(self, order: dict):
    """Re-query Kite for the latest state of a previously placed order and
    return it normalized; orders without a broker id are returned unchanged
    (as a copy)."""
    from app.services.zerodha_service import KiteTokenError, fetch_order_history

    session = self._session()
    # Accept either key for the broker-side id; blank/whitespace means "never placed".
    order_id = str(order.get("broker_order_id") or order.get("id") or "").strip()
    if not order_id:
        return dict(order)
    try:
        history = fetch_order_history(
            session["api_key"],
            session["access_token"],
            order_id,
        )
    except KiteTokenError as exc:
        # NOTE(review): assumes _raise_auth_expired always raises; if it ever
        # returned, `history` below would be unbound — confirm.
        self._raise_auth_expired(exc)

    # The last history entry is taken as the current order state.
    entry = history[-1] if history else None
    return self._normalize_order_payload(
        order_id=order_id,
        symbol=str(order.get("symbol") or ""),
        side=str(order.get("side") or ""),
        requested_qty=int(order.get("requested_qty") or order.get("qty") or 0),
        requested_price=float(order.get("requested_price") or order.get("price") or 0.0),
        history_entry=entry,
        logical_time=_parse_ts(order.get("logical_time"), assume_local=False),
    )
|
||||
|
||||
def get_funds(self, cur=None):
|
||||
from app.services.zerodha_service import KiteTokenError, fetch_funds
|
||||
|
||||
@ -693,6 +723,41 @@ class LiveGrowwBroker(Broker):
|
||||
|
||||
time.sleep(self.POLL_INTERVAL_SECONDS)
|
||||
|
||||
def refresh_order_status(self, order: dict):
    """Re-query Groww for the latest state of a previously placed order and
    return it normalized.

    Detail and status payloads are merged (status wins on shared keys). An
    expired token is escalated via _raise_auth_expired; any other Groww API
    error is mapped to a FAILED synthetic payload instead of raising.
    """
    from app.services.groww_service import GrowwApiError, GrowwTokenError, fetch_order_detail, fetch_order_status

    session = self._session()
    order_id = self._extract_order_id(order)
    if not order_id:
        # Never placed at the broker: return an unchanged copy.
        return dict(order)
    segment = self._first_text(order.get("segment"), default="CASH")
    merged = {}
    try:
        detail = fetch_order_detail(session["access_token"], order_id, segment=segment)
        status_payload = fetch_order_status(session["access_token"], order_id, segment=segment)
        if isinstance(detail, dict):
            merged.update(detail)
        if isinstance(status_payload, dict):
            merged.update(status_payload)
    except GrowwTokenError as exc:
        self._raise_auth_expired(exc)
    except GrowwApiError as exc:
        # Degrade gracefully: report the order as FAILED with the API's message.
        merged = {
            "groww_order_id": order_id,
            "order_status": "FAILED",
            "remark": getattr(exc, "message", str(exc)),
        }

    return self._normalize_order_payload(
        order_id=order_id,
        symbol=str(order.get("symbol") or ""),
        side=str(order.get("side") or ""),
        requested_qty=int(order.get("requested_qty") or order.get("qty") or 0),
        requested_price=float(order.get("requested_price") or order.get("price") or 0.0),
        order_entry=merged,
        logical_time=_parse_ts(order.get("logical_time"), assume_local=False),
    )
|
||||
|
||||
def get_funds(self, cur=None):
|
||||
from app.services.groww_service import GrowwTokenError, fetch_funds
|
||||
|
||||
|
||||
@ -1,12 +1,32 @@
|
||||
# engine/execution.py
|
||||
from datetime import datetime, timezone
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from indian_paper_trading_strategy.engine.state import load_state, save_state
|
||||
|
||||
from indian_paper_trading_strategy.engine.broker import Broker, BrokerAuthExpired
|
||||
from indian_paper_trading_strategy.engine.ledger import claim_execution_window, log_event, event_exists
|
||||
from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry
|
||||
from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry, get_context
|
||||
from indian_paper_trading_strategy.engine.market import market_now
|
||||
from indian_paper_trading_strategy.engine.time_utils import compute_logical_time
|
||||
|
||||
# Local lifecycle states for broker orders tracked in broker_order_state.
LOCAL_ORDER_PENDING = "PENDING"
LOCAL_ORDER_PARTIAL = "PARTIAL"
LOCAL_ORDER_FILLED = "FILLED"
LOCAL_ORDER_REJECTED = "REJECTED"
LOCAL_ORDER_CANCELLED = "CANCELLED"
LOCAL_ORDER_UNKNOWN = "UNKNOWN"

# States that may still change at the broker and therefore need re-polling.
_RECONCILEABLE_ORDER_STATES = {
    LOCAL_ORDER_PENDING,
    LOCAL_ORDER_PARTIAL,
    LOCAL_ORDER_UNKNOWN,
}
# Minimum gap between successive status checks for one order (env-tunable,
# seconds; read once at import time).
RECONCILIATION_INTERVAL = timedelta(
    seconds=int(os.getenv("ORDER_RECONCILIATION_INTERVAL_SEC", "15"))
)
# After this long without resolution, reconciliation flags the cycle as timed
# out and blocks new executions (see reconcile_live_orders).
RECONCILIATION_TIMEOUT = timedelta(
    seconds=int(os.getenv("ORDER_RECONCILIATION_TIMEOUT_SEC", "300"))
)
|
||||
|
||||
def _as_float(value):
|
||||
if hasattr(value, "item"):
|
||||
@ -88,6 +108,506 @@ def _apply_filled_orders_to_state(state, orders):
|
||||
}
|
||||
|
||||
|
||||
def _utc_now():
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _current_scope():
    """Return the (user_id, run_id) pair from the engine's bound context."""
    return get_context()
|
||||
|
||||
|
||||
def _order_status_details(order: dict) -> dict:
    """Derive a normalized status record for a broker order payload.

    Maps quantities/prices to floats, classifies the order into one of the
    LOCAL_ORDER_* states (quantity evidence outranks the raw broker status),
    and decides whether the order still needs reconciliation polling.
    """
    data = dict(order or {})
    req_qty = float(data.get("requested_qty") or data.get("qty") or 0.0)
    done_qty = max(float(data.get("filled_qty") or 0.0), 0.0)
    req_price = float(data.get("requested_price") or data.get("price") or 0.0)
    # Fall back to the requested price when the broker reports no average.
    avg_price = float(data.get("average_price") or req_price or 0.0)
    raw = str(data.get("status") or "").strip().upper()

    terminal = {LOCAL_ORDER_FILLED, LOCAL_ORDER_REJECTED, LOCAL_ORDER_CANCELLED}
    if req_qty > 0 and done_qty >= req_qty:
        status, pending = LOCAL_ORDER_FILLED, False
    elif done_qty > 0:
        # Partially filled: keep polling unless the broker says it is final.
        status, pending = LOCAL_ORDER_PARTIAL, raw not in terminal
    elif raw == LOCAL_ORDER_REJECTED:
        status, pending = LOCAL_ORDER_REJECTED, False
    elif raw == LOCAL_ORDER_CANCELLED:
        status, pending = LOCAL_ORDER_CANCELLED, False
    elif raw == LOCAL_ORDER_PENDING:
        status, pending = LOCAL_ORDER_PENDING, True
    else:
        # Anything unrecognized is treated as unknown and kept under watch.
        status, pending = LOCAL_ORDER_UNKNOWN, True

    return {
        "local_order_id": str(data.get("id") or data.get("broker_order_id") or ""),
        "broker_order_id": str(data.get("broker_order_id") or data.get("id") or "") or None,
        "symbol": str(data.get("symbol") or ""),
        "side": str(data.get("side") or "").upper(),
        "requested_qty": req_qty,
        "filled_qty": done_qty,
        "requested_price": req_price,
        "average_price": avg_price,
        "status": status,
        "broker_status": raw or None,
        "status_message": data.get("status_message"),
        "needs_reconciliation": pending,
        "logical_time": data.get("logical_time"),
    }
|
||||
|
||||
|
||||
def _upsert_broker_order_state(cur, *, broker_name: str, logical_time, order: dict, checked_at: datetime):
    """Insert or refresh the broker_order_state row for *order*; return enriched details.

    Locks any existing row (SELECT ... FOR UPDATE) so concurrent reconcilers
    cannot double-apply the accounted fill quantity, then upserts the latest
    broker view. The returned details carry the previous accounting snapshot
    plus ``delta_fill_qty`` — fills observed since the last accounting pass.

    Raises ValueError when the order has no stable local id to key on.
    """
    user_id, run_id = _current_scope()
    details = _order_status_details(order)
    if not details["local_order_id"]:
        raise ValueError("Broker order state requires a stable order id")

    # Row lock first: the accounted_fill_qty read below must be consistent
    # with the _mark_accounted_fill write a caller may issue afterwards.
    cur.execute(
        """
        SELECT status, filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at
        FROM broker_order_state
        WHERE local_order_id = %s
        FOR UPDATE
        """,
        (details["local_order_id"],),
    )
    row = cur.fetchone()
    # Previous snapshot; defaults cover a first-seen order.
    previous = {
        "status": row[0],
        "filled_qty": float(row[1] or 0.0),
        "accounted_fill_qty": float(row[2] or 0.0),
        "needs_reconciliation": bool(row[3]),
        "last_checked_at": row[4],
    } if row else {
        "status": None,
        "filled_qty": 0.0,
        "accounted_fill_qty": 0.0,
        "needs_reconciliation": False,
        "last_checked_at": None,
    }

    logical_ts = logical_time or checked_at
    # Note: ON CONFLICT deliberately leaves logical_time and accounted_fill_qty
    # untouched — accounting is advanced only via _mark_accounted_fill.
    cur.execute(
        """
        INSERT INTO broker_order_state (
            local_order_id,
            user_id,
            run_id,
            logical_time,
            broker,
            symbol,
            side,
            broker_order_id,
            requested_qty,
            filled_qty,
            accounted_fill_qty,
            requested_price,
            average_price,
            status,
            broker_status,
            status_message,
            needs_reconciliation,
            last_checked_at,
            created_at,
            updated_at
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (local_order_id) DO UPDATE
        SET broker_order_id = EXCLUDED.broker_order_id,
            requested_qty = EXCLUDED.requested_qty,
            filled_qty = EXCLUDED.filled_qty,
            requested_price = EXCLUDED.requested_price,
            average_price = EXCLUDED.average_price,
            status = EXCLUDED.status,
            broker_status = EXCLUDED.broker_status,
            status_message = EXCLUDED.status_message,
            needs_reconciliation = EXCLUDED.needs_reconciliation,
            last_checked_at = EXCLUDED.last_checked_at,
            updated_at = EXCLUDED.updated_at
        """,
        (
            details["local_order_id"],
            user_id,
            run_id,
            logical_ts,
            broker_name,
            details["symbol"],
            details["side"],
            details["broker_order_id"],
            details["requested_qty"],
            details["filled_qty"],
            previous["accounted_fill_qty"],
            details["requested_price"],
            details["average_price"],
            details["status"],
            details["broker_status"],
            details["status_message"],
            details["needs_reconciliation"],
            checked_at,
            checked_at,
            checked_at,
        ),
    )

    # Expose the accounting delta so callers apply only the *new* fills.
    details["accounted_fill_qty"] = previous["accounted_fill_qty"]
    details["delta_fill_qty"] = max(details["filled_qty"] - previous["accounted_fill_qty"], 0.0)
    details["previous_status"] = previous["status"]
    details["previous_filled_qty"] = previous["filled_qty"]
    details["previous_needs_reconciliation"] = previous["needs_reconciliation"]
    return details
|
||||
|
||||
|
||||
def _mark_accounted_fill(cur, local_order_id: str, accounted_fill_qty: float, checked_at: datetime):
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE broker_order_state
|
||||
SET accounted_fill_qty = %s,
|
||||
updated_at = %s
|
||||
WHERE local_order_id = %s
|
||||
""",
|
||||
(accounted_fill_qty, checked_at, local_order_id),
|
||||
)
|
||||
|
||||
|
||||
def _apply_fill_delta_to_state(state: dict, order_details: dict):
|
||||
delta_qty = max(float(order_details.get("delta_fill_qty") or 0.0), 0.0)
|
||||
average_price = float(order_details.get("average_price") or order_details.get("requested_price") or 0.0)
|
||||
symbol = str(order_details.get("symbol") or "").upper()
|
||||
delta_amount = delta_qty * average_price
|
||||
|
||||
if delta_qty <= 0:
|
||||
return {"nifty_units": 0.0, "gold_units": 0.0, "amount": 0.0}
|
||||
|
||||
nifty_units = 0.0
|
||||
gold_units = 0.0
|
||||
if symbol.startswith("NIFTYBEES"):
|
||||
state["nifty_units"] += delta_qty
|
||||
nifty_units = delta_qty
|
||||
elif symbol.startswith("GOLDBEES"):
|
||||
state["gold_units"] += delta_qty
|
||||
gold_units = delta_qty
|
||||
state["total_invested"] += delta_amount
|
||||
return {"nifty_units": nifty_units, "gold_units": gold_units, "amount": delta_amount}
|
||||
|
||||
|
||||
def _load_cycle_order_rows(cur, logical_time):
    """Fetch every tracked broker order for the current scope at *logical_time*."""
    user_id, run_id = _current_scope()
    cur.execute(
        """
        SELECT local_order_id, symbol, side, requested_qty, filled_qty, accounted_fill_qty,
               requested_price, average_price, status, broker_status, status_message,
               needs_reconciliation, broker_order_id
        FROM broker_order_state
        WHERE user_id = %s AND run_id = %s AND logical_time = %s
        ORDER BY local_order_id
        """,
        (user_id, run_id, logical_time),
    )

    def _row_to_dict(row):
        # Positions mirror the SELECT column list above.
        return {
            "local_order_id": row[0],
            "symbol": row[1],
            "side": row[2],
            "requested_qty": float(row[3] or 0.0),
            "filled_qty": float(row[4] or 0.0),
            "accounted_fill_qty": float(row[5] or 0.0),
            "requested_price": float(row[6] or 0.0),
            "average_price": float(row[7] or 0.0),
            "status": row[8],
            "broker_status": row[9],
            "status_message": row[10],
            "needs_reconciliation": bool(row[11]),
            "broker_order_id": row[12],
        }

    return [_row_to_dict(row) for row in cur.fetchall()]
|
||||
|
||||
|
||||
def _aggregate_cycle_orders(orders: list[dict]):
    """Roll up per-order fill data into one summary for an execution cycle.

    Summary flags: ``has_any_fill`` (at least one unit filled), ``all_filled``
    (every order fully filled; False for an empty list), ``has_unresolved``
    (something still needs reconciliation polling).
    """
    summary = {
        "nifty_units": 0.0,
        "gold_units": 0.0,
        "nifty_price": 0.0,
        "gold_price": 0.0,
        "amount": 0.0,
        "has_any_fill": False,
        "all_filled": bool(orders),
        "has_unresolved": False,
    }
    for entry in orders:
        ticker = str(entry.get("symbol") or "").upper()
        done = float(entry.get("filled_qty") or 0.0)
        wanted = float(entry.get("requested_qty") or 0.0)
        px = float(entry.get("average_price") or entry.get("requested_price") or 0.0)
        state_name = str(entry.get("status") or "")

        if state_name != LOCAL_ORDER_FILLED:
            summary["all_filled"] = False
        if bool(entry.get("needs_reconciliation")) or state_name in _RECONCILEABLE_ORDER_STATES:
            summary["has_unresolved"] = True
        if done <= 0:
            # No fill yet — nothing to add to the totals.
            continue

        summary["has_any_fill"] = True
        summary["amount"] += done * px
        if ticker.startswith("NIFTYBEES"):
            summary["nifty_units"] += done
            summary["nifty_price"] = px or summary["nifty_price"]
        elif ticker.startswith("GOLDBEES"):
            summary["gold_units"] += done
            summary["gold_price"] = px or summary["gold_price"]
        if wanted > 0 and done < wanted:
            summary["all_filled"] = False
    return summary
|
||||
|
||||
|
||||
def _record_reconciliation_event(cur, event: str, *, logical_time, payload: dict, ts: datetime):
    """Record a reconciliation event at most once per logical cycle.

    Ledger-tracked outcomes go through log_event; anything else is stored as a
    plain engine event.
    """
    if event_exists(event, logical_time, cur=cur):
        return  # already recorded for this cycle
    ledger_events = ("SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING")
    if event in ledger_events:
        log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time)
    else:
        insert_engine_event(cur, event, data=payload, ts=ts)
|
||||
|
||||
|
||||
def _advance_cycle_state(cur, *, state: dict, logical_time, cycle_orders: list[dict], event_ts: datetime, failure_reason: str | None = None):
    """Advance the strategy state for one execution cycle based on its order fills.

    Emits exactly one outcome event for the cycle — SIP_EXECUTED, SIP_PARTIAL,
    ORDER_RECONCILIATION_PENDING, or SIP_NO_FILL (deduplicated per logical_time
    by _record_reconciliation_event) — and returns True only when every order
    in the cycle is fully filled.
    """
    cycle_summary = _aggregate_cycle_orders(cycle_orders)
    # Any order activity (or an explicit failure) counts as a run attempt.
    if cycle_orders or failure_reason:
        state["last_run"] = event_ts.isoformat()

    if cycle_summary["all_filled"]:
        # Fully executed cycle: stamp the SIP timestamp and record success.
        state["last_sip_ts"] = event_ts.isoformat()
        _record_reconciliation_event(
            cur,
            "SIP_EXECUTED",
            logical_time=logical_time,
            payload={
                "nifty_units": cycle_summary["nifty_units"],
                "gold_units": cycle_summary["gold_units"],
                "nifty_price": cycle_summary["nifty_price"],
                "gold_price": cycle_summary["gold_price"],
                "amount": cycle_summary["amount"],
            },
            ts=event_ts,
        )
        return True

    if cycle_summary["has_any_fill"] and not cycle_summary["has_unresolved"]:
        # Some fills landed and nothing is still pending: final partial result.
        _record_reconciliation_event(
            cur,
            "SIP_PARTIAL",
            logical_time=logical_time,
            payload={
                "nifty_units": cycle_summary["nifty_units"],
                "gold_units": cycle_summary["gold_units"],
                "nifty_price": cycle_summary["nifty_price"],
                "gold_price": cycle_summary["gold_price"],
                "amount": cycle_summary["amount"],
                "reason": "partial_fill",
            },
            ts=event_ts,
        )
        return False

    if cycle_summary["has_unresolved"]:
        # Orders still in flight: defer judgement to a later reconciliation.
        _record_reconciliation_event(
            cur,
            "ORDER_RECONCILIATION_PENDING",
            logical_time=logical_time,
            payload={
                "logical_time": logical_time.isoformat(),
                "orders": cycle_orders,
            },
            ts=event_ts,
        )
        return False

    # Nothing filled and nothing pending: the cycle produced no execution.
    _record_reconciliation_event(
        cur,
        "SIP_NO_FILL",
        logical_time=logical_time,
        payload={
            "reason": failure_reason or "no_fill",
            "orders": cycle_orders,
        },
        ts=event_ts,
    )
    return False
|
||||
|
||||
|
||||
def _list_orders_to_reconcile(cur, *, checked_before: datetime):
    """Return orders in the current scope still flagged for reconciliation.

    Only rows whose last check predates *checked_before* (or was never made)
    are returned, oldest cycle first, so polling respects the configured
    reconciliation interval.
    """
    user_id, run_id = _current_scope()
    cur.execute(
        """
        SELECT local_order_id, logical_time, broker, symbol, side, broker_order_id, requested_qty,
               requested_price, average_price, status, broker_status, status_message,
               filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at, created_at
        FROM broker_order_state
        WHERE user_id = %s
          AND run_id = %s
          AND needs_reconciliation = TRUE
          AND (last_checked_at IS NULL OR last_checked_at <= %s)
        ORDER BY logical_time ASC, created_at ASC
        """,
        (user_id, run_id, checked_before),
    )
    rows = []
    for row in cur.fetchall():
        rows.append(
            {
                "local_order_id": row[0],
                "logical_time": row[1],
                "broker": row[2],
                "symbol": row[3],
                "side": row[4],
                "broker_order_id": row[5],
                "requested_qty": float(row[6] or 0.0),
                "requested_price": float(row[7] or 0.0),
                "average_price": float(row[8] or 0.0),
                "status": row[9],
                "broker_status": row[10],
                "status_message": row[11],
                "filled_qty": float(row[12] or 0.0),
                "accounted_fill_qty": float(row[13] or 0.0),
                "needs_reconciliation": bool(row[14]),
                "last_checked_at": row[15],
                "created_at": row[16],
                # Alias for consumers (e.g. broker refresh helpers) that key on "id".
                "id": row[0],
            }
        )
    return rows
|
||||
|
||||
|
||||
def reconcile_live_orders(*, broker: Broker, mode: str | None = "LIVE", now_ts: datetime | None = None):
    """Poll the broker for unresolved orders and fold any new fills into state.

    Returns ``{"blocked": bool, "orders": [...], "summaries": [...]}`` where
    ``blocked`` is True when some cycle has unresolved orders older than
    RECONCILIATION_TIMEOUT, signalling the caller to hold new executions.
    Raises BrokerAuthExpired if the broker session has lapsed (propagated from
    broker.refresh_order_status).
    """
    checked_at = _normalize_now(now_ts or _utc_now())
    blocked = False
    summaries: list[dict] = []

    # Only orders not checked within RECONCILIATION_INTERVAL are re-polled.
    pending_orders = run_with_retry(
        lambda cur, _conn: _list_orders_to_reconcile(
            cur,
            checked_before=checked_at - RECONCILIATION_INTERVAL,
        )
    )
    if not pending_orders:
        return {"blocked": False, "orders": [], "summaries": []}

    def _op(cur, _conn):
        # Lock the state row for the whole pass so fill deltas apply atomically.
        state = load_state(mode=mode, cur=cur, for_update=True)
        insert_engine_event(
            cur,
            "ORDER_RECONCILIATION_STARTED",
            data={"count": len(pending_orders)},
            ts=checked_at,
        )

        nonlocal blocked
        for pending in pending_orders:
            # Ask the broker for the order's current state; keep the original
            # cycle's logical_time so accounting stays attached to its cycle.
            refreshed = broker.refresh_order_status(pending)
            refreshed["logical_time"] = pending["logical_time"]
            details = _upsert_broker_order_state(
                cur,
                broker_name=str(pending.get("broker") or ""),
                logical_time=pending["logical_time"],
                order=refreshed,
                checked_at=checked_at,
            )

            if details["status"] != pending.get("status"):
                insert_engine_event(
                    cur,
                    "ORDER_RECONCILIATION_STATUS_CHANGED",
                    data={
                        "order_id": details["local_order_id"],
                        "from": pending.get("status"),
                        "to": details["status"],
                    },
                    ts=checked_at,
                )

            if details["status"] == LOCAL_ORDER_PARTIAL:
                insert_engine_event(
                    cur,
                    "ORDER_PARTIAL_FILL_DETECTED",
                    data={
                        "order_id": details["local_order_id"],
                        "filled_qty": details["filled_qty"],
                        "requested_qty": details["requested_qty"],
                    },
                    ts=checked_at,
                )

            # Apply only the newly observed fill quantity, then advance the
            # accounted counter so a retry cannot apply the same fill twice.
            applied = _apply_fill_delta_to_state(state, details)
            if details["delta_fill_qty"] > 0:
                _mark_accounted_fill(
                    cur,
                    details["local_order_id"],
                    details["accounted_fill_qty"] + details["delta_fill_qty"],
                    checked_at,
                )

            cycle_orders = _load_cycle_order_rows(cur, pending["logical_time"])
            executed = _advance_cycle_state(
                cur,
                state=state,
                logical_time=pending["logical_time"],
                cycle_orders=cycle_orders,
                event_ts=checked_at,
            )

            unresolved = any(order["needs_reconciliation"] for order in cycle_orders)
            if unresolved:
                # NOTE(review): only this order's created_at feeds the age
                # check — the extra [checked_at] term makes min() a no-op when
                # created_at <= checked_at; confirm that is intended.
                oldest_ts = min(
                    [
                        pending.get("created_at") or checked_at
                    ]
                    + [checked_at]
                )
                if checked_at - oldest_ts >= RECONCILIATION_TIMEOUT:
                    insert_engine_event(
                        cur,
                        "ORDER_RECONCILIATION_TIMEOUT",
                        data={
                            "logical_time": pending["logical_time"].isoformat(),
                            "orders": cycle_orders,
                        },
                        ts=checked_at,
                    )
                    blocked = True

            summaries.append(
                {
                    "logical_time": pending["logical_time"],
                    "executed": executed,
                    "applied_amount": applied["amount"],
                    "unresolved": unresolved,
                }
            )

        save_state(
            state,
            mode=mode,
            cur=cur,
            emit_event=True,
            event_meta={"source": "live_reconciliation"},
        )
        insert_engine_event(
            cur,
            "ORDER_RECONCILIATION_COMPLETE",
            data={"blocked": blocked, "count": len(pending_orders)},
            ts=checked_at,
        )
        return {"blocked": blocked}

    # NOTE(review): summaries accumulates inside _op; if run_with_retry
    # re-invokes _op after a transient failure, entries may duplicate —
    # verify retry semantics before relying on summaries counts.
    run_with_retry(_op)
    return {"blocked": blocked, "orders": pending_orders, "summaries": summaries}
|
||||
|
||||
|
||||
def _record_live_order_events(cur, orders, event_ts):
    """Persist an ORDER_PLACED engine event for each freshly placed order."""
    for placed in orders:
        insert_engine_event(cur, "ORDER_PLACED", data=placed, ts=event_ts)
|
||||
@ -272,19 +792,53 @@ def _finalize_live_execution(
|
||||
):
|
||||
def _op(cur, _conn):
|
||||
state = load_state(mode=mode, cur=cur, for_update=True)
|
||||
_record_live_order_events(cur, orders, now_ts)
|
||||
|
||||
applied = _apply_filled_orders_to_state(state, orders)
|
||||
executed = applied["amount"] > 0
|
||||
broker_name = ""
|
||||
for order in orders:
|
||||
if not broker_name:
|
||||
broker_name = str(order.get("broker") or "").strip().upper()
|
||||
order.setdefault("logical_time", logical_time)
|
||||
_record_live_order_events(cur, [order], now_ts)
|
||||
details = _upsert_broker_order_state(
|
||||
cur,
|
||||
broker_name=broker_name,
|
||||
logical_time=logical_time,
|
||||
order=order,
|
||||
checked_at=now_ts,
|
||||
)
|
||||
if details["status"] == LOCAL_ORDER_PARTIAL:
|
||||
insert_engine_event(
|
||||
cur,
|
||||
"ORDER_PARTIAL_FILL_DETECTED",
|
||||
data={
|
||||
"order_id": details["local_order_id"],
|
||||
"filled_qty": details["filled_qty"],
|
||||
"requested_qty": details["requested_qty"],
|
||||
},
|
||||
ts=now_ts,
|
||||
)
|
||||
if details["delta_fill_qty"] > 0:
|
||||
_apply_fill_delta_to_state(state, details)
|
||||
_mark_accounted_fill(
|
||||
cur,
|
||||
details["local_order_id"],
|
||||
details["accounted_fill_qty"] + details["delta_fill_qty"],
|
||||
now_ts,
|
||||
)
|
||||
|
||||
if funds_after is not None:
|
||||
cash_after = funds_after.get("cash")
|
||||
if cash_after is not None:
|
||||
state["cash"] = float(cash_after)
|
||||
|
||||
if executed:
|
||||
state["last_run"] = now_ts.isoformat()
|
||||
state["last_sip_ts"] = now_ts.isoformat()
|
||||
cycle_orders = _load_cycle_order_rows(cur, logical_time)
|
||||
executed = _advance_cycle_state(
|
||||
cur,
|
||||
state=state,
|
||||
logical_time=logical_time,
|
||||
cycle_orders=cycle_orders,
|
||||
event_ts=now_ts,
|
||||
failure_reason=failure_reason or ("broker_auth_expired" if auth_failed else None),
|
||||
)
|
||||
|
||||
save_state(
|
||||
state,
|
||||
@ -294,31 +848,6 @@ def _finalize_live_execution(
|
||||
event_meta={"source": "sip_live"},
|
||||
)
|
||||
|
||||
if executed:
|
||||
log_event(
|
||||
"SIP_EXECUTED",
|
||||
{
|
||||
"nifty_units": applied["nifty_units"],
|
||||
"gold_units": applied["gold_units"],
|
||||
"nifty_price": sp_price_val,
|
||||
"gold_price": gd_price_val,
|
||||
"amount": applied["amount"],
|
||||
},
|
||||
cur=cur,
|
||||
ts=now_ts,
|
||||
logical_time=logical_time,
|
||||
)
|
||||
else:
|
||||
insert_engine_event(
|
||||
cur,
|
||||
"SIP_NO_FILL",
|
||||
data={
|
||||
"reason": failure_reason or ("broker_auth_expired" if auth_failed else "no_fill"),
|
||||
"orders": orders,
|
||||
},
|
||||
ts=now_ts,
|
||||
)
|
||||
|
||||
return state, executed
|
||||
|
||||
return run_with_retry(_op)
|
||||
@ -340,6 +869,10 @@ def _try_execute_sip_live(
|
||||
if not market_open or broker is None:
|
||||
return load_state(mode=mode), False
|
||||
|
||||
reconciliation = reconcile_live_orders(broker=broker, mode=mode, now_ts=now_ts)
|
||||
if reconciliation.get("blocked"):
|
||||
return load_state(mode=mode), False
|
||||
|
||||
sp_price_val = _as_float(sp_price)
|
||||
gd_price_val = _as_float(gd_price)
|
||||
eq_w_val = _as_float(eq_w)
|
||||
@ -367,6 +900,7 @@ def _try_execute_sip_live(
|
||||
funds_after = None
|
||||
failure_reason = None
|
||||
auth_failed = False
|
||||
broker_name = getattr(broker, "__class__", type(broker)).__name__.replace("Live", "").replace("Broker", "").upper()
|
||||
|
||||
try:
|
||||
funds_before = broker.get_funds()
|
||||
@ -384,6 +918,8 @@ def _try_execute_sip_live(
|
||||
logical_time=logical_time,
|
||||
)
|
||||
)
|
||||
orders[-1]["broker"] = broker_name
|
||||
orders[-1]["logical_time"] = logical_time
|
||||
if gold_qty > 0:
|
||||
orders.append(
|
||||
broker.place_order(
|
||||
@ -394,6 +930,8 @@ def _try_execute_sip_live(
|
||||
logical_time=logical_time,
|
||||
)
|
||||
)
|
||||
orders[-1]["broker"] = broker_name
|
||||
orders[-1]["logical_time"] = logical_time
|
||||
funds_after = broker.get_funds()
|
||||
except BrokerAuthExpired:
|
||||
auth_failed = True
|
||||
|
||||
@ -12,7 +12,7 @@ from indian_paper_trading_strategy.engine.market import (
|
||||
market_now,
|
||||
market_session,
|
||||
)
|
||||
from indian_paper_trading_strategy.engine.execution import try_execute_sip
|
||||
from indian_paper_trading_strategy.engine.execution import reconcile_live_orders, try_execute_sip
|
||||
from indian_paper_trading_strategy.engine.broker import (
|
||||
BrokerAuthExpired,
|
||||
LiveGrowwBroker,
|
||||
@ -279,7 +279,7 @@ def _pause_for_auth_expiry(
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE strategy_run
|
||||
SET status = 'STOPPED',
|
||||
SET status = 'PAUSED_AUTH_EXPIRED',
|
||||
stopped_at = %s,
|
||||
meta = COALESCE(meta, '{}'::jsonb) || %s
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
@ -296,10 +296,10 @@ def _pause_for_auth_expiry(
|
||||
_set_state(
|
||||
user_id,
|
||||
run_id,
|
||||
state="STOPPED",
|
||||
state="PAUSED_AUTH_EXPIRED",
|
||||
last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
|
||||
)
|
||||
_update_engine_status(user_id, run_id, "STOPPED")
|
||||
_update_engine_status(user_id, run_id, "PAUSED_AUTH_EXPIRED")
|
||||
log_event(
|
||||
event="BROKER_AUTH_EXPIRED",
|
||||
message="Broker authentication expired",
|
||||
@ -317,8 +317,8 @@ def _pause_for_auth_expiry(
|
||||
meta={"reason": reason},
|
||||
)
|
||||
emit_event_cb(
|
||||
event="STRATEGY_STOPPED",
|
||||
message="Strategy stopped",
|
||||
event="STRATEGY_BLOCKED",
|
||||
message="Strategy blocked until broker reconnect",
|
||||
meta={"reason": "broker_auth_expired"},
|
||||
)
|
||||
|
||||
@ -440,12 +440,40 @@ def _engine_loop(config, stop_event: threading.Event):
|
||||
last_run = _last_execution_anchor(state, mode)
|
||||
is_first_run = last_run is None
|
||||
now = market_now()
|
||||
debug_event(
|
||||
"ENGINE_LOOP_TICK",
|
||||
"engine loop tick",
|
||||
{"now": now.isoformat(), "frequency": frequency_label},
|
||||
)
|
||||
|
||||
debug_event(
|
||||
"ENGINE_LOOP_TICK",
|
||||
"engine loop tick",
|
||||
{"now": now.isoformat(), "frequency": frequency_label},
|
||||
)
|
||||
|
||||
if getattr(broker, "external_orders", False):
|
||||
try:
|
||||
reconciliation = reconcile_live_orders(
|
||||
broker=broker,
|
||||
mode=mode,
|
||||
now_ts=now,
|
||||
)
|
||||
if reconciliation.get("blocked"):
|
||||
debug_event(
|
||||
"ORDER_RECONCILIATION_BLOCKED",
|
||||
"Unresolved broker orders are still being reconciled",
|
||||
{"now": now.isoformat()},
|
||||
)
|
||||
except BrokerAuthExpired as exc:
|
||||
_pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb)
|
||||
exit_reason = "AUTH_EXPIRED"
|
||||
break
|
||||
except Exception as exc:
|
||||
debug_event(
|
||||
"ORDER_RECONCILIATION_ERROR",
|
||||
"broker order reconciliation failed",
|
||||
{"error": str(exc)},
|
||||
)
|
||||
if not sleep_with_heartbeat(15, stop_event, scope_user, scope_run, owner_id):
|
||||
exit_reason = "LEASE_LOST"
|
||||
break
|
||||
continue
|
||||
|
||||
if last_run and not is_first_run:
|
||||
parsed_last_run = _parse_market_timestamp(last_run)
|
||||
next_run = align_to_market_open(parsed_last_run + delta) if parsed_last_run else None
|
||||
|
||||
@ -7,6 +7,8 @@ $python = Join-Path $root ".venv\\Scripts\\python.exe"
|
||||
$status = [ordered]@{}
|
||||
$engineExternal = $env:ENGINE_EXTERNAL -and $env:ENGINE_EXTERNAL.ToLower() -in @("1", "true", "yes")
|
||||
|
||||
if (-not $env:APP_ENV) { $env:APP_ENV = "development" }
|
||||
|
||||
if (-not $env:DB_HOST) { $env:DB_HOST = "localhost" }
|
||||
if (-not $env:DB_PORT) { $env:DB_PORT = "5432" }
|
||||
if (-not $env:DB_NAME) { $env:DB_NAME = "trading_db" }
|
||||
@ -17,6 +19,12 @@ if (-not $env:PGPORT) { $env:PGPORT = $env:DB_PORT }
|
||||
if (-not $env:PGDATABASE) { $env:PGDATABASE = $env:DB_NAME }
|
||||
if (-not $env:PGUSER) { $env:PGUSER = $env:DB_USER }
|
||||
if (-not $env:PGPASSWORD) { $env:PGPASSWORD = $env:DB_PASSWORD }
|
||||
if (-not $env:BROKER_TOKEN_KEY -and $env:APP_ENV.ToLower() -in @("development", "dev", "test", "testing", "local")) {
|
||||
$env:BROKER_TOKEN_KEY = (& $python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode('utf-8'))").Trim()
|
||||
}
|
||||
if (-not $env:RESET_OTP_SECRET -and $env:APP_ENV.ToLower() -in @("development", "dev", "test", "testing", "local")) {
|
||||
$env:RESET_OTP_SECRET = "dev-reset-otp-secret"
|
||||
}
|
||||
|
||||
function Write-Status {
|
||||
param(
|
||||
@ -130,7 +138,7 @@ Write-Status "Frontend" $true "pid $($frontendProc.Id)"
|
||||
} | ConvertTo-Json | Set-Content -Encoding ascii $pidFile
|
||||
|
||||
Write-Host "Waiting for backend health..."
|
||||
$healthUrl = "http://localhost:8000/health"
|
||||
$healthUrl = "http://localhost:8000/health/ready"
|
||||
$deadline = [DateTime]::UtcNow.AddMinutes(2)
|
||||
$healthy = $false
|
||||
while ([DateTime]::UtcNow -lt $deadline) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user