import json import os import sys import threading from datetime import datetime, timedelta, timezone from pathlib import Path ENGINE_ROOT = Path(__file__).resolve().parents[3] if str(ENGINE_ROOT) not in sys.path: sys.path.append(str(ENGINE_ROOT)) from indian_paper_trading_strategy.engine.market import ( align_to_market_open, market_now, market_session, next_market_open_after, ) from indian_paper_trading_strategy.engine.market_calendar import UnsupportedCalendarYearError from indian_paper_trading_strategy.engine.runner import RunLeaseNotAcquiredError, start_engine, stop_engine from indian_paper_trading_strategy.engine.state import init_paper_state, load_state, save_state from indian_paper_trading_strategy.engine.broker import PaperBroker from indian_paper_trading_strategy.engine.time_utils import ( UTC, frequency_to_timedelta, parse_market_timestamp, parse_persisted_timestamp, serialize_timestamp, ) from indian_paper_trading_strategy.engine.db import engine_context from app.broker_store import get_user_broker, set_broker_auth_state from app.services.db import db_connection from app.services.run_service import ( create_strategy_run, get_active_run_id, get_running_run_id, update_run_status, ) from app.services.auth_service import get_user_by_id from app.services.email_service import send_email_async from app.services.groww_service import GrowwApiError, GrowwTokenError, fetch_funds as fetch_groww_funds from app.services.groww_storage import get_session as get_groww_session from app.services.zerodha_service import ( KiteTokenError, fetch_funds as fetch_zerodha_funds, ) from app.services.zerodha_storage import get_session as get_zerodha_session from psycopg2.extras import Json from psycopg2 import errors SEQ_LOCK = threading.Lock() SEQ = 0 LAST_WAIT_LOG_TS = {} WAIT_LOG_INTERVAL = timedelta(seconds=60) def init_log_state(): global SEQ with db_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT COALESCE(MAX(seq), 0) FROM strategy_log") row = cur.fetchone() SEQ = row[0] if row and row[0] is not None else 0 def start_new_run(user_id: str, run_id: str): LAST_WAIT_LOG_TS.pop(run_id, None) emit_event( user_id=user_id, run_id=run_id, event="STRATEGY_STARTED", message="Strategy started", meta={}, ) def stop_run(user_id: str, run_id: str, reason="user_request"): emit_event( user_id=user_id, run_id=run_id, event="STRATEGY_STOPPED", message="Strategy stopped", meta={"reason": reason}, ) def resume_run(user_id: str, run_id: str): emit_event( user_id=user_id, run_id=run_id, event="STRATEGY_RESUMED", message="Strategy resumed", meta={}, ) def emit_event( *, user_id: str, run_id: str, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None ): global SEQ, LAST_WAIT_LOG_TS if not user_id or not run_id: return now = datetime.now(timezone.utc) if event == "SIP_WAITING": last_ts = LAST_WAIT_LOG_TS.get(run_id) if last_ts and (now - last_ts) < WAIT_LOG_INTERVAL: return LAST_WAIT_LOG_TS[run_id] = now with SEQ_LOCK: SEQ += 1 seq = SEQ evt = { "seq": seq, "ts": serialize_timestamp(now), "level": level, "category": category, "event": event, "message": message, "run_id": run_id, "meta": meta or {} } with db_connection() as conn: with conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO strategy_log ( seq, ts, level, category, event, message, user_id, run_id, meta ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (seq) DO NOTHING """, ( evt["seq"], now, evt["level"], evt["category"], evt["event"], evt["message"], user_id, evt["run_id"], Json(evt["meta"]), ), ) def _maybe_parse_json(value): if value is None: return None if not isinstance(value, str): return value text = value.strip() if not text: return None try: return json.loads(text) except Exception: return value def _utc_now(): return datetime.now(UTC) def _load_config(user_id: str, run_id: str): with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit, mode, broker, active, frequency, frequency_days, unit, next_run FROM strategy_config WHERE user_id = %s AND run_id = %s LIMIT 1 """, (user_id, run_id), ) row = cur.fetchone() if not row: return {} cfg = { "strategy": row[0], "sip_amount": float(row[1]) if row[1] is not None else None, "mode": row[4], "broker": row[5], "active": row[6], "frequency": _maybe_parse_json(row[7]), "frequency_days": row[8], "unit": row[9], "next_run": serialize_timestamp(row[10]), } if row[2] is not None or row[3] is not None: cfg["sip_frequency"] = { "value": row[2], "unit": row[3], } return cfg def _save_config(cfg, user_id: str, run_id: str): sip_frequency = cfg.get("sip_frequency") sip_value = None sip_unit = None if isinstance(sip_frequency, dict): sip_value = sip_frequency.get("value") sip_unit = sip_frequency.get("unit") frequency = cfg.get("frequency") if not isinstance(frequency, str) and frequency is not None: frequency = json.dumps(frequency) next_run = cfg.get("next_run") next_run_dt = None if isinstance(next_run, str): next_run_dt = parse_persisted_timestamp(next_run) with db_connection() as conn: with conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO strategy_config ( user_id, run_id, strategy, sip_amount, sip_frequency_value, sip_frequency_unit, mode, broker, active, frequency, frequency_days, unit, next_run ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (user_id, run_id) DO UPDATE SET strategy = EXCLUDED.strategy, sip_amount = EXCLUDED.sip_amount, sip_frequency_value = EXCLUDED.sip_frequency_value, sip_frequency_unit = EXCLUDED.sip_frequency_unit, mode = EXCLUDED.mode, broker = EXCLUDED.broker, active = EXCLUDED.active, frequency = EXCLUDED.frequency, frequency_days = EXCLUDED.frequency_days, unit = EXCLUDED.unit, next_run = EXCLUDED.next_run """, ( user_id, run_id, cfg.get("strategy"), cfg.get("sip_amount"), sip_value, sip_unit, cfg.get("mode"), cfg.get("broker"), cfg.get("active"), frequency, cfg.get("frequency_days"), cfg.get("unit"), next_run_dt, ), ) def save_strategy_config(cfg, user_id: str, run_id: str): _save_config(cfg, user_id, run_id) def deactivate_strategy_config(user_id: str, run_id: str): cfg = _load_config(user_id, run_id) cfg["active"] = False _save_config(cfg, user_id, run_id) def reactivate_strategy_config(user_id: str, run_id: str): cfg = _load_config(user_id, run_id) if not cfg: return {} cfg["active"] = True _save_config(cfg, user_id, run_id) return cfg def _write_status(user_id: str, run_id: str, status): now_local = _utc_now() with db_connection() as conn: with conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO engine_status (user_id, run_id, status, last_updated) VALUES (%s, %s, %s, %s) ON CONFLICT (user_id, run_id) DO UPDATE SET status = EXCLUDED.status, last_updated = EXCLUDED.last_updated """, (user_id, run_id, status, now_local), ) def _get_engine_status_row(user_id: str, run_id: str): if not user_id or not run_id: return None with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT status, last_updated FROM engine_status WHERE user_id = %s AND run_id = %s LIMIT 1 """, (user_id, run_id), ) row = cur.fetchone() if not row: return None return {"status": row[0], "last_updated": row[1]} def _effective_running_run_id(user_id: str): run_id = get_running_run_id(user_id) if not run_id: return None engine_row = _get_engine_status_row(user_id, run_id) engine_state = str((engine_row or {}).get("status") or "").strip().upper() if engine_state not in {"STOPPED", "ERROR", "PAUSED_AUTH_EXPIRED"}: return run_id if engine_state == "PAUSED_AUTH_EXPIRED": update_run_status( user_id, run_id, "PAUSED_AUTH_EXPIRED", meta={"reason": "broker_auth_expired", "engine_state": engine_state}, ) return None update_run_status( user_id, run_id, "STOPPED", meta={"reason": "engine_inactive", "engine_state": engine_state}, ) return None def _set_run_status_or_raise(user_id: str, run_id: str, status: str, meta: dict | None = None): updated = update_run_status(user_id, run_id, status, meta=meta) if not updated: raise RuntimeError(f"Run {run_id} for user {user_id} no longer exists") def _get_run_row(user_id: str, run_id: str | None): if not user_id or not run_id: return None with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT status, meta, started_at, stopped_at FROM strategy_run WHERE user_id = %s AND run_id = %s LIMIT 1 """, (user_id, run_id), ) row = cur.fetchone() if not row: return None return { "status": row[0], "meta": row[1] if isinstance(row[1], dict) else {}, "started_at": row[2], "stopped_at": row[3], } def _broker_block_state(user_id: str, cfg: dict | None): config = cfg or {} mode = str(config.get("mode") or "").strip().upper() if mode != "LIVE": return {"blocked": False, "reason": None, "broker_state": None} broker_state = get_user_broker(user_id) or {} auth_state = str(broker_state.get("auth_state") or "").strip().upper() connected = bool(broker_state.get("connected")) broker_name = str(broker_state.get("broker") or "").strip().upper() or None if not connected or auth_state == "DISCONNECTED": return { "blocked": True, "reason": "broker_disconnected", "broker_state": "DISCONNECTED", "broker": broker_name, } if auth_state in {"EXPIRED", "PENDING"}: return { "blocked": True, "reason": "broker_auth_required", "broker_state": auth_state or "EXPIRED", "broker": broker_name, } return { "blocked": False, "reason": None, "broker_state": auth_state or "VALID", "broker": broker_name, } def block_live_strategy_for_broker_disconnect(user_id: str, *, reason: str = "broker_disconnected"): running_run_id = _effective_running_run_id(user_id) run_id = running_run_id or get_active_run_id(user_id) if not run_id: return None cfg = _load_config(user_id, run_id) if str(cfg.get("mode") or "").strip().upper() != "LIVE": return None stop_warning = None try: stop_engine(user_id, run_id, timeout=10.0) except Exception as exc: stop_warning = str(exc) print(f"[STRATEGY] stop_engine failed during broker disconnect for {user_id}/{run_id}: {exc}", flush=True) deactivate_strategy_config(user_id, run_id) stop_run(user_id, run_id, reason=reason) _write_status(user_id, run_id, "STOPPED") _set_run_status_or_raise( user_id, run_id, "STOPPED", meta={"reason": reason, "broker_blocked": True}, ) result = {"run_id": run_id, "status": "STOPPED", "reason": reason} if stop_warning: result["warning"] = stop_warning return result def validate_frequency(freq: dict, mode: str): if not isinstance(freq, dict): raise ValueError("Frequency payload is required") value = int(freq.get("value", 0)) unit = freq.get("unit") if unit not in {"minutes", "days"}: raise ValueError(f"Unsupported frequency unit: {unit}") if unit == "minutes": if value < 1: raise ValueError("Minimum frequency is 1 minute") if unit == "days" and value < 1: raise ValueError("Minimum frequency is 1 day") def _validate_live_broker_session(user_id: str): broker_state = get_user_broker(user_id) or {} broker_name = (broker_state.get("broker") or "").strip().upper() if not broker_state.get("connected") or broker_name not in {"ZERODHA", "GROWW"}: return False, broker_state, "broker_not_connected" if broker_name == "ZERODHA": try: session = get_zerodha_session(user_id) except Exception as exc: print(f"[STRATEGY] failed to load Zerodha session for {user_id}: {exc}", flush=True) set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" if not session: set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" api_key = str(session.get("api_key") or "").strip() access_token = str(session.get("access_token") or "").strip() if not api_key or not access_token: set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" try: fetch_zerodha_funds(api_key, access_token) except KiteTokenError: set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" except Exception as exc: print(f"[STRATEGY] failed to validate Zerodha session for {user_id}: {exc}", flush=True) set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" set_broker_auth_state(user_id, "VALID") return True, broker_state, "ok" try: session = get_groww_session(user_id) except Exception as exc: print(f"[STRATEGY] failed to load Groww session for {user_id}: {exc}", flush=True) set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" if not session: set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" access_token = str(session.get("access_token") or "").strip() if not access_token: set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" try: fetch_groww_funds(access_token) except GrowwTokenError: set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" except GrowwApiError as exc: print(f"[STRATEGY] failed to validate Groww session for {user_id}: {exc}", flush=True) set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" except Exception as exc: print(f"[STRATEGY] failed to validate Groww session for {user_id}: {exc}", flush=True) set_broker_auth_state(user_id, "EXPIRED") return False, broker_state, "broker_auth_required" set_broker_auth_state(user_id, "VALID") return True, broker_state, "ok" def compute_next_eligible(last_run: str | None, sip_frequency: dict | None): if not last_run or not sip_frequency: return None last_dt = parse_market_timestamp(last_run) if last_dt is None: return None try: delta = frequency_to_timedelta(sip_frequency) except ValueError: return None next_dt = last_dt + delta next_dt = align_to_market_open(next_dt) return serialize_timestamp(next_dt) def _last_execution_ts(state: dict, mode: str) -> str | None: mode_key = (mode or "LIVE").strip().upper() if mode_key == "LIVE": return state.get("last_sip_ts") return state.get("last_run") or state.get("last_sip_ts") def start_strategy(req, user_id: str): engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"} running_run_id = _effective_running_run_id(user_id) if running_run_id: running_cfg = _load_config(user_id, running_run_id) running_mode = (running_cfg.get("mode") or req.mode or "PAPER").strip().upper() if running_mode == "LIVE": is_valid, broker_state, failure_reason = _validate_live_broker_session(user_id) if not is_valid: return { "status": "broker_auth_required", "redirect_url": "/api/broker/login", "broker": broker_state.get("broker"), } if engine_external: return {"status": "already_running", "run_id": running_run_id} engine_config = _build_engine_config(user_id, running_run_id, req) if engine_config: try: started = start_engine(engine_config) except RunLeaseNotAcquiredError: return {"status": "already_running", "run_id": running_run_id} if started: _write_status(user_id, running_run_id, "RUNNING") return {"status": "restarted", "run_id": running_run_id} return {"status": "already_running", "run_id": running_run_id} mode = (req.mode or "PAPER").strip().upper() frequency_payload = req.sip_frequency.dict() if hasattr(req.sip_frequency, "dict") else dict(req.sip_frequency) validate_frequency(frequency_payload, mode) if mode == "PAPER": initial_cash = float(req.initial_cash) if req.initial_cash is not None else 1_000_000.0 broker_name = "paper" elif mode == "LIVE": is_valid, broker_state, failure_reason = _validate_live_broker_session(user_id) if not is_valid: return { "status": "broker_auth_required", "redirect_url": "/api/broker/login", "broker": broker_state.get("broker"), } initial_cash = None broker_name = ((broker_state.get("broker") or "ZERODHA").strip().lower()) else: return {"status": "unsupported_mode"} meta = { "sip_amount": req.sip_amount, "sip_frequency": frequency_payload, } if initial_cash is not None: meta["initial_cash"] = initial_cash try: run_id = create_strategy_run( user_id, strategy=req.strategy_name, mode=mode, broker=broker_name, meta=meta, ) except errors.UniqueViolation: return {"status": "already_running"} with engine_context(user_id, run_id): if mode == "PAPER": init_paper_state(initial_cash, frequency_payload) with db_connection() as conn: with conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO paper_broker_account (user_id, run_id, cash) VALUES (%s, %s, %s) ON CONFLICT (user_id, run_id) DO UPDATE SET cash = EXCLUDED.cash """, (user_id, run_id, initial_cash), ) PaperBroker(initial_cash=initial_cash) else: save_state( { "total_invested": 0.0, "nifty_units": 0.0, "gold_units": 0.0, "last_sip_ts": None, "last_run": None, }, mode="LIVE", emit_event=True, event_meta={"source": "live_start"}, ) config = { "strategy": req.strategy_name, "sip_amount": req.sip_amount, "sip_frequency": frequency_payload, "mode": mode, "broker": broker_name, "active": True, } save_strategy_config(config, user_id, run_id) start_new_run(user_id, run_id) _write_status(user_id, run_id, "RUNNING") if not engine_external: def emit_event_cb(*, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None): emit_event( user_id=user_id, run_id=run_id, event=event, message=message, level=level, category=category, meta=meta, ) engine_config = dict(config) if initial_cash is not None: engine_config["initial_cash"] = initial_cash engine_config["run_id"] = run_id engine_config["user_id"] = user_id engine_config["emit_event"] = emit_event_cb try: start_engine(engine_config) except RunLeaseNotAcquiredError: pass try: user = get_user_by_id(user_id) if user: body = ( "Your strategy has been started.\n\n" f"Strategy: {req.strategy_name}\n" f"Mode: {mode}\n" f"Run ID: {run_id}\n" ) send_email_async(user["username"], "Strategy started", body) except Exception: pass return {"status": "started", "run_id": run_id} def _build_engine_config(user_id: str, run_id: str, req=None): cfg = _load_config(user_id, run_id) sip_frequency = cfg.get("sip_frequency") if not isinstance(sip_frequency, dict) and req is not None: sip_frequency = req.sip_frequency.dict() if hasattr(req.sip_frequency, "dict") else dict(req.sip_frequency) if not isinstance(sip_frequency, dict): sip_frequency = {"value": cfg.get("frequency_days") or 1, "unit": cfg.get("unit") or "days"} sip_amount = cfg.get("sip_amount") if sip_amount is None and req is not None: sip_amount = req.sip_amount mode = (cfg.get("mode") or (req.mode if req is not None else "PAPER") or "PAPER").strip().upper() broker = cfg.get("broker") or "paper" strategy_name = cfg.get("strategy") or cfg.get("strategy_name") or (req.strategy_name if req is not None else None) with engine_context(user_id, run_id): state = load_state(mode=mode) initial_cash = float(state.get("initial_cash") or 1_000_000.0) def emit_event_cb(*, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None): emit_event( user_id=user_id, run_id=run_id, event=event, message=message, level=level, category=category, meta=meta, ) return { "strategy": strategy_name or "Golden Nifty", "sip_amount": sip_amount or 0, "sip_frequency": sip_frequency, "mode": mode, "broker": broker, "active": cfg.get("active", True), "initial_cash": initial_cash, "user_id": user_id, "run_id": run_id, "emit_event": emit_event_cb, } def resume_running_runs(): engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"} if engine_external: return with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT user_id, run_id FROM strategy_run WHERE status = 'RUNNING' ORDER BY created_at DESC """ ) runs = cur.fetchall() for user_id, run_id in runs: engine_config = _build_engine_config(user_id, run_id, None) if not engine_config: continue try: started = start_engine(engine_config) except RunLeaseNotAcquiredError: started = False if started: _write_status(user_id, run_id, "RUNNING") def stop_strategy(user_id: str): run_id = _effective_running_run_id(user_id) if not run_id: latest_run_id = get_running_run_id(user_id) or get_active_run_id(user_id) return {"status": "already_stopped", "run_id": latest_run_id} engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"} stop_warning = None if not engine_external: try: stop_engine(user_id, run_id, timeout=15.0) except Exception as exc: print(f"[STRATEGY] stop_engine failed for {user_id}/{run_id}: {exc}", flush=True) stop_warning = str(exc) deactivate_strategy_config(user_id, run_id) stop_run(user_id, run_id, reason="user_request") try: _write_status(user_id, run_id, "STOPPED") except Exception as exc: print(f"[STRATEGY] engine status update failed during stop for {user_id}/{run_id}: {exc}", flush=True) if not stop_warning: stop_warning = str(exc) try: _set_run_status_or_raise(user_id, run_id, "STOPPED", meta={"reason": "user_request"}) except RuntimeError as exc: return { "status": "stop_failed", "run_id": run_id, "message": str(exc), } try: user = get_user_by_id(user_id) if user: body = "Your strategy has been stopped." send_email_async(user["username"], "Strategy stopped", body) except Exception: pass result = {"status": "stopped", "run_id": run_id} if stop_warning: result["warning"] = stop_warning return result def resume_strategy(user_id: str): engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"} running_run_id = _effective_running_run_id(user_id) if running_run_id: return {"status": "already_running", "run_id": running_run_id} run_id = get_active_run_id(user_id) if not run_id: return {"status": "no_resumable_run"} cfg = _load_config(user_id, run_id) strategy_name = (cfg.get("strategy") or "").strip() mode = (cfg.get("mode") or "").strip().upper() if not strategy_name or mode not in {"LIVE", "PAPER"}: return {"status": "no_resumable_run"} if mode == "LIVE": is_valid, broker_state, _failure_reason = _validate_live_broker_session(user_id) if not is_valid: return { "status": "broker_auth_required", "redirect_url": "/api/broker/login", "broker": broker_state.get("broker"), } engine_config = None if not engine_external: try: engine_config = _build_engine_config(user_id, run_id, None) except Exception as exc: return { "status": "resume_failed", "run_id": run_id, "message": f"Unable to load the saved strategy state: {exc}", } if not engine_config: return { "status": "resume_failed", "run_id": run_id, "message": "Saved strategy configuration is incomplete.", } reactivate_strategy_config(user_id, run_id) try: _set_run_status_or_raise(user_id, run_id, "RUNNING", meta={"reason": "user_resume"}) except RuntimeError as exc: deactivate_strategy_config(user_id, run_id) return { "status": "resume_failed", "run_id": run_id, "message": str(exc), } _write_status(user_id, run_id, "RUNNING") if not engine_external: try: started = start_engine(engine_config) except RunLeaseNotAcquiredError: return {"status": "already_running", "run_id": run_id} except Exception as exc: deactivate_strategy_config(user_id, run_id) _write_status(user_id, run_id, "STOPPED") _set_run_status_or_raise(user_id, run_id, "STOPPED", meta={"reason": "resume_start_failed"}) return { "status": "resume_failed", "run_id": run_id, "message": f"Unable to resume the strategy engine: {exc}", } if not started: deactivate_strategy_config(user_id, run_id) _write_status(user_id, run_id, "STOPPED") _set_run_status_or_raise(user_id, run_id, "STOPPED", meta={"reason": "resume_start_failed"}) return { "status": "resume_failed", "run_id": run_id, "message": "Strategy engine could not be started.", } resume_run(user_id, run_id) try: user = get_user_by_id(user_id) if user: body = ( "Your strategy has been resumed.\n\n" f"Strategy: {strategy_name}\n" f"Mode: {mode}\n" f"Run ID: {run_id}\n" ) send_email_async(user["username"], "Strategy resumed", body) except Exception: pass return {"status": "resumed", "run_id": run_id} def get_strategy_status(user_id: str): running_run_id = _effective_running_run_id(user_id) run_id = running_run_id or get_active_run_id(user_id) cfg = _load_config(user_id, run_id) if run_id else {} run_row = _get_run_row(user_id, run_id) default_status = (run_row or {}).get("status") or ("RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE")) engine_row = None with db_connection() as conn: with conn.cursor() as cur: cur.execute( "SELECT status, last_updated FROM engine_status WHERE user_id = %s AND run_id = %s", (user_id, run_id), ) engine_row = cur.fetchone() if not engine_row: status = {"status": default_status, "last_updated": None} else: status = { "status": default_status, "last_updated": serialize_timestamp(engine_row[1]), } status["run_id"] = run_id engine_state = str((engine_row or [None])[0] or "").strip().upper() if running_run_id and engine_state in {"STOPPED", "ERROR"}: status["status"] = "STOPPED" elif engine_state == "PAUSED_AUTH_EXPIRED": status["status"] = "PAUSED_AUTH_EXPIRED" sip_frequency = cfg.get("sip_frequency") if not isinstance(sip_frequency, dict): frequency = cfg.get("frequency") unit = cfg.get("unit") if isinstance(frequency, dict): unit = frequency.get("unit", unit) frequency = frequency.get("value") if frequency is None and cfg.get("frequency_days") is not None: frequency = cfg.get("frequency_days") unit = unit or "days" if frequency is not None and unit: sip_frequency = {"value": frequency, "unit": unit} status["config"] = { "strategy": cfg.get("strategy"), "sip_amount": cfg.get("sip_amount"), "sip_frequency": sip_frequency, "mode": cfg.get("mode"), "broker": cfg.get("broker"), "active": cfg.get("active"), } if running_run_id: mode = (cfg.get("mode") or "LIVE").strip().upper() with engine_context(user_id, run_id): state = load_state(mode=mode) last_execution_ts = _last_execution_ts(state, mode) next_eligible = compute_next_eligible(last_execution_ts, sip_frequency) status["last_execution_ts"] = last_execution_ts status["next_eligible_ts"] = next_eligible if next_eligible: parsed_next = parse_persisted_timestamp(next_eligible) if parsed_next and parsed_next > _utc_now(): status["status"] = "WAITING" broker_block = _broker_block_state(user_id, cfg) status["broker_state"] = broker_block.get("broker_state") status["strategy_blocked"] = bool(broker_block.get("blocked")) status["strategy_block_reason"] = broker_block.get("reason") status["broker"] = broker_block.get("broker") or cfg.get("broker") status_key = (status.get("status") or "IDLE").upper() resumable = bool(cfg.get("strategy")) and bool(cfg.get("mode")) status["can_resume"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"} status["can_restart"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"} return status def get_engine_status(user_id: str): running_run_id = _effective_running_run_id(user_id) run_id = running_run_id or get_active_run_id(user_id) status = { "state": "STOPPED", "run_id": run_id, "user_id": user_id, "last_heartbeat_ts": None, } if running_run_id: with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT status, last_updated FROM engine_status WHERE user_id = %s AND run_id = %s ORDER BY last_updated DESC LIMIT 1 """, (user_id, run_id), ) row = cur.fetchone() if row: status["state"] = row[0] last_updated = row[1] if last_updated is not None: status["last_heartbeat_ts"] = serialize_timestamp(last_updated) cfg = _load_config(user_id, run_id) mode = (cfg.get("mode") or "LIVE").strip().upper() with engine_context(user_id, run_id): state = load_state(mode=mode) last_execution_ts = _last_execution_ts(state, mode) sip_frequency = cfg.get("sip_frequency") if isinstance(sip_frequency, dict): sip_frequency = { "value": sip_frequency.get("value"), "unit": sip_frequency.get("unit"), } else: frequency = cfg.get("frequency") unit = cfg.get("unit") if isinstance(frequency, dict): unit = frequency.get("unit", unit) frequency = frequency.get("value") if frequency is None and cfg.get("frequency_days") is not None: frequency = cfg.get("frequency_days") unit = unit or "days" if frequency is not None and unit: sip_frequency = {"value": frequency, "unit": unit} status["last_execution_ts"] = last_execution_ts status["next_eligible_ts"] = compute_next_eligible(last_execution_ts, sip_frequency) status["run_id"] = run_id return status def get_strategy_logs(user_id: str, since_seq: int): run_id = get_active_run_id(user_id) with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT seq, ts, level, category, event, message, run_id, meta FROM strategy_log WHERE user_id = %s AND run_id = %s AND seq > %s ORDER BY seq """, (user_id, run_id, since_seq), ) rows = cur.fetchall() events = [] for row in rows: ts = row[1] ts_str = serialize_timestamp(ts) events.append( { "seq": row[0], "ts": ts_str, "level": row[2], "category": row[3], "event": row[4], "message": row[5], "run_id": row[6], "meta": row[7] if isinstance(row[7], dict) else {}, } ) cur.execute( "SELECT COALESCE(MAX(seq), 0) FROM strategy_log WHERE user_id = %s AND run_id = %s", (user_id, run_id), ) latest_seq = cur.fetchone()[0] return {"events": events, "latest_seq": latest_seq} def _humanize_reason(reason: str | None): if not reason: return None return reason.replace("_", " ").strip().capitalize() def _issue_message(event: str, message: str | None, data: dict | None, meta: dict | None): payload = data if isinstance(data, dict) else {} extra = meta if isinstance(meta, dict) else {} reason = payload.get("reason") or extra.get("reason") reason_key = str(reason or "").strip().lower() if event == "SIP_NO_FILL": if reason_key == "insufficient_funds": return "Insufficient funds for this SIP." if reason_key == "broker_auth_expired": return "Broker session expired. Reconnect broker." if reason_key == "no_fill": return "Order was not filled." return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}." if event == "BROKER_AUTH_EXPIRED": return "Broker session expired. Reconnect broker." if event == "PRICE_FETCH_ERROR": return "Could not fetch prices. Retrying." if event == "HISTORY_LOAD_ERROR": return "Could not load price history. Retrying." if event == "ENGINE_ERROR": return message or "Strategy engine hit an error." if event == "EXECUTION_BLOCKED": if reason_key == "market_holiday": return "Exchange holiday. Execution will resume next session." if reason_key == "market_weekend": return "Weekend closure. Execution will resume next session." if reason_key == "market_pre_open": return "Market has not opened yet. Execution will begin after 9:15 AM IST." if reason_key == "market_post_close": return "Market is closed for the day. Execution will resume next session." if reason_key == "market_calendar_unavailable": return "Market calendar unavailable. Execution paused for safety." if reason_key == "market_closed": return "Market is closed. Execution will resume next session." return f"Execution blocked: {_humanize_reason(reason) or 'Unknown reason'}." if event == "ORDER_REJECTED": return message or payload.get("status_message") or "Broker rejected the order." if event == "ORDER_CANCELLED": return message or "Order was cancelled." return message or _humanize_reason(reason) or "Strategy update available." def _issue_is_stale_for_current_state( user_id: str, status: dict, event: str, data: dict | None, meta: dict | None, ): status_key = (status.get("status") or "IDLE").upper() cfg = status.get("config") if isinstance(status.get("config"), dict) else {} mode = str(cfg.get("mode") or "").strip().upper() payload = data if isinstance(data, dict) else {} extra = meta if isinstance(meta, dict) else {} reason = payload.get("reason") or extra.get("reason") reason_key = str(reason or "").strip().lower() if status_key == "STOPPED" and event in { "BROKER_AUTH_EXPIRED", "EXECUTION_BLOCKED", "SIP_NO_FILL", "PRICE_FETCH_ERROR", "HISTORY_LOAD_ERROR", "ENGINE_ERROR", "ORDER_REJECTED", "ORDER_CANCELLED", }: return True if event == "EXECUTION_BLOCKED" and reason_key.startswith("market_"): current_session = market_session(market_now()) current_reason = str(current_session.get("reason") or "").strip().lower() current_status = str(current_session.get("status") or "").strip().upper() if reason_key == "market_holiday": return current_status != "HOLIDAY" if reason_key == "market_calendar_unavailable": return current_reason != "calendar_unavailable" if reason_key in {"market_weekend", "market_pre_open", "market_post_close", "market_closed"}: return current_status == "OPEN" return False if mode != "LIVE": return False auth_related_issue = event == "BROKER_AUTH_EXPIRED" or ( event == "SIP_NO_FILL" and reason_key == "broker_auth_expired" ) or (event == "EXECUTION_BLOCKED" and reason_key in {"broker_auth_expired", "auth_expired"}) if not auth_related_issue: return False broker_state = get_user_broker(user_id) or {} auth_state = str(broker_state.get("auth_state") or "").strip().upper() return auth_state == "VALID" def get_strategy_summary(user_id: str): run_id = get_active_run_id(user_id) status = get_strategy_status(user_id) next_eligible_ts = status.get("next_eligible_ts") summary = { "run_id": run_id, "status": status.get("status"), "tone": "neutral", "message": "No active strategy.", "event": None, "ts": None, } issue_row = None block_reason = str(status.get("strategy_block_reason") or "").strip().lower() if block_reason == "broker_disconnected": summary.update( { "tone": "warning", "message": "Broker disconnected. Live strategy is stopped until you reconnect.", "event": "BROKER_DISCONNECTED", } ) return summary if block_reason == "broker_auth_required" or (status.get("status") or "").upper() == "PAUSED_AUTH_EXPIRED": summary.update( { "tone": "warning", "message": "Broker session expired. Reconnect broker to resume the strategy.", "event": "BROKER_AUTH_EXPIRED", } ) return summary if run_id: with db_connection() as conn: with conn.cursor() as cur: cur.execute( """ SELECT event, message, data, meta, ts FROM engine_event WHERE user_id = %s AND run_id = %s AND event IN ( 'SIP_NO_FILL', 'BROKER_AUTH_EXPIRED', 'PRICE_FETCH_ERROR', 'HISTORY_LOAD_ERROR', 'ENGINE_ERROR', 'EXECUTION_BLOCKED', 'ORDER_REJECTED', 'ORDER_CANCELLED' ) ORDER BY ts DESC LIMIT 1 """, (user_id, run_id), ) issue_row = cur.fetchone() if issue_row: event, message, data, meta, ts = issue_row if not _issue_is_stale_for_current_state(user_id, status, event, data, meta): summary.update( { "tone": "error" if event in {"ENGINE_ERROR", "ORDER_REJECTED"} else "warning", "message": _issue_message(event, message, data, meta), "event": event, "ts": serialize_timestamp(ts), } ) return summary status_key = (status.get("status") or "IDLE").upper() if status_key == "WAITING" and next_eligible_ts: summary.update( { "tone": "warning", "message": f"Waiting until {next_eligible_ts}.", } ) return summary if status_key == "RUNNING": summary.update( { "tone": "success", "message": "Strategy is running.", } ) return summary if status_key == "STOPPED": summary.update( { "tone": "neutral", "message": "Strategy is stopped.", } ) return summary return summary def get_market_status(): now = market_now() session = market_session(now) status = str(session.get("status") or "CLOSED") reason = str(session.get("reason") or "") next_open_at = None try: next_open_at = serialize_timestamp(next_market_open_after(now)) except UnsupportedCalendarYearError: next_open_at = None return { "status": status, "reason": reason, "checked_at": serialize_timestamp(now), "next_open_at": next_open_at, }