import os import threading import time from datetime import datetime, timedelta, timezone from indian_paper_trading_strategy.engine.market import is_market_open, align_to_market_open from indian_paper_trading_strategy.engine.execution import try_execute_sip from indian_paper_trading_strategy.engine.broker import PaperBroker from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm from indian_paper_trading_strategy.engine.state import load_state from indian_paper_trading_strategy.engine.data import fetch_live_price from indian_paper_trading_strategy.engine.history import load_monthly_close from indian_paper_trading_strategy.engine.strategy import allocation from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time from indian_paper_trading_strategy.engine.db import db_transaction, insert_engine_event, run_with_retry, get_context, set_context def _update_engine_status(user_id: str, run_id: str, status: str): now = datetime.utcnow().replace(tzinfo=timezone.utc) def _op(cur, _conn): cur.execute( """ INSERT INTO engine_status (user_id, run_id, status, last_updated) VALUES (%s, %s, %s, %s) ON CONFLICT (user_id, run_id) DO UPDATE SET status = EXCLUDED.status, last_updated = EXCLUDED.last_updated """, (user_id, run_id, status, now), ) run_with_retry(_op) NIFTY = "NIFTYBEES.NS" GOLD = "GOLDBEES.NS" SMA_MONTHS = 36 _DEFAULT_ENGINE_STATE = { "state": "STOPPED", "run_id": None, "user_id": None, "last_heartbeat_ts": None, } _ENGINE_STATES = {} _ENGINE_STATES_LOCK = threading.Lock() _RUNNERS = {} _RUNNERS_LOCK = threading.Lock() engine_state = _ENGINE_STATES def _state_key(user_id: str, run_id: str): return (user_id, run_id) def _get_state(user_id: str, run_id: str): key = _state_key(user_id, run_id) with _ENGINE_STATES_LOCK: state = _ENGINE_STATES.get(key) if state is None: state = dict(_DEFAULT_ENGINE_STATE) state["user_id"] = user_id state["run_id"] = run_id _ENGINE_STATES[key] = state return state def _set_state(user_id: str, run_id: str, **updates): key = _state_key(user_id, run_id) with _ENGINE_STATES_LOCK: state = _ENGINE_STATES.get(key) if state is None: state = dict(_DEFAULT_ENGINE_STATE) state["user_id"] = user_id state["run_id"] = run_id _ENGINE_STATES[key] = state state.update(updates) def get_engine_state(user_id: str, run_id: str): state = _get_state(user_id, run_id) return dict(state) def log_event( event: str, data: dict | None = None, message: str | None = None, meta: dict | None = None, ): entry = { "ts": datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(), "event": event, } if message is not None or meta is not None: entry["message"] = message or "" entry["meta"] = meta or {} else: entry["data"] = data or {} event_ts = datetime.fromisoformat(entry["ts"].replace("Z", "+00:00")) data = entry.get("data") if "data" in entry else None meta = entry.get("meta") if "meta" in entry else None def _op(cur, _conn): insert_engine_event( cur, entry.get("event"), data=data, message=entry.get("message"), meta=meta, ts=event_ts, ) run_with_retry(_op) def sleep_with_heartbeat( total_seconds: int, stop_event: threading.Event, user_id: str, run_id: str, step_seconds: int = 5, ): remaining = total_seconds while remaining > 0 and not stop_event.is_set(): time.sleep(min(step_seconds, remaining)) _set_state(user_id, run_id, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z") remaining -= step_seconds def _clear_runner(user_id: str, run_id: str): key = _state_key(user_id, run_id) with _RUNNERS_LOCK: _RUNNERS.pop(key, None) def can_execute(now: datetime) -> tuple[bool, str]: if not is_market_open(now): return False, "MARKET_CLOSED" return True, "OK" def _engine_loop(config, stop_event: threading.Event): print("Strategy engine started with config:", config) user_id = config.get("user_id") run_id = config.get("run_id") scope_user, scope_run = get_context(user_id, run_id) set_context(scope_user, scope_run) strategy_name = config.get("strategy_name") or config.get("strategy") or "golden_nifty" sip_amount = config["sip_amount"] configured_frequency = config.get("sip_frequency") or {} if not isinstance(configured_frequency, dict): configured_frequency = {} frequency_value = int(configured_frequency.get("value", config.get("frequency", 0))) frequency_unit = configured_frequency.get("unit", config.get("unit", "days")) frequency_label = f"{frequency_value} {frequency_unit}" emit_event_cb = config.get("emit_event") if not callable(emit_event_cb): emit_event_cb = None debug_enabled = os.getenv("ENGINE_DEBUG", "1").strip().lower() not in {"0", "false", "no"} def debug_event(event: str, message: str, meta: dict | None = None): if not debug_enabled: return try: log_event(event=event, message=message, meta=meta or {}) except Exception: pass if emit_event_cb: emit_event_cb(event=event, message=message, meta=meta or {}) print(f"[ENGINE] {event} {message} {meta or {}}", flush=True) mode = (config.get("mode") or "LIVE").strip().upper() if mode not in {"PAPER", "LIVE"}: mode = "LIVE" broker_type = config.get("broker") or "paper" if broker_type != "paper": broker_type = "paper" if broker_type == "paper": mode = "PAPER" initial_cash = float(config.get("initial_cash", 0)) broker = PaperBroker(initial_cash=initial_cash) log_event( event="DEBUG_PAPER_STORE_PATH", message="Paper broker store path", meta={ "cwd": os.getcwd(), "paper_store_path": str(broker.store_path) if hasattr(broker, "store_path") else "NO_STORE_PATH", "abs_store_path": os.path.abspath(str(broker.store_path)) if hasattr(broker, "store_path") else "N/A", }, ) if emit_event_cb: emit_event_cb( event="DEBUG_PAPER_STORE_PATH", message="Paper broker store path", meta={ "cwd": os.getcwd(), "paper_store_path": str(broker.store_path) if hasattr(broker, "store_path") else "NO_STORE_PATH", "abs_store_path": os.path.abspath(str(broker.store_path)) if hasattr(broker, "store_path") else "N/A", }, ) log_event("ENGINE_START", { "strategy": strategy_name, "sip_amount": sip_amount, "frequency": frequency_label, }) debug_event("ENGINE_START_DEBUG", "engine loop started", {"run_id": scope_run, "user_id": scope_user}) _set_state( scope_user, scope_run, state="RUNNING", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z", ) _update_engine_status(scope_user, scope_run, "RUNNING") try: while not stop_event.is_set(): _set_state(scope_user, scope_run, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z") _update_engine_status(scope_user, scope_run, "RUNNING") state = load_state(mode=mode) debug_event( "STATE_LOADED", "loaded engine state", { "last_sip_ts": state.get("last_sip_ts"), "last_run": state.get("last_run"), "cash": state.get("cash"), "total_invested": state.get("total_invested"), }, ) state_frequency = state.get("sip_frequency") if not isinstance(state_frequency, dict): state_frequency = {"value": frequency_value, "unit": frequency_unit} freq = int(state_frequency.get("value", frequency_value)) unit = state_frequency.get("unit", frequency_unit) frequency_label = f"{freq} {unit}" if unit == "minutes": delta = timedelta(minutes=freq) else: delta = timedelta(days=freq) # Gate 2: time to SIP last_run = state.get("last_run") or state.get("last_sip_ts") is_first_run = last_run is None now = datetime.now() debug_event( "ENGINE_LOOP_TICK", "engine loop tick", {"now": now.isoformat(), "frequency": frequency_label}, ) if last_run and not is_first_run: next_run = datetime.fromisoformat(last_run) + delta next_run = align_to_market_open(next_run) if now < next_run: log_event( event="SIP_WAITING", message="Waiting for next SIP window", meta={ "last_run": last_run, "next_eligible": next_run.isoformat(), "now": now.isoformat(), "frequency": frequency_label, }, ) if emit_event_cb: emit_event_cb( event="SIP_WAITING", message="Waiting for next SIP window", meta={ "last_run": last_run, "next_eligible": next_run.isoformat(), "now": now.isoformat(), "frequency": frequency_label, }, ) sleep_with_heartbeat(60, stop_event, scope_user, scope_run) continue try: debug_event("PRICE_FETCH_START", "fetching live prices", {"tickers": [NIFTY, GOLD]}) nifty_price = fetch_live_price(NIFTY) gold_price = fetch_live_price(GOLD) debug_event( "PRICE_FETCHED", "fetched live prices", {"nifty_price": float(nifty_price), "gold_price": float(gold_price)}, ) except Exception as exc: debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)}) sleep_with_heartbeat(30, stop_event, scope_user, scope_run) continue try: nifty_hist = load_monthly_close(NIFTY) gold_hist = load_monthly_close(GOLD) except Exception as exc: debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)}) sleep_with_heartbeat(30, stop_event, scope_user, scope_run) continue nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1] gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1] eq_w, gd_w = allocation( sp_price=nifty_price, gd_price=gold_price, sp_sma=nifty_sma, gd_sma=gold_sma ) debug_event( "WEIGHTS_COMPUTED", "computed allocation weights", {"equity_weight": float(eq_w), "gold_weight": float(gd_w)}, ) weights = {"equity": eq_w, "gold": gd_w} allowed, reason = can_execute(now) executed = False if not allowed: log_event( event="EXECUTION_BLOCKED", message="Execution blocked by market gate", meta={ "reason": reason, "eligible_since": last_run, "checked_at": now.isoformat(), }, ) debug_event("MARKET_GATE", "market closed", {"reason": reason}) if emit_event_cb: emit_event_cb( event="EXECUTION_BLOCKED", message="Execution blocked by market gate", meta={ "reason": reason, "eligible_since": last_run, "checked_at": now.isoformat(), }, ) else: log_event( event="DEBUG_BEFORE_TRY_EXECUTE", message="About to call try_execute_sip", meta={ "last_run": last_run, "frequency": frequency_label, "allowed": allowed, "reason": reason, "sip_amount": sip_amount, "broker": type(broker).__name__, "now": now.isoformat(), }, ) if emit_event_cb: emit_event_cb( event="DEBUG_BEFORE_TRY_EXECUTE", message="About to call try_execute_sip", meta={ "last_run": last_run, "frequency": frequency_label, "allowed": allowed, "reason": reason, "sip_amount": sip_amount, "broker": type(broker).__name__, "now": now.isoformat(), }, ) debug_event( "TRY_EXECUTE_START", "calling try_execute_sip", {"sip_interval_sec": delta.total_seconds(), "sip_amount": sip_amount}, ) state, executed = try_execute_sip( now=now, market_open=True, sip_interval=delta.total_seconds(), sip_amount=sip_amount, sp_price=nifty_price, gd_price=gold_price, eq_w=eq_w, gd_w=gd_w, broker=broker, mode=mode, ) log_event( event="DEBUG_AFTER_TRY_EXECUTE", message="Returned from try_execute_sip", meta={ "executed": executed, "state_last_run": state.get("last_run"), "state_last_sip_ts": state.get("last_sip_ts"), }, ) if emit_event_cb: emit_event_cb( event="DEBUG_AFTER_TRY_EXECUTE", message="Returned from try_execute_sip", meta={ "executed": executed, "state_last_run": state.get("last_run"), "state_last_sip_ts": state.get("last_sip_ts"), }, ) debug_event( "TRY_EXECUTE_DONE", "try_execute_sip finished", {"executed": executed, "last_run": state.get("last_run")}, ) if executed: log_event("SIP_TRIGGERED", { "date": now.date().isoformat(), "allocation": weights, "cash_used": sip_amount }) debug_event("SIP_TRIGGERED", "sip executed", {"cash_used": sip_amount}) portfolio_value = ( state["nifty_units"] * nifty_price + state["gold_units"] * gold_price ) log_event("PORTFOLIO_UPDATED", { "nifty_units": state["nifty_units"], "gold_units": state["gold_units"], "portfolio_value": portfolio_value }) print("SIP executed at", now) if should_log_mtm(None, now): logical_time = normalize_logical_time(now) with db_transaction() as cur: log_mtm( nifty_units=state["nifty_units"], gold_units=state["gold_units"], nifty_price=nifty_price, gold_price=gold_price, total_invested=state["total_invested"], cur=cur, logical_time=logical_time, ) broker.update_equity( {NIFTY: nifty_price, GOLD: gold_price}, now, cur=cur, logical_time=logical_time, ) sleep_with_heartbeat(30, stop_event, scope_user, scope_run) except Exception as e: _set_state(scope_user, scope_run, state="ERROR", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z") _update_engine_status(scope_user, scope_run, "ERROR") log_event("ENGINE_ERROR", {"error": str(e)}) raise log_event("ENGINE_STOP") _set_state( scope_user, scope_run, state="STOPPED", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z", ) _update_engine_status(scope_user, scope_run, "STOPPED") print("Strategy engine stopped") _clear_runner(scope_user, scope_run) def start_engine(config): user_id = config.get("user_id") run_id = config.get("run_id") if not user_id: raise ValueError("user_id is required to start engine") if not run_id: raise ValueError("run_id is required to start engine") with _RUNNERS_LOCK: key = _state_key(user_id, run_id) runner = _RUNNERS.get(key) if runner and runner["thread"].is_alive(): return False stop_event = threading.Event() thread = threading.Thread( target=_engine_loop, args=(config, stop_event), daemon=True, ) _RUNNERS[key] = {"thread": thread, "stop_event": stop_event} thread.start() return True def stop_engine(user_id: str, run_id: str | None = None, timeout: float | None = 10.0): runners = [] with _RUNNERS_LOCK: if run_id: key = _state_key(user_id, run_id) runner = _RUNNERS.get(key) if runner: runners.append((key, runner)) else: for key, runner in list(_RUNNERS.items()): if key[0] == user_id: runners.append((key, runner)) for _key, runner in runners: runner["stop_event"].set() stopped_all = True for key, runner in runners: thread = runner["thread"] if timeout is not None: thread.join(timeout=timeout) stopped = not thread.is_alive() if stopped: _clear_runner(key[0], key[1]) else: stopped_all = False return stopped_all