"""Strategy lifecycle service.

Starts, stops and resumes strategy runs, persists per-run configuration and
engine status rows, appends events to the ``strategy_log`` table and builds
the status / summary payloads returned to callers.
"""

import json
import logging
import os
import sys
import threading
from datetime import datetime, timedelta, timezone
from pathlib import Path

# The engine package is resolved relative to this file, so its root must be on
# sys.path before the project imports below run.
ENGINE_ROOT = Path(__file__).resolve().parents[3]
if str(ENGINE_ROOT) not in sys.path:
    sys.path.append(str(ENGINE_ROOT))

from indian_paper_trading_strategy.engine.market import is_market_open, align_to_market_open
from indian_paper_trading_strategy.engine.runner import start_engine, stop_engine
from indian_paper_trading_strategy.engine.state import init_paper_state, load_state, save_state
from indian_paper_trading_strategy.engine.broker import PaperBroker
from indian_paper_trading_strategy.engine.time_utils import frequency_to_timedelta
from indian_paper_trading_strategy.engine.db import engine_context

from app.broker_store import get_user_broker, set_broker_auth_state
from app.services.db import db_connection
from app.services.run_service import (
    create_strategy_run,
    get_active_run_id,
    get_running_run_id,
    update_run_status,
)
from app.services.auth_service import get_user_by_id
from app.services.email_service import send_email_async
from app.services.zerodha_service import (
    KiteTokenError,
    fetch_funds,
)
from app.services.zerodha_storage import get_session
from psycopg2.extras import Json
from psycopg2 import errors

logger = logging.getLogger(__name__)

# Process-wide sequence counter for strategy_log rows; guarded by SEQ_LOCK.
SEQ_LOCK = threading.Lock()
SEQ = 0
# run_id -> timestamp of the last SIP_WAITING event that was actually written.
LAST_WAIT_LOG_TS = {}
# Minimum spacing between two persisted SIP_WAITING events of the same run.
WAIT_LOG_INTERVAL = timedelta(seconds=60)


def _engine_external() -> bool:
    """Return True when ENGINE_EXTERNAL is set to ``1``, ``true`` or ``yes``."""
    return os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}


def _utc_iso_z(value: datetime) -> str:
    """Format *value* as a UTC ISO-8601 string with a trailing ``Z``."""
    return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


def _frequency_payload(req) -> dict:
    """Return ``req.sip_frequency`` as a plain dict.

    Uses the object's ``.dict()`` method when it has one, otherwise ``dict()``.
    """
    sip_frequency = req.sip_frequency
    if hasattr(sip_frequency, "dict"):
        return sip_frequency.dict()
    return dict(sip_frequency)


def _make_emit_event_cb(user_id: str, run_id: str):
    """Build the keyword-only ``emit_event`` callback bound to one run."""

    def emit_event_cb(
        *,
        event: str,
        message: str,
        level: str = "INFO",
        category: str = "ENGINE",
        meta: dict | None = None,
    ):
        emit_event(
            user_id=user_id,
            run_id=run_id,
            event=event,
            message=message,
            level=level,
            category=category,
            meta=meta,
        )

    return emit_event_cb


def _resolve_sip_frequency(cfg: dict):
    """Return the SIP frequency dict for *cfg*, falling back to legacy fields.

    ``cfg["sip_frequency"]`` is returned unchanged when it is a dict.
    Otherwise a ``{"value", "unit"}`` dict is built from ``frequency`` /
    ``unit`` / ``frequency_days``. When neither yields a value and unit, the
    original non-dict ``sip_frequency`` (typically ``None``) is returned.
    """
    sip_frequency = cfg.get("sip_frequency")
    if isinstance(sip_frequency, dict):
        return sip_frequency

    frequency = cfg.get("frequency")
    unit = cfg.get("unit")
    if isinstance(frequency, dict):
        unit = frequency.get("unit", unit)
        frequency = frequency.get("value")
    if frequency is None and cfg.get("frequency_days") is not None:
        frequency = cfg.get("frequency_days")
        # NOTE(review): the "days" default is applied only on this fallback
        # path; confirm against the pre-collapse source.
        unit = unit or "days"
    if frequency is not None and unit:
        return {"value": frequency, "unit": unit}
    return sip_frequency


def _notify_user(user_id: str, subject: str, body: str) -> None:
    """Send a best-effort email to the user; failures are logged, never raised."""
    try:
        user = get_user_by_id(user_id)
        if user:
            send_email_async(user["username"], subject, body)
    except Exception:
        # Notification must not fail the start/stop request itself.
        logger.exception("Failed to send %r email to user %s", subject, user_id)


def init_log_state():
    """Seed the in-memory SEQ counter from the highest ``seq`` in strategy_log."""
    global SEQ
    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COALESCE(MAX(seq), 0) FROM strategy_log")
            row = cur.fetchone()
    SEQ = row[0] if row and row[0] is not None else 0


def start_new_run(user_id: str, run_id: str):
    """Reset the SIP_WAITING throttle for the run and log STRATEGY_STARTED."""
    LAST_WAIT_LOG_TS.pop(run_id, None)
    emit_event(
        user_id=user_id,
        run_id=run_id,
        event="STRATEGY_STARTED",
        message="Strategy started",
        meta={},
    )


def stop_run(user_id: str, run_id: str, reason="user_request"):
    """Log STRATEGY_STOPPED for the run with the given reason in ``meta``."""
    emit_event(
        user_id=user_id,
        run_id=run_id,
        event="STRATEGY_STOPPED",
        message="Strategy stopped",
        meta={"reason": reason},
    )


def emit_event(
    *,
    user_id: str,
    run_id: str,
    event: str,
    message: str,
    level: str = "INFO",
    category: str = "ENGINE",
    meta: dict | None = None
):
    """Append one event row to ``strategy_log``.

    Does nothing when *user_id* or *run_id* is falsy. ``SIP_WAITING`` events
    are throttled to one per run per ``WAIT_LOG_INTERVAL``. The row's ``seq``
    comes from the process-wide counter; a row whose ``seq`` already exists is
    skipped by ``ON CONFLICT (seq) DO NOTHING``.
    """
    global SEQ
    if not user_id or not run_id:
        return

    now = datetime.now(timezone.utc)
    if event == "SIP_WAITING":
        last_ts = LAST_WAIT_LOG_TS.get(run_id)
        if last_ts and (now - last_ts) < WAIT_LOG_INTERVAL:
            return
        LAST_WAIT_LOG_TS[run_id] = now

    with SEQ_LOCK:
        SEQ += 1
        seq = SEQ

    with db_connection() as conn:
        # NOTE(review): `with conn:` is presumably psycopg2's transaction
        # block (commit on success, rollback on error) -- confirm what
        # db_connection() yields.
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO strategy_log (
                        seq, ts, level, category, event, message, user_id, run_id, meta
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (seq) DO NOTHING
                    """,
                    (
                        seq,
                        now,
                        level,
                        category,
                        event,
                        message,
                        user_id,
                        run_id,
                        Json(meta or {}),
                    ),
                )


def _maybe_parse_json(value):
    """Parse *value* as JSON when it is a non-blank string.

    Non-strings are returned unchanged, ``None`` and blank strings yield
    ``None``, and strings that fail to parse are returned as-is.
    """
    if value is None:
        return None
    if not isinstance(value, str):
        return value
    text = value.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except Exception:
        return value


def _local_tz():
    """Return the tzinfo of the current local time."""
    return datetime.now().astimezone().tzinfo


def _format_local_ts(value: datetime | None):
    """Return *value* as a naive ISO-8601 string in local time, or ``None``."""
    if value is None:
        return None
    return value.astimezone(_local_tz()).replace(tzinfo=None).isoformat()


def _load_config(user_id: str, run_id: str):
    """Load the strategy_config row for a run as a dict.

    Returns ``{}`` when no row exists. ``sip_frequency`` is included only when
    either of its two columns is non-NULL; ``next_run`` is a local naive ISO
    string.
    """
    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
                       mode, broker, active, frequency, frequency_days, unit, next_run
                FROM strategy_config
                WHERE user_id = %s AND run_id = %s
                LIMIT 1
                """,
                (user_id, run_id),
            )
            row = cur.fetchone()
    if not row:
        return {}

    cfg = {
        "strategy": row[0],
        "sip_amount": float(row[1]) if row[1] is not None else None,
        "mode": row[4],
        "broker": row[5],
        "active": row[6],
        "frequency": _maybe_parse_json(row[7]),
        "frequency_days": row[8],
        "unit": row[9],
        "next_run": _format_local_ts(row[10]),
    }
    if row[2] is not None or row[3] is not None:
        cfg["sip_frequency"] = {
            "value": row[2],
            "unit": row[3],
        }
    return cfg


def _save_config(cfg, user_id: str, run_id: str):
    """Upsert *cfg* into strategy_config, keyed by ``(user_id, run_id)``.

    ``frequency`` is JSON-encoded unless it is already a string or ``None``.
    ``next_run`` may be an ISO-8601 string or a ``datetime``; naive values are
    interpreted in the local timezone and unparseable strings are stored as
    NULL.
    """
    sip_frequency = cfg.get("sip_frequency")
    sip_value = None
    sip_unit = None
    if isinstance(sip_frequency, dict):
        sip_value = sip_frequency.get("value")
        sip_unit = sip_frequency.get("unit")

    frequency = cfg.get("frequency")
    if not isinstance(frequency, str) and frequency is not None:
        frequency = json.dumps(frequency)

    next_run = cfg.get("next_run")
    next_run_dt = None
    if isinstance(next_run, str):
        try:
            next_run_dt = datetime.fromisoformat(next_run)
        except ValueError:
            next_run_dt = None
    elif isinstance(next_run, datetime):
        # Previously a datetime here was silently dropped to NULL.
        next_run_dt = next_run
    if next_run_dt is not None and next_run_dt.tzinfo is None:
        next_run_dt = next_run_dt.replace(tzinfo=_local_tz())

    with db_connection() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO strategy_config (
                        user_id, run_id, strategy, sip_amount,
                        sip_frequency_value, sip_frequency_unit,
                        mode, broker, active, frequency, frequency_days, unit, next_run
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (user_id, run_id) DO UPDATE
                    SET strategy = EXCLUDED.strategy,
                        sip_amount = EXCLUDED.sip_amount,
                        sip_frequency_value = EXCLUDED.sip_frequency_value,
                        sip_frequency_unit = EXCLUDED.sip_frequency_unit,
                        mode = EXCLUDED.mode,
                        broker = EXCLUDED.broker,
                        active = EXCLUDED.active,
                        frequency = EXCLUDED.frequency,
                        frequency_days = EXCLUDED.frequency_days,
                        unit = EXCLUDED.unit,
                        next_run = EXCLUDED.next_run
                    """,
                    (
                        user_id,
                        run_id,
                        cfg.get("strategy"),
                        cfg.get("sip_amount"),
                        sip_value,
                        sip_unit,
                        cfg.get("mode"),
                        cfg.get("broker"),
                        cfg.get("active"),
                        frequency,
                        cfg.get("frequency_days"),
                        cfg.get("unit"),
                        next_run_dt,
                    ),
                )


def save_strategy_config(cfg, user_id: str, run_id: str):
    """Public wrapper around :func:`_save_config`."""
    _save_config(cfg, user_id, run_id)


def deactivate_strategy_config(user_id: str, run_id: str):
    """Set ``active = False`` on the stored config of a run.

    Does nothing when the run has no config row, so no placeholder row of
    NULLs is created.
    """
    cfg = _load_config(user_id, run_id)
    if not cfg:
        return
    cfg["active"] = False
    _save_config(cfg, user_id, run_id)


def _write_status(user_id: str, run_id: str, status):
    """Upsert the engine_status row of a run, stamped with the current time."""
    now_local = datetime.now().astimezone()
    with db_connection() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO engine_status (user_id, run_id, status, last_updated)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (user_id, run_id) DO UPDATE
                    SET status = EXCLUDED.status,
                        last_updated = EXCLUDED.last_updated
                    """,
                    (user_id, run_id, status, now_local),
                )


def validate_frequency(freq: dict, mode: str):
    """Validate a ``{"value", "unit"}`` frequency payload.

    Raises:
        ValueError: if *freq* is not a dict, the value is not an integer, the
            unit is not ``minutes`` / ``days``, minutes are used outside PAPER
            mode, or the value is below 1.
    """
    if not isinstance(freq, dict):
        raise ValueError("Frequency payload is required")
    try:
        value = int(freq.get("value", 0))
    except (TypeError, ValueError) as exc:
        # int(None) raises TypeError; report every bad value as ValueError.
        raise ValueError("Frequency value must be an integer") from exc
    unit = freq.get("unit")
    if unit not in {"minutes", "days"}:
        raise ValueError(f"Unsupported frequency unit: {unit}")
    if unit == "minutes":
        if mode != "PAPER":
            raise ValueError("Minute-level frequency allowed only in PAPER mode")
        if value < 1:
            raise ValueError("Minimum frequency is 1 minute")
    if unit == "days" and value < 1:
        raise ValueError("Minimum frequency is 1 day")


def _validate_live_broker_session(user_id: str):
    """Check that the user has a usable ZERODHA session.

    Returns ``(is_valid, broker_state, reason)``. The stored broker auth state
    is set to ``EXPIRED`` when there is no session or ``fetch_funds`` raises
    ``KiteTokenError``, and to ``VALID`` when the call succeeds.
    """
    broker_state = get_user_broker(user_id) or {}
    broker_name = (broker_state.get("broker") or "").strip().upper()
    if not broker_state.get("connected") or broker_name != "ZERODHA":
        return False, broker_state, "broker_not_connected"

    session = get_session(user_id)
    if not session:
        set_broker_auth_state(user_id, "EXPIRED")
        return False, broker_state, "broker_auth_required"

    try:
        # The result is discarded; the call is only a probe for the token.
        fetch_funds(session["api_key"], session["access_token"])
    except KiteTokenError:
        set_broker_auth_state(user_id, "EXPIRED")
        return False, broker_state, "broker_auth_required"

    set_broker_auth_state(user_id, "VALID")
    return True, broker_state, "ok"


def compute_next_eligible(last_run: str | None, sip_frequency: dict | None):
    """Return the ISO timestamp of the next eligible execution, or ``None``.

    ``None`` is returned when either input is missing, *last_run* is not a
    valid ISO string, or ``frequency_to_timedelta`` raises ``ValueError``. The
    result is passed through ``align_to_market_open``.
    """
    if not last_run or not sip_frequency:
        return None
    try:
        last_dt = datetime.fromisoformat(last_run)
    except ValueError:
        return None
    try:
        delta = frequency_to_timedelta(sip_frequency)
    except ValueError:
        return None
    next_dt = align_to_market_open(last_dt + delta)
    return next_dt.isoformat()


def _last_execution_ts(state: dict, mode: str) -> str | None:
    """Pick the last-execution timestamp from engine state for *mode*.

    LIVE (also the default for an empty mode) uses ``last_sip_ts`` only; any
    other mode prefers ``last_run`` and falls back to ``last_sip_ts``.
    """
    mode_key = (mode or "LIVE").strip().upper()
    if mode_key == "LIVE":
        return state.get("last_sip_ts")
    return state.get("last_run") or state.get("last_sip_ts")


def _broker_auth_required_response(broker_state: dict) -> dict:
    """Build the response returned when the live broker session is unusable."""
    return {
        "status": "broker_auth_required",
        "redirect_url": "/api/broker/login",
        "broker": broker_state.get("broker"),
    }


def _handle_existing_run(req, user_id: str, running_run_id: str, engine_external: bool):
    """Handle a start request for a user who already has a running run.

    Re-validates the broker session for LIVE runs, then restarts the
    in-process engine unless the engine is external.
    """
    running_cfg = _load_config(user_id, running_run_id)
    running_mode = (running_cfg.get("mode") or req.mode or "PAPER").strip().upper()
    if running_mode == "LIVE":
        is_valid, broker_state, _failure_reason = _validate_live_broker_session(user_id)
        if not is_valid:
            return _broker_auth_required_response(broker_state)

    if engine_external:
        return {"status": "already_running", "run_id": running_run_id}

    engine_config = _build_engine_config(user_id, running_run_id, req)
    if engine_config:
        started = start_engine(engine_config)
        if started:
            _write_status(user_id, running_run_id, "RUNNING")
            return {"status": "restarted", "run_id": running_run_id}
    return {"status": "already_running", "run_id": running_run_id}


def _init_run_state(user_id: str, run_id: str, mode: str, initial_cash, frequency_payload: dict):
    """Create the initial engine state for a new PAPER or LIVE run."""
    with engine_context(user_id, run_id):
        if mode == "PAPER":
            init_paper_state(initial_cash, frequency_payload)
            with db_connection() as conn:
                with conn:
                    with conn.cursor() as cur:
                        cur.execute(
                            """
                            INSERT INTO paper_broker_account (user_id, run_id, cash)
                            VALUES (%s, %s, %s)
                            ON CONFLICT (user_id, run_id) DO UPDATE
                            SET cash = EXCLUDED.cash
                            """,
                            (user_id, run_id, initial_cash),
                        )
            # NOTE(review): the instance is discarded, so this is presumably
            # called for a side effect inside the engine context -- confirm.
            PaperBroker(initial_cash=initial_cash)
        else:
            save_state(
                {
                    "total_invested": 0.0,
                    "nifty_units": 0.0,
                    "gold_units": 0.0,
                    "last_sip_ts": None,
                    "last_run": None,
                },
                mode="LIVE",
                emit_event=True,
                event_meta={"source": "live_start"},
            )


def start_strategy(req, user_id: str):
    """Start a strategy run for the user, or restart one already running.

    Returns a dict whose ``status`` is one of ``started``, ``restarted``,
    ``already_running``, ``broker_auth_required`` or ``unsupported_mode``.

    Raises:
        ValueError: from :func:`validate_frequency` on a bad frequency.
    """
    engine_external = _engine_external()

    running_run_id = get_running_run_id(user_id)
    if running_run_id:
        return _handle_existing_run(req, user_id, running_run_id, engine_external)

    mode = (req.mode or "PAPER").strip().upper()
    frequency_payload = _frequency_payload(req)
    validate_frequency(frequency_payload, mode)

    if mode == "PAPER":
        initial_cash = float(req.initial_cash) if req.initial_cash is not None else 1_000_000.0
        broker_name = "paper"
    elif mode == "LIVE":
        is_valid, broker_state, _failure_reason = _validate_live_broker_session(user_id)
        if not is_valid:
            return _broker_auth_required_response(broker_state)
        initial_cash = None
        broker_name = (broker_state.get("broker") or "ZERODHA").strip().lower()
    else:
        return {"status": "unsupported_mode"}

    meta = {
        "sip_amount": req.sip_amount,
        "sip_frequency": frequency_payload,
    }
    if initial_cash is not None:
        meta["initial_cash"] = initial_cash

    try:
        run_id = create_strategy_run(
            user_id,
            strategy=req.strategy_name,
            mode=mode,
            broker=broker_name,
            meta=meta,
        )
    except errors.UniqueViolation:
        # A concurrent request created the run first.
        return {"status": "already_running"}

    _init_run_state(user_id, run_id, mode, initial_cash, frequency_payload)

    config = {
        "strategy": req.strategy_name,
        "sip_amount": req.sip_amount,
        "sip_frequency": frequency_payload,
        "mode": mode,
        "broker": broker_name,
        "active": True,
    }
    save_strategy_config(config, user_id, run_id)
    start_new_run(user_id, run_id)
    _write_status(user_id, run_id, "RUNNING")

    if not engine_external:
        engine_config = dict(config)
        if initial_cash is not None:
            engine_config["initial_cash"] = initial_cash
        engine_config["run_id"] = run_id
        engine_config["user_id"] = user_id
        engine_config["emit_event"] = _make_emit_event_cb(user_id, run_id)
        start_engine(engine_config)

    _notify_user(
        user_id,
        "Strategy started",
        (
            "Your strategy has been started.\n\n"
            f"Strategy: {req.strategy_name}\n"
            f"Mode: {mode}\n"
            f"Run ID: {run_id}\n"
        ),
    )

    return {"status": "started", "run_id": run_id}


def _build_engine_config(user_id: str, run_id: str, req=None):
    """Assemble the engine configuration dict for an existing run.

    Values come from the stored config, falling back to *req* (when given)
    and then to defaults. ``initial_cash`` is read from the persisted engine
    state.
    """
    cfg = _load_config(user_id, run_id)

    sip_frequency = cfg.get("sip_frequency")
    if not isinstance(sip_frequency, dict) and req is not None:
        sip_frequency = _frequency_payload(req)
    if not isinstance(sip_frequency, dict):
        sip_frequency = {
            "value": cfg.get("frequency_days") or 1,
            "unit": cfg.get("unit") or "days",
        }

    sip_amount = cfg.get("sip_amount")
    if sip_amount is None and req is not None:
        sip_amount = req.sip_amount

    req_mode = req.mode if req is not None else "PAPER"
    mode = (cfg.get("mode") or req_mode or "PAPER").strip().upper()
    broker = cfg.get("broker") or "paper"
    strategy_name = (
        cfg.get("strategy")
        or cfg.get("strategy_name")
        or (req.strategy_name if req is not None else None)
    )

    with engine_context(user_id, run_id):
        state = load_state(mode=mode)
    initial_cash = float(state.get("initial_cash") or 1_000_000.0)

    return {
        "strategy": strategy_name or "Golden Nifty",
        "sip_amount": sip_amount or 0,
        "sip_frequency": sip_frequency,
        "mode": mode,
        "broker": broker,
        "active": cfg.get("active", True),
        "initial_cash": initial_cash,
        "user_id": user_id,
        "run_id": run_id,
        "emit_event": _make_emit_event_cb(user_id, run_id),
    }


def resume_running_runs():
    """Restart in-process engines for every run whose status is RUNNING.

    Does nothing when the engine is external (ENGINE_EXTERNAL is set).
    """
    if _engine_external():
        return

    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT user_id, run_id
                FROM strategy_run
                WHERE status = 'RUNNING'
                ORDER BY created_at DESC
                """
            )
            runs = cur.fetchall()

    for user_id, run_id in runs:
        engine_config = _build_engine_config(user_id, run_id, None)
        if not engine_config:
            continue
        started = start_engine(engine_config)
        if started:
            _write_status(user_id, run_id, "RUNNING")


def stop_strategy(user_id: str):
    """Stop the user's active run and record the stop.

    Always returns ``{"status": "stopped"}``.
    """
    # NOTE(review): run_id is used below without a None check; confirm that
    # get_active_run_id() always returns a usable id.
    run_id = get_active_run_id(user_id)
    if not _engine_external():
        stop_engine(user_id, run_id, timeout=15.0)

    deactivate_strategy_config(user_id, run_id)
    stop_run(user_id, run_id, reason="user_request")
    _write_status(user_id, run_id, "STOPPED")
    update_run_status(user_id, run_id, "STOPPED", meta={"reason": "user_request"})

    _notify_user(user_id, "Strategy stopped", "Your strategy has been stopped.")
    return {"status": "stopped"}


def get_strategy_status(user_id: str):
    """Return the status payload of the user's active run.

    Yields ``IDLE`` when no engine_status row exists. A RUNNING run also gets
    ``last_execution_ts`` / ``next_eligible_ts`` and is reported as
    ``WAITING`` while the next eligible time is still in the future.
    """
    run_id = get_active_run_id(user_id)
    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT status, last_updated FROM engine_status WHERE user_id = %s AND run_id = %s",
                (user_id, run_id),
            )
            row = cur.fetchone()

    if not row:
        status = {"status": "IDLE", "last_updated": None}
    else:
        status = {
            "status": row[0],
            "last_updated": _format_local_ts(row[1]),
        }

    if status.get("status") == "RUNNING":
        cfg = _load_config(user_id, run_id)
        mode = (cfg.get("mode") or "LIVE").strip().upper()
        with engine_context(user_id, run_id):
            state = load_state(mode=mode)
        last_execution_ts = _last_execution_ts(state, mode)
        sip_frequency = _resolve_sip_frequency(cfg)
        next_eligible = compute_next_eligible(last_execution_ts, sip_frequency)
        status["last_execution_ts"] = last_execution_ts
        status["next_eligible_ts"] = next_eligible
        if next_eligible:
            try:
                parsed_next = datetime.fromisoformat(next_eligible)
                # Compare aware with aware and naive with naive.
                if parsed_next.tzinfo:
                    now_cmp = datetime.now(parsed_next.tzinfo)
                else:
                    now_cmp = datetime.now()
                if parsed_next > now_cmp:
                    status["status"] = "WAITING"
            except ValueError:
                pass
    return status


def get_engine_status(user_id: str):
    """Return engine state, last heartbeat and execution timing for the user.

    ``state`` defaults to ``STOPPED`` and ``last_heartbeat_ts`` to ``None``
    when no engine_status row exists.
    """
    run_id = get_active_run_id(user_id)
    status = {
        "state": "STOPPED",
        "run_id": run_id,
        "user_id": user_id,
        "last_heartbeat_ts": None,
    }
    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT status, last_updated
                FROM engine_status
                WHERE user_id = %s AND run_id = %s
                ORDER BY last_updated DESC
                LIMIT 1
                """,
                (user_id, run_id),
            )
            row = cur.fetchone()

    if row:
        status["state"] = row[0]
        last_updated = row[1]
        if last_updated is not None:
            status["last_heartbeat_ts"] = _utc_iso_z(last_updated)

    # NOTE(review): the timing fields below are computed whether or not a
    # status row exists; confirm this nesting against the pre-collapse source.
    cfg = _load_config(user_id, run_id)
    mode = (cfg.get("mode") or "LIVE").strip().upper()
    with engine_context(user_id, run_id):
        state = load_state(mode=mode)
    last_execution_ts = _last_execution_ts(state, mode)

    sip_frequency = _resolve_sip_frequency(cfg)
    if isinstance(cfg.get("sip_frequency"), dict):
        # Pass on only the two fields the frequency computation needs.
        sip_frequency = {
            "value": sip_frequency.get("value"),
            "unit": sip_frequency.get("unit"),
        }

    status["last_execution_ts"] = last_execution_ts
    status["next_eligible_ts"] = compute_next_eligible(last_execution_ts, sip_frequency)
    status["run_id"] = run_id
    return status


def get_strategy_logs(user_id: str, since_seq: int):
    """Return the active run's log events with ``seq`` greater than *since_seq*.

    The result is ``{"events": [...], "latest_seq": int}``; ``latest_seq`` is
    the highest ``seq`` stored for the run (0 when there are none).
    """
    run_id = get_active_run_id(user_id)
    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT seq, ts, level, category, event, message, run_id, meta
                FROM strategy_log
                WHERE user_id = %s AND run_id = %s AND seq > %s
                ORDER BY seq
                """,
                (user_id, run_id, since_seq),
            )
            rows = cur.fetchall()
            cur.execute(
                "SELECT COALESCE(MAX(seq), 0) FROM strategy_log WHERE user_id = %s AND run_id = %s",
                (user_id, run_id),
            )
            latest_seq = cur.fetchone()[0]

    events = [
        {
            "seq": row[0],
            "ts": _utc_iso_z(row[1]) if row[1] is not None else None,
            "level": row[2],
            "category": row[3],
            "event": row[4],
            "message": row[5],
            "run_id": row[6],
            "meta": row[7] if isinstance(row[7], dict) else {},
        }
        for row in rows
    ]
    return {"events": events, "latest_seq": latest_seq}


def _humanize_reason(reason: str | None):
    """Turn a ``snake_case`` reason code into a capitalised phrase, or ``None``."""
    if not reason:
        return None
    return reason.replace("_", " ").strip().capitalize()


def _issue_message(event: str, message: str | None, data: dict | None, meta: dict | None):
    """Build the user-facing message for an issue event.

    The reason code is read from *data* first, then *meta*; non-dict values
    for either are treated as empty.
    """
    payload = data if isinstance(data, dict) else {}
    extra = meta if isinstance(meta, dict) else {}
    reason = payload.get("reason") or extra.get("reason")
    reason_key = str(reason or "").strip().lower()

    if event == "SIP_NO_FILL":
        if reason_key == "insufficient_funds":
            return "Insufficient funds for this SIP."
        if reason_key == "broker_auth_expired":
            return "Broker session expired. Reconnect broker."
        if reason_key == "no_fill":
            return "Order was not filled."
        return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}."
    if event == "BROKER_AUTH_EXPIRED":
        return "Broker session expired. Reconnect broker."
    if event == "PRICE_FETCH_ERROR":
        return "Could not fetch prices. Retrying."
    if event == "HISTORY_LOAD_ERROR":
        return "Could not load price history. Retrying."
    if event == "ENGINE_ERROR":
        return message or "Strategy engine hit an error."
    if event == "EXECUTION_BLOCKED":
        if reason_key == "market_closed":
            return "Market is closed. Execution will resume next session."
        return f"Execution blocked: {_humanize_reason(reason) or 'Unknown reason'}."
    if event == "ORDER_REJECTED":
        return message or payload.get("status_message") or "Broker rejected the order."
    if event == "ORDER_CANCELLED":
        return message or "Order was cancelled."
    return message or _humanize_reason(reason) or "Strategy update available."


def get_strategy_summary(user_id: str):
    """Return a one-line summary (tone + message) for the user's active run.

    The most recent issue event in ``engine_event`` takes precedence over the
    plain status message.
    """
    run_id = get_active_run_id(user_id)
    status = get_strategy_status(user_id)
    next_eligible_ts = status.get("next_eligible_ts")

    summary = {
        "run_id": run_id,
        "status": status.get("status"),
        "tone": "neutral",
        "message": "No active strategy.",
        "event": None,
        "ts": None,
    }

    issue_row = None
    if run_id:
        with db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT event, message, data, meta, ts
                    FROM engine_event
                    WHERE user_id = %s
                      AND run_id = %s
                      AND event IN (
                        'SIP_NO_FILL',
                        'BROKER_AUTH_EXPIRED',
                        'PRICE_FETCH_ERROR',
                        'HISTORY_LOAD_ERROR',
                        'ENGINE_ERROR',
                        'EXECUTION_BLOCKED',
                        'ORDER_REJECTED',
                        'ORDER_CANCELLED'
                      )
                    ORDER BY ts DESC
                    LIMIT 1
                    """,
                    (user_id, run_id),
                )
                issue_row = cur.fetchone()

    if issue_row:
        event, message, data, meta, ts = issue_row
        summary.update(
            {
                "tone": "error" if event in {"ENGINE_ERROR", "ORDER_REJECTED"} else "warning",
                "message": _issue_message(event, message, data, meta),
                "event": event,
                "ts": _format_local_ts(ts),
            }
        )
        return summary

    status_key = (status.get("status") or "IDLE").upper()
    if status_key == "WAITING" and next_eligible_ts:
        summary.update(
            {
                "tone": "warning",
                "message": f"Waiting until {next_eligible_ts}.",
            }
        )
        return summary
    if status_key == "RUNNING":
        summary.update(
            {
                "tone": "success",
                "message": "Strategy is running.",
            }
        )
        return summary
    if status_key == "STOPPED":
        summary.update(
            {
                "tone": "neutral",
                "message": "Strategy is stopped.",
            }
        )
        return summary
    return summary


def get_market_status():
    """Return ``OPEN`` / ``CLOSED`` and the timestamp of the check."""
    # NOTE(review): a naive local datetime is passed to is_market_open();
    # confirm that is the timezone it expects.
    now = datetime.now()
    return {
        "status": "OPEN" if is_market_open(now) else "CLOSED",
        "checked_at": now.isoformat(),
    }