839 lines
30 KiB
Python
839 lines
30 KiB
Python
import os
|
|
import socket
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from psycopg2.extras import Json
|
|
|
|
from indian_paper_trading_strategy.engine.market import (
|
|
align_to_market_open,
|
|
market_now,
|
|
market_session,
|
|
)
|
|
from indian_paper_trading_strategy.engine.execution import reconcile_live_orders, try_execute_sip
|
|
from indian_paper_trading_strategy.engine.broker import (
|
|
BrokerAuthExpired,
|
|
LiveGrowwBroker,
|
|
LiveZerodhaBroker,
|
|
PaperBroker,
|
|
)
|
|
from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm
|
|
from indian_paper_trading_strategy.engine.state import load_state
|
|
from indian_paper_trading_strategy.engine.data import fetch_live_price
|
|
from indian_paper_trading_strategy.engine.history import load_monthly_close
|
|
from indian_paper_trading_strategy.engine.strategy import allocation
|
|
from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time
|
|
from app.services.zerodha_service import KiteTokenError
|
|
|
|
from indian_paper_trading_strategy.engine.db import (
|
|
acquire_run_lease,
|
|
db_transaction,
|
|
heartbeat_run_lease,
|
|
insert_engine_event,
|
|
release_run_lease,
|
|
run_with_retry,
|
|
get_context,
|
|
set_context,
|
|
)
|
|
|
|
|
|
def _update_engine_status(user_id: str, run_id: str, status: str):
    """Upsert the ``engine_status`` row of a run.

    Writes ``(user_id, run_id, status, last_updated)``; when a row with the
    same ``(user_id, run_id)`` already exists its ``status`` and
    ``last_updated`` columns are overwritten instead. The statement is handed
    to ``run_with_retry`` for execution.

    Args:
        user_id: Owner of the run.
        run_id: Identifier of the run.
        status: Status string to store.
    """
    # Captured once, outside the callback, as an aware UTC timestamp.
    stamped_at = datetime.utcnow().replace(tzinfo=timezone.utc)
    upsert_sql = """
            INSERT INTO engine_status (user_id, run_id, status, last_updated)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (user_id, run_id) DO UPDATE
            SET status = EXCLUDED.status,
                last_updated = EXCLUDED.last_updated
            """
    params = (user_id, run_id, status, stamped_at)

    def _upsert(cur, _conn):
        cur.execute(upsert_sql, params)

    run_with_retry(_upsert)
|
|
|
|
# Ticker symbols the strategy trades, and the look-back (in monthly closes)
# used for the moving averages.
NIFTY = "NIFTYBEES.NS"
GOLD = "GOLDBEES.NS"
SMA_MONTHS = 36

# Template copied for every (user_id, run_id) the first time it is seen.
_DEFAULT_ENGINE_STATE = dict(
    state="STOPPED",
    run_id=None,
    user_id=None,
    last_heartbeat_ts=None,
)

# In-memory per-run state, keyed by (user_id, run_id), with its guard.
_ENGINE_STATES = {}
_ENGINE_STATES_LOCK = threading.Lock()

# Runner registry, keyed by (user_id, run_id), with its guard.
_RUNNERS = {}
_RUNNERS_LOCK = threading.Lock()

# Public alias of the state registry (same object, not a copy).
engine_state = _ENGINE_STATES

# Identity used when taking run leases: the RUNNER_OWNER_ID environment
# variable if set and non-empty, otherwise "<hostname>:<pid>:<random hex>".
RUNNER_OWNER_ID = os.environ.get("RUNNER_OWNER_ID") or ":".join(
    (socket.gethostname(), str(os.getpid()), uuid.uuid4().hex)
)
# Lease duration in seconds, overridable through the environment.
RUN_LEASE_SECONDS = int(os.environ.get("RUN_LEASE_SECONDS", "90"))
|
|
|
|
|
|
class RunLeaseNotAcquiredError(RuntimeError):
|
|
def __init__(self, run_id: str, owner_id: str, details: dict | None = None):
|
|
super().__init__(f"Run lease not acquired for run {run_id}")
|
|
self.run_id = run_id
|
|
self.owner_id = owner_id
|
|
self.details = details or {}
|
|
|
|
|
|
def _state_key(user_id: str, run_id: str):
    """Build the ``(user_id, run_id)`` tuple used as a registry key."""
    registry_key = user_id, run_id
    return registry_key
|
|
|
|
|
|
def _get_state(user_id: str, run_id: str):
    """Return the in-memory state dict of a run, creating it on first use.

    A new entry is a copy of ``_DEFAULT_ENGINE_STATE`` with ``user_id`` and
    ``run_id`` filled in. The stored dict itself is returned (not a copy), so
    callers that need a snapshot must copy it.
    """
    lookup = _state_key(user_id, run_id)
    with _ENGINE_STATES_LOCK:
        existing = _ENGINE_STATES.get(lookup)
        if existing is not None:
            return existing
        fresh = {**_DEFAULT_ENGINE_STATE, "user_id": user_id, "run_id": run_id}
        _ENGINE_STATES[lookup] = fresh
        return fresh
|
|
|
|
|
|
def _set_state(user_id: str, run_id: str, **updates):
    """Merge ``updates`` into the in-memory state dict of a run.

    The entry is created from ``_DEFAULT_ENGINE_STATE`` (with ``user_id`` and
    ``run_id`` filled in) when it does not exist yet. Lookup, creation and
    update all happen while ``_ENGINE_STATES_LOCK`` is held.
    """
    lookup = _state_key(user_id, run_id)
    with _ENGINE_STATES_LOCK:
        target = _ENGINE_STATES.get(lookup)
        if target is None:
            target = {**_DEFAULT_ENGINE_STATE, "user_id": user_id, "run_id": run_id}
            _ENGINE_STATES[lookup] = target
        target.update(updates)
|
|
|
|
|
|
def get_engine_state(user_id: str, run_id: str):
    """Return a shallow copy of the in-memory state of a run.

    The underlying entry is created by ``_get_state`` when missing; the copy
    keeps callers from mutating the shared dict.
    """
    return _get_state(user_id, run_id).copy()
|
|
|
|
def log_event(
    event: str,
    data: dict | None = None,
    message: str | None = None,
    meta: dict | None = None,
):
    """Persist one engine event through ``insert_engine_event``.

    Two payload shapes are supported:

    * message form - used when ``message`` or ``meta`` is given; the event is
      stored with ``message`` (``""`` if missing) and ``meta`` (``{}`` if
      missing), and ``data`` is ignored;
    * data form - otherwise; the event is stored with ``data`` (``{}`` if
      missing) and no message/meta.

    The event timestamp is the current UTC time, taken once before the insert
    is handed to ``run_with_retry``.
    """
    event_ts = datetime.utcnow().replace(tzinfo=timezone.utc)

    if message is None and meta is None:
        stored_data, stored_message, stored_meta = (data or {}), None, None
    else:
        stored_data, stored_message, stored_meta = None, (message or ""), (meta or {})

    def _insert(cur, _conn):
        insert_engine_event(
            cur,
            event,
            data=stored_data,
            message=stored_message,
            meta=stored_meta,
            ts=event_ts,
        )

    run_with_retry(_insert)
|
|
|
|
|
|
def _log_runner_lease_event(
|
|
user_id: str,
|
|
run_id: str,
|
|
event: str,
|
|
message: str,
|
|
meta: dict | None = None,
|
|
):
|
|
details = meta or {}
|
|
print(f"[ENGINE] {event} {message} {details}", flush=True)
|
|
|
|
def _op(cur, _conn):
|
|
insert_engine_event(
|
|
cur,
|
|
event,
|
|
data=details,
|
|
message=message,
|
|
ts=datetime.utcnow().replace(tzinfo=timezone.utc),
|
|
user_id=user_id,
|
|
run_id=run_id,
|
|
)
|
|
|
|
try:
|
|
run_with_retry(_op)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _refresh_run_lease_or_stop(
    user_id: str,
    run_id: str,
    owner_id: str,
):
    """Heartbeat the run lease and report whether this runner still holds it.

    Returns:
        ``True`` when ``heartbeat_run_lease`` reports the lease as active
        (a confirmation line is printed); ``False`` otherwise, after a
        ``RUNNER_LEASE_LOST`` event has been emitted.
    """
    lease = heartbeat_run_lease(run_id, owner_id, lease_seconds=RUN_LEASE_SECONDS)

    if not lease.get("active"):
        _log_runner_lease_event(
            user_id,
            run_id,
            "RUNNER_LEASE_LOST",
            "Runner exiting due to lost lease",
            {"owner_id": owner_id},
        )
        return False

    expires_at = lease.get("expires_at")
    summary = f"{{'run_id': '{run_id}', 'owner_id': '{owner_id}', 'expires_at': '{expires_at}'}}"
    print(
        "[ENGINE] RUNNER_LEASE_HEARTBEAT lease heartbeat refreshed " + summary,
        flush=True,
    )
    return True
|
|
|
|
def sleep_with_heartbeat(
    total_seconds: int,
    stop_event: threading.Event,
    user_id: str,
    run_id: str,
    owner_id: str,
    step_seconds: int = 5,
):
    """Wait up to ``total_seconds`` while keeping the run's heartbeat alive.

    The wait is split into chunks of at most ``step_seconds``. After each
    chunk the in-memory heartbeat timestamp, the ``engine_status`` row and the
    run lease are refreshed.

    The chunks wait on ``stop_event`` rather than sleeping blindly, so a stop
    request ends the wait immediately instead of after the current chunk.

    Args:
        total_seconds: Total time to wait; ``<= 0`` returns immediately.
        stop_event: Event signalling that the engine should stop.
        user_id: Owner of the run.
        run_id: Identifier of the run.
        owner_id: Lease owner identity of this runner.
        step_seconds: Maximum chunk length; a non-positive value makes the
            whole remaining time one chunk.

    Returns:
        ``False`` if the lease was lost during the wait, ``True`` otherwise
        (including when the wait was cut short by ``stop_event``).
    """
    remaining = total_seconds
    while remaining > 0 and not stop_event.is_set():
        # Guard against a zero/negative step, which would never make progress.
        chunk = min(step_seconds, remaining) if step_seconds > 0 else remaining
        if stop_event.wait(chunk):
            # Stop requested mid-wait: hand control back right away.
            break
        _set_state(user_id, run_id, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
        _update_engine_status(user_id, run_id, "RUNNING")
        if not _refresh_run_lease_or_stop(user_id, run_id, owner_id):
            return False
        remaining -= chunk
    return True
|
|
|
|
def _clear_runner(user_id: str, run_id: str):
    """Drop the runner registry entry of a run; a missing entry is ignored."""
    with _RUNNERS_LOCK:
        _RUNNERS.pop(_state_key(user_id, run_id), None)
|
|
|
|
def can_execute(now: datetime) -> tuple[bool, str]:
    """Decide whether orders may be placed at ``now``.

    The decision is derived from ``market_session(now)``: its ``status``
    (treated as ``CLOSED`` when missing) and ``reason`` (treated as empty when
    missing), both compared case-insensitively.

    Returns:
        ``(True, "OK")`` when the status is ``OPEN``; otherwise ``(False,
        code)`` where ``code`` is ``MARKET_HOLIDAY`` for a holiday status, a
        reason-specific ``MARKET_*`` code for known reasons, or
        ``MARKET_CLOSED`` as the fallback.
    """
    session = market_session(now)
    status = str(session.get("status") or "CLOSED").upper()
    reason = str(session.get("reason") or "").upper()

    if status == "OPEN":
        return True, "OK"
    if status == "HOLIDAY":
        return False, "MARKET_HOLIDAY"

    codes_by_reason = {
        "WEEKEND": "MARKET_WEEKEND",
        "PRE_OPEN": "MARKET_PRE_OPEN",
        "POST_CLOSE": "MARKET_POST_CLOSE",
        "CALENDAR_UNAVAILABLE": "MARKET_CALENDAR_UNAVAILABLE",
    }
    return False, codes_by_reason.get(reason, "MARKET_CLOSED")
|
|
|
|
|
|
def _last_execution_anchor(state: dict, mode: str) -> str | None:
|
|
mode_key = (mode or "LIVE").strip().upper()
|
|
if mode_key == "LIVE":
|
|
return state.get("last_sip_ts")
|
|
return state.get("last_run") or state.get("last_sip_ts")
|
|
|
|
|
|
def _parse_market_timestamp(value: str | None) -> datetime | None:
|
|
if not value:
|
|
return None
|
|
try:
|
|
parsed = datetime.fromisoformat(value)
|
|
except ValueError:
|
|
return None
|
|
market_tz = market_now().tzinfo
|
|
if parsed.tzinfo is None:
|
|
return parsed.replace(tzinfo=market_tz)
|
|
return parsed.astimezone(market_tz)
|
|
|
|
|
|
def _pause_for_auth_expiry(
    user_id: str,
    run_id: str,
    reason: str,
    emit_event_cb=None,
):
    """Mark a run as paused because the broker session expired.

    In order: the ``strategy_run`` row is set to ``PAUSED_AUTH_EXPIRED`` (with
    ``stopped_at`` and an ``auth_expired`` marker merged into ``meta``), the
    in-memory state and ``engine_status`` row are updated, and the
    ``BROKER_AUTH_EXPIRED`` / ``ENGINE_PAUSED`` events are logged. When a
    callable ``emit_event_cb`` is given, ``BROKER_AUTH_EXPIRED`` and
    ``STRATEGY_BLOCKED`` are also sent through it.

    Args:
        user_id: Owner of the run.
        run_id: Identifier of the run.
        reason: Text describing the failure; attached to the
            ``BROKER_AUTH_EXPIRED`` events only.
        emit_event_cb: Optional callback accepting ``event``, ``message`` and
            ``meta`` keyword arguments.
    """
    paused_status = "PAUSED_AUTH_EXPIRED"
    pause_sql = """
            UPDATE strategy_run
            SET status = 'PAUSED_AUTH_EXPIRED',
                stopped_at = %s,
                meta = COALESCE(meta, '{}'::jsonb) || %s
            WHERE user_id = %s AND run_id = %s
            """

    def _mark_paused(cur, _conn):
        stopped_at = datetime.utcnow().replace(tzinfo=timezone.utc)
        marker = Json({"reason": "auth_expired", "lifecycle": "auth_expired"})
        cur.execute(pause_sql, (stopped_at, marker, user_id, run_id))

    run_with_retry(_mark_paused)

    heartbeat = datetime.utcnow().isoformat() + "Z"
    _set_state(user_id, run_id, state=paused_status, last_heartbeat_ts=heartbeat)
    _update_engine_status(user_id, run_id, paused_status)

    log_event(
        event="BROKER_AUTH_EXPIRED",
        message="Broker authentication expired",
        meta={"reason": reason},
    )
    log_event(
        event="ENGINE_PAUSED",
        message="Engine paused until broker reconnect",
        meta={"reason": "auth_expired"},
    )

    if not callable(emit_event_cb):
        return

    emit_event_cb(
        event="BROKER_AUTH_EXPIRED",
        message="Broker authentication expired",
        meta={"reason": reason},
    )
    emit_event_cb(
        event="STRATEGY_BLOCKED",
        message="Strategy blocked until broker reconnect",
        meta={"reason": "broker_auth_expired"},
    )
|
|
|
|
|
|
def _engine_loop(config, stop_event: threading.Event):
    """Run the SIP strategy loop of one run until it stops, pauses or fails.

    Each iteration refreshes the run lease and heartbeat, reloads the
    persisted state, reconciles broker orders (for brokers exposing
    ``external_orders``), waits if the next SIP window has not been reached,
    fetches live prices and monthly history, computes allocation weights,
    executes the SIP when the market gate allows it, logs mark-to-market when
    due, and finally sleeps for 30 seconds with heartbeats.

    The loop ends when ``stop_event`` is set, the lease is lost, the broker
    session expires, or an unexpected exception occurs (which is re-raised
    after the run is marked ``ERROR``). On the way out the lease is released
    and the runner registry entry is cleared.

    Args:
        config: Run configuration. ``sip_amount`` is required; ``user_id``,
            ``run_id``, ``runner_owner_id``, ``strategy_name``/``strategy``,
            ``sip_frequency`` (or ``frequency``/``unit``), ``emit_event``,
            ``mode``, ``broker`` and ``initial_cash`` are read when present.
        stop_event: Set by the caller to request a graceful stop.

    Raises:
        ValueError: If the configured broker is not supported.
        KeyError: If ``sip_amount`` is missing from ``config``.
    """
    print("Strategy engine started with config:", config)

    user_id = config.get("user_id")
    run_id = config.get("run_id")
    owner_id = config.get("runner_owner_id") or RUNNER_OWNER_ID
    scope_user, scope_run = get_context(user_id, run_id)
    set_context(scope_user, scope_run)

    strategy_name = config.get("strategy_name") or config.get("strategy") or "golden_nifty"
    sip_amount = config["sip_amount"]
    configured_frequency = config.get("sip_frequency") or {}
    if not isinstance(configured_frequency, dict):
        configured_frequency = {}
    frequency_value = int(configured_frequency.get("value", config.get("frequency", 0)))
    frequency_unit = configured_frequency.get("unit", config.get("unit", "days"))
    frequency_label = f"{frequency_value} {frequency_unit}"
    emit_event_cb = config.get("emit_event")
    if not callable(emit_event_cb):
        emit_event_cb = None
    debug_enabled = os.getenv("ENGINE_DEBUG", "1").strip().lower() not in {"0", "false", "no"}

    def debug_event(event: str, message: str, meta: dict | None = None):
        """Emit a diagnostic event (store, callback, stdout) when debugging is on."""
        if not debug_enabled:
            return
        payload = meta or {}
        try:
            log_event(event=event, message=message, meta=payload)
        except Exception:
            # Diagnostics are best-effort; a storage failure must not stop the loop.
            pass
        if emit_event_cb:
            emit_event_cb(event=event, message=message, meta=meta or {})
        print(f"[ENGINE] {event} {message} {payload}", flush=True)

    def publish_event(event: str, message: str, meta: dict):
        """Store an event and mirror it to the optional callback."""
        log_event(event=event, message=message, meta=meta)
        if emit_event_cb:
            # The callback gets its own copy so it cannot alter the stored payload.
            emit_event_cb(event=event, message=message, meta=dict(meta))

    def pause_for_auth(exc: Exception):
        """Pause the run after the broker reported an expired session."""
        _pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb)

    mode = (config.get("mode") or "LIVE").strip().upper()
    if mode not in {"PAPER", "LIVE"}:
        mode = "LIVE"
    broker_type = (config.get("broker") or ("paper" if mode == "PAPER" else "zerodha")).strip().lower()
    initial_cash = float(config.get("initial_cash", 0))
    # NOTE(review): everything up to the ``try`` below runs outside the
    # try/finally, so a failure here (e.g. unsupported broker) leaves the
    # lease held until it expires - consider moving setup inside the try.
    if broker_type == "paper":
        mode = "PAPER"
        broker = PaperBroker(initial_cash=initial_cash)
        has_store_path = hasattr(broker, "store_path")
        publish_event(
            "DEBUG_PAPER_STORE_PATH",
            "Paper broker store path",
            {
                "cwd": os.getcwd(),
                "paper_store_path": str(broker.store_path) if has_store_path else "NO_STORE_PATH",
                "abs_store_path": os.path.abspath(str(broker.store_path)) if has_store_path else "N/A",
            },
        )
    elif broker_type == "zerodha":
        broker = LiveZerodhaBroker(user_id=scope_user, run_id=scope_run)
    elif broker_type == "groww":
        broker = LiveGrowwBroker(user_id=scope_user, run_id=scope_run)
    else:
        raise ValueError(f"Unsupported broker: {broker_type}")
    market_data_provider = "yfinance"

    log_event("ENGINE_START", {
        "strategy": strategy_name,
        "sip_amount": sip_amount,
        "frequency": frequency_label,
    })
    debug_event("ENGINE_START_DEBUG", "engine loop started", {"run_id": scope_run, "user_id": scope_user})

    _set_state(
        scope_user,
        scope_run,
        state="RUNNING",
        last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
    )
    _update_engine_status(scope_user, scope_run, "RUNNING")
    # Drives the cleanup in ``finally``: only a plain "STOPPED" exit may
    # overwrite the run status with STOPPED.
    exit_reason = "STOPPED"

    try:
        while not stop_event.is_set():
            if not _refresh_run_lease_or_stop(scope_user, scope_run, owner_id):
                exit_reason = "LEASE_LOST"
                break
            _set_state(scope_user, scope_run, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
            _update_engine_status(scope_user, scope_run, "RUNNING")

            state = load_state(mode=mode)
            debug_event(
                "STATE_LOADED",
                "loaded engine state",
                {
                    "last_sip_ts": state.get("last_sip_ts"),
                    "last_run": state.get("last_run"),
                    "cash": state.get("cash"),
                    "total_invested": state.get("total_invested"),
                },
            )
            # The persisted frequency wins over the configured one.
            state_frequency = state.get("sip_frequency")
            if not isinstance(state_frequency, dict):
                state_frequency = {"value": frequency_value, "unit": frequency_unit}
            freq = int(state_frequency.get("value", frequency_value))
            unit = state_frequency.get("unit", frequency_unit)
            frequency_label = f"{freq} {unit}"
            if unit == "minutes":
                delta = timedelta(minutes=freq)
            else:
                delta = timedelta(days=freq)

            # Gate 2: time to SIP
            last_run = _last_execution_anchor(state, mode)
            now = market_now()
            debug_event(
                "ENGINE_LOOP_TICK",
                "engine loop tick",
                {"now": now.isoformat(), "frequency": frequency_label},
            )

            if getattr(broker, "external_orders", False):
                try:
                    reconciliation = reconcile_live_orders(
                        broker=broker,
                        mode=mode,
                        now_ts=now,
                    )
                    if reconciliation.get("blocked"):
                        debug_event(
                            "ORDER_RECONCILIATION_BLOCKED",
                            "Unresolved broker orders are still being reconciled",
                            {"now": now.isoformat()},
                        )
                except BrokerAuthExpired as exc:
                    pause_for_auth(exc)
                    exit_reason = "AUTH_EXPIRED"
                    break
                except Exception as exc:
                    debug_event(
                        "ORDER_RECONCILIATION_ERROR",
                        "broker order reconciliation failed",
                        {"error": str(exc)},
                    )
                    if not sleep_with_heartbeat(15, stop_event, scope_user, scope_run, owner_id):
                        exit_reason = "LEASE_LOST"
                        break
                    continue

            if last_run:
                parsed_last_run = _parse_market_timestamp(last_run)
                next_run = align_to_market_open(parsed_last_run + delta) if parsed_last_run else None
                if next_run is not None and now < next_run:
                    wait_seconds = 5 if unit == "minutes" else 60
                    publish_event(
                        "SIP_WAITING",
                        "Waiting for next SIP window",
                        {
                            "last_run": last_run,
                            "next_eligible": next_run.isoformat(),
                            "now": now.isoformat(),
                            "frequency": frequency_label,
                        },
                    )
                    if not sleep_with_heartbeat(wait_seconds, stop_event, scope_user, scope_run, owner_id):
                        exit_reason = "LEASE_LOST"
                        break
                    continue

            try:
                debug_event("PRICE_FETCH_START", "fetching live prices", {"tickers": [NIFTY, GOLD]})
                nifty_price = fetch_live_price(
                    NIFTY,
                    provider=market_data_provider,
                    user_id=scope_user,
                    run_id=scope_run,
                )
                gold_price = fetch_live_price(
                    GOLD,
                    provider=market_data_provider,
                    user_id=scope_user,
                    run_id=scope_run,
                )
                debug_event(
                    "PRICE_FETCHED",
                    "fetched live prices",
                    {"nifty_price": float(nifty_price), "gold_price": float(gold_price)},
                )
            except KiteTokenError as exc:
                pause_for_auth(exc)
                # Without this the cleanup below would treat the exit as a
                # normal stop and overwrite PAUSED_AUTH_EXPIRED with STOPPED.
                exit_reason = "AUTH_EXPIRED"
                break
            except Exception as exc:
                debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)})
                if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id):
                    exit_reason = "LEASE_LOST"
                    break
                continue

            try:
                nifty_hist = load_monthly_close(
                    NIFTY,
                    provider=market_data_provider,
                    user_id=scope_user,
                    run_id=scope_run,
                )
                gold_hist = load_monthly_close(
                    GOLD,
                    provider=market_data_provider,
                    user_id=scope_user,
                    run_id=scope_run,
                )
            except KiteTokenError as exc:
                pause_for_auth(exc)
                # Same reasoning as for the price fetch above.
                exit_reason = "AUTH_EXPIRED"
                break
            except Exception as exc:
                debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)})
                if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id):
                    exit_reason = "LEASE_LOST"
                    break
                continue

            nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1]
            gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1]

            eq_w, gd_w = allocation(
                sp_price=nifty_price,
                gd_price=gold_price,
                sp_sma=nifty_sma,
                gd_sma=gold_sma,
            )
            debug_event(
                "WEIGHTS_COMPUTED",
                "computed allocation weights",
                {"equity_weight": float(eq_w), "gold_weight": float(gd_w)},
            )

            weights = {"equity": eq_w, "gold": gd_w}
            allowed, reason = can_execute(now)
            executed = False
            if not allowed:
                blocked_meta = {
                    "reason": reason,
                    "eligible_since": last_run,
                    "checked_at": now.isoformat(),
                }
                log_event(
                    event="EXECUTION_BLOCKED",
                    message="Execution blocked by market gate",
                    meta=blocked_meta,
                )
                debug_event("MARKET_GATE", "market closed", {"reason": reason})
                if emit_event_cb:
                    emit_event_cb(
                        event="EXECUTION_BLOCKED",
                        message="Execution blocked by market gate",
                        meta=dict(blocked_meta),
                    )
            else:
                publish_event(
                    "DEBUG_BEFORE_TRY_EXECUTE",
                    "About to call try_execute_sip",
                    {
                        "last_run": last_run,
                        "frequency": frequency_label,
                        "allowed": allowed,
                        "reason": reason,
                        "sip_amount": sip_amount,
                        "broker": type(broker).__name__,
                        "now": now.isoformat(),
                    },
                )
                debug_event(
                    "TRY_EXECUTE_START",
                    "calling try_execute_sip",
                    {"sip_interval_sec": delta.total_seconds(), "sip_amount": sip_amount},
                )
                state, executed = try_execute_sip(
                    now=now,
                    market_open=True,
                    sip_interval=delta.total_seconds(),
                    sip_amount=sip_amount,
                    sp_price=nifty_price,
                    gd_price=gold_price,
                    eq_w=eq_w,
                    gd_w=gd_w,
                    broker=broker,
                    mode=mode,
                )
                publish_event(
                    "DEBUG_AFTER_TRY_EXECUTE",
                    "Returned from try_execute_sip",
                    {
                        "executed": executed,
                        "state_last_run": state.get("last_run"),
                        "state_last_sip_ts": state.get("last_sip_ts"),
                    },
                )
                debug_event(
                    "TRY_EXECUTE_DONE",
                    "try_execute_sip finished",
                    {"executed": executed, "last_run": state.get("last_run")},
                )

            if executed:
                log_event("SIP_TRIGGERED", {
                    "date": now.date().isoformat(),
                    "allocation": weights,
                    "cash_used": sip_amount,
                })
                debug_event("SIP_TRIGGERED", "sip executed", {"cash_used": sip_amount})
                portfolio_value = (
                    state["nifty_units"] * nifty_price
                    + state["gold_units"] * gold_price
                )
                log_event("PORTFOLIO_UPDATED", {
                    "nifty_units": state["nifty_units"],
                    "gold_units": state["gold_units"],
                    "portfolio_value": portfolio_value,
                })
                print("SIP executed at", now)

            if should_log_mtm(None, now):
                logical_time = normalize_logical_time(now)
                # MTM row and broker equity are written in one transaction.
                with db_transaction() as cur:
                    log_mtm(
                        nifty_units=state["nifty_units"],
                        gold_units=state["gold_units"],
                        nifty_price=nifty_price,
                        gold_price=gold_price,
                        total_invested=state["total_invested"],
                        cur=cur,
                        logical_time=logical_time,
                    )
                    broker.update_equity(
                        {NIFTY: nifty_price, GOLD: gold_price},
                        now,
                        cur=cur,
                        logical_time=logical_time,
                    )

            if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id):
                exit_reason = "LEASE_LOST"
                break
    except BrokerAuthExpired as exc:
        exit_reason = "AUTH_EXPIRED"
        pause_for_auth(exc)
        print(f"[ENGINE] broker auth expired for run {scope_run}: {exc}", flush=True)
    except Exception as e:
        exit_reason = "ERROR"
        _set_state(scope_user, scope_run, state="ERROR", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
        _update_engine_status(scope_user, scope_run, "ERROR")
        log_event("ENGINE_ERROR", {"error": str(e)})
        raise
    finally:
        try:
            released = release_run_lease(scope_run, owner_id)
            if released:
                print(
                    f"[ENGINE] RUNNER_LEASE_RELEASED released run lease "
                    f"{{'run_id': '{scope_run}', 'owner_id': '{owner_id}'}}",
                    flush=True,
                )
        except Exception:
            # Releasing is best-effort; the lease also has an expiry time.
            pass

        if exit_reason not in {"ERROR", "LEASE_LOST", "AUTH_EXPIRED"}:
            log_event("ENGINE_STOP")
            _set_state(
                scope_user,
                scope_run,
                state="STOPPED",
                last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
            )
            _update_engine_status(scope_user, scope_run, "STOPPED")
            print("Strategy engine stopped")
        elif exit_reason == "LEASE_LOST":
            # Only the local view is reset; the persisted status is left
            # untouched after a lost lease.
            _set_state(
                scope_user,
                scope_run,
                state="STOPPED",
                last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
            )
        _clear_runner(scope_user, scope_run)
|
|
|
|
def start_engine(config):
    """Start the engine thread for the run described by ``config``.

    The run lease is acquired first; the thread is only created once the
    lease is held. The registry lock is held for the whole check / acquire /
    register / start sequence.

    Args:
        config: Run configuration; ``user_id`` and ``run_id`` are required.
            A copy extended with ``runner_owner_id`` is passed to the thread.

    Returns:
        ``True`` when a new thread was started, ``False`` when a live thread
        is already registered for this run.

    Raises:
        ValueError: If ``user_id`` or ``run_id`` is missing.
        RunLeaseNotAcquiredError: If the lease could not be acquired.
    """
    user_id = config.get("user_id")
    run_id = config.get("run_id")
    for field_name, field_value in (("user_id", user_id), ("run_id", run_id)):
        if not field_value:
            raise ValueError(f"{field_name} is required to start engine")

    with _RUNNERS_LOCK:
        key = _state_key(user_id, run_id)
        current = _RUNNERS.get(key)
        if current and current["thread"].is_alive():
            return False

        lease = acquire_run_lease(
            run_id,
            RUNNER_OWNER_ID,
            lease_seconds=RUN_LEASE_SECONDS,
        )
        expires_at = lease.get("expires_at")
        expires_text = expires_at.isoformat() if expires_at else None

        if not lease.get("acquired"):
            _log_runner_lease_event(
                user_id,
                run_id,
                "RUNNER_LEASE_DENIED",
                "Run lease denied",
                {
                    "owner_id": RUNNER_OWNER_ID,
                    "current_owner": lease.get("owner_id"),
                    "expires_at": expires_text,
                },
            )
            raise RunLeaseNotAcquiredError(run_id, RUNNER_OWNER_ID, lease)

        reacquired = str(lease.get("status") or "ACQUIRED").upper() == "REACQUIRED"
        if reacquired:
            event_name, event_message = "RUNNER_LEASE_REACQUIRED", "Expired run lease reacquired"
        else:
            event_name, event_message = "RUNNER_LEASE_ACQUIRED", "Run lease acquired"
        _log_runner_lease_event(
            user_id,
            run_id,
            event_name,
            event_message,
            {"owner_id": RUNNER_OWNER_ID, "expires_at": expires_text},
        )

        stop_event = threading.Event()
        # The thread works on its own copy, tagged with the lease owner.
        thread_config = {**config, "runner_owner_id": RUNNER_OWNER_ID}
        worker = threading.Thread(
            target=_engine_loop,
            args=(thread_config, stop_event),
            daemon=True,
        )
        _RUNNERS[key] = {"thread": worker, "stop_event": stop_event}
        try:
            worker.start()
        except Exception:
            # Undo the registration and hand the lease back before failing.
            _RUNNERS.pop(key, None)
            release_run_lease(run_id, RUNNER_OWNER_ID)
            raise
        return True
|
|
|
|
def stop_engine(user_id: str, run_id: str | None = None, timeout: float | None = 10.0):
    """Ask engine threads to stop and optionally wait for them.

    With a ``run_id`` only that run is targeted; without one, every
    registered run of ``user_id`` is. All targeted stop events are set first,
    then each thread is joined for up to ``timeout`` seconds (no join when
    ``timeout`` is ``None``). Registry entries of threads that are no longer
    alive are removed.

    Returns:
        ``True`` if every targeted thread has finished (also when nothing was
        targeted), ``False`` if at least one is still alive.
    """
    # Snapshot the targets under the lock; signalling and joining happen
    # outside it because ``_clear_runner`` takes the same lock.
    with _RUNNERS_LOCK:
        if run_id:
            key = _state_key(user_id, run_id)
            found = _RUNNERS.get(key)
            targets = [(key, found)] if found else []
        else:
            targets = [
                (key, entry)
                for key, entry in _RUNNERS.items()
                if key[0] == user_id
            ]

    for _key, entry in targets:
        entry["stop_event"].set()

    all_stopped = True
    for (owner, rid), entry in targets:
        worker = entry["thread"]
        if timeout is not None:
            worker.join(timeout=timeout)
        if worker.is_alive():
            all_stopped = False
        else:
            _clear_runner(owner, rid)
    return all_stopped
|
|
|