1126 lines
39 KiB
Python
1126 lines
39 KiB
Python
import json
|
|
import os
|
|
import sys
|
|
import threading
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
# Directory three levels above the directory containing this file.
# It is appended to sys.path (once) — presumably so the
# `indian_paper_trading_strategy` package imported below can be resolved.
ENGINE_ROOT = Path(__file__).resolve().parents[3]
if str(ENGINE_ROOT) not in sys.path:
    sys.path.append(str(ENGINE_ROOT))
|
|
|
|
from indian_paper_trading_strategy.engine.market import is_market_open, align_to_market_open, market_now
|
|
from indian_paper_trading_strategy.engine.runner import start_engine, stop_engine
|
|
from indian_paper_trading_strategy.engine.state import init_paper_state, load_state, save_state
|
|
from indian_paper_trading_strategy.engine.broker import PaperBroker
|
|
from indian_paper_trading_strategy.engine.time_utils import frequency_to_timedelta
|
|
from indian_paper_trading_strategy.engine.db import engine_context
|
|
|
|
from app.broker_store import get_user_broker, set_broker_auth_state
|
|
from app.services.db import db_connection
|
|
from app.services.run_service import (
|
|
create_strategy_run,
|
|
get_active_run_id,
|
|
get_running_run_id,
|
|
update_run_status,
|
|
)
|
|
from app.services.auth_service import get_user_by_id
|
|
from app.services.email_service import send_email_async
|
|
from app.services.groww_service import GrowwApiError, GrowwTokenError, fetch_funds as fetch_groww_funds
|
|
from app.services.groww_storage import get_session as get_groww_session
|
|
from app.services.zerodha_service import (
|
|
KiteTokenError,
|
|
fetch_funds as fetch_zerodha_funds,
|
|
)
|
|
from app.services.zerodha_storage import get_session as get_zerodha_session
|
|
from psycopg2.extras import Json
|
|
from psycopg2 import errors
|
|
|
|
# Lock held by emit_event() while it increments SEQ.
SEQ_LOCK = threading.Lock()
# Last sequence number handed out for strategy_log rows; init_log_state()
# seeds it from MAX(seq) in the database.
SEQ = 0
# run_id -> UTC datetime of the last SIP_WAITING event that emit_event() let through.
LAST_WAIT_LOG_TS = {}
# Minimum gap between two SIP_WAITING events of the same run.
WAIT_LOG_INTERVAL = timedelta(seconds=60)
# Time zone returned by _local_tz() and used to render timestamps.
IST = ZoneInfo("Asia/Kolkata")
|
|
|
|
def init_log_state():
    """Seed the in-memory log sequence counter from the database.

    Reads ``MAX(seq)`` from ``strategy_log`` and stores it in the module-level
    ``SEQ`` so that subsequently emitted events continue after the highest
    sequence number already persisted. Falls back to 0 when nothing is found.
    """
    global SEQ

    with db_connection() as conn, conn.cursor() as cur:
        cur.execute("SELECT COALESCE(MAX(seq), 0) FROM strategy_log")
        latest = cur.fetchone()

    if latest and latest[0] is not None:
        SEQ = latest[0]
    else:
        SEQ = 0
|
|
|
|
def start_new_run(user_id: str, run_id: str):
    """Record the start of a run by emitting a ``STRATEGY_STARTED`` event.

    Also clears any SIP_WAITING throttle timestamp remembered for ``run_id``.
    """
    # Forget the last SIP_WAITING timestamp for this run, if one was stored.
    LAST_WAIT_LOG_TS.pop(run_id, None)
    emit_event(
        user_id=user_id,
        run_id=run_id,
        event="STRATEGY_STARTED",
        message="Strategy started",
        meta={},
    )
|
|
|
|
|
|
def stop_run(user_id: str, run_id: str, reason="user_request"):
    """Emit a ``STRATEGY_STOPPED`` event for the run.

    ``reason`` is stored under the ``"reason"`` key of the event's meta.
    """
    emit_event(
        user_id=user_id,
        run_id=run_id,
        event="STRATEGY_STOPPED",
        message="Strategy stopped",
        meta={"reason": reason},
    )
|
|
|
|
|
|
def resume_run(user_id: str, run_id: str):
    """Emit a ``STRATEGY_RESUMED`` event for the run (with empty meta)."""
    emit_event(
        user_id=user_id,
        run_id=run_id,
        event="STRATEGY_RESUMED",
        message="Strategy resumed",
        meta={},
    )
|
|
|
|
|
|
def emit_event(
|
|
*,
|
|
user_id: str,
|
|
run_id: str,
|
|
event: str,
|
|
message: str,
|
|
level: str = "INFO",
|
|
category: str = "ENGINE",
|
|
meta: dict | None = None
|
|
):
|
|
global SEQ, LAST_WAIT_LOG_TS
|
|
if not user_id or not run_id:
|
|
return
|
|
|
|
now = datetime.now(timezone.utc)
|
|
if event == "SIP_WAITING":
|
|
last_ts = LAST_WAIT_LOG_TS.get(run_id)
|
|
if last_ts and (now - last_ts) < WAIT_LOG_INTERVAL:
|
|
return
|
|
LAST_WAIT_LOG_TS[run_id] = now
|
|
|
|
with SEQ_LOCK:
|
|
SEQ += 1
|
|
seq = SEQ
|
|
|
|
evt = {
|
|
"seq": seq,
|
|
"ts": now.isoformat().replace("+00:00", "Z"),
|
|
"level": level,
|
|
"category": category,
|
|
"event": event,
|
|
"message": message,
|
|
"run_id": run_id,
|
|
"meta": meta or {}
|
|
}
|
|
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO strategy_log (
|
|
seq, ts, level, category, event, message, user_id, run_id, meta
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (seq) DO NOTHING
|
|
""",
|
|
(
|
|
evt["seq"],
|
|
now,
|
|
evt["level"],
|
|
evt["category"],
|
|
evt["event"],
|
|
evt["message"],
|
|
user_id,
|
|
evt["run_id"],
|
|
Json(evt["meta"]),
|
|
),
|
|
)
|
|
|
|
def _maybe_parse_json(value):
    """Decode ``value`` as JSON when it is a non-blank string.

    Non-string values (including ``None``) are returned unchanged. A blank
    string yields ``None``. A string that cannot be decoded is returned as-is.
    """
    if not isinstance(value, str):
        return value
    stripped = value.strip()
    if stripped == "":
        return None
    try:
        decoded = json.loads(stripped)
    except Exception:
        # Not JSON: hand back the caller's original (unstripped) string.
        return value
    return decoded
|
|
|
|
|
|
def _local_tz():
    """Return the time zone used for local timestamps (the module-level ``IST``)."""
    return IST
|
|
|
|
|
|
def _format_local_ts(value: datetime | None):
|
|
if value is None:
|
|
return None
|
|
return value.astimezone(_local_tz()).replace(tzinfo=None).isoformat()
|
|
|
|
|
|
def _load_config(user_id: str, run_id: str):
    """Load the ``strategy_config`` row for a run as a plain dict.

    Returns ``{}`` when no row matches. Otherwise the dict carries the keys
    ``strategy``, ``sip_amount`` (float or None), ``mode``, ``broker``,
    ``active``, ``frequency`` (JSON-decoded when stored as a string),
    ``frequency_days``, ``unit`` and ``next_run`` (local ISO string or None).
    ``sip_frequency`` (``{"value", "unit"}``) is added only when at least one
    of its two columns is non-NULL.
    """
    with db_connection() as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
                   mode, broker, active, frequency, frequency_days, unit, next_run
            FROM strategy_config
            WHERE user_id = %s AND run_id = %s
            LIMIT 1
            """,
            (user_id, run_id),
        )
        record = cur.fetchone()
    if not record:
        return {}

    (
        strategy,
        sip_amount,
        sip_value,
        sip_unit,
        mode,
        broker,
        active,
        frequency,
        frequency_days,
        unit,
        next_run,
    ) = record

    cfg = {
        "strategy": strategy,
        "sip_amount": None if sip_amount is None else float(sip_amount),
        "mode": mode,
        "broker": broker,
        "active": active,
        "frequency": _maybe_parse_json(frequency),
        "frequency_days": frequency_days,
        "unit": unit,
        "next_run": _format_local_ts(next_run),
    }
    if not (sip_value is None and sip_unit is None):
        cfg["sip_frequency"] = {"value": sip_value, "unit": sip_unit}
    return cfg
|
|
|
|
|
|
def _save_config(cfg, user_id: str, run_id: str):
    """Upsert ``cfg`` into ``strategy_config`` for ``(user_id, run_id)``.

    ``cfg["sip_frequency"]`` (when a dict) is split into its value/unit
    columns. A non-string, non-None ``cfg["frequency"]`` is serialized with
    ``json.dumps``. ``cfg["next_run"]`` is only honoured when it is an ISO
    string; a naive value is interpreted in ``_local_tz()`` and an unparsable
    one is stored as NULL.
    """
    sip_frequency = cfg.get("sip_frequency")
    if isinstance(sip_frequency, dict):
        sip_value = sip_frequency.get("value")
        sip_unit = sip_frequency.get("unit")
    else:
        sip_value = sip_unit = None

    frequency = cfg.get("frequency")
    if frequency is not None and not isinstance(frequency, str):
        frequency = json.dumps(frequency)

    next_run_dt = None
    raw_next_run = cfg.get("next_run")
    if isinstance(raw_next_run, str):
        try:
            candidate = datetime.fromisoformat(raw_next_run)
        except ValueError:
            candidate = None
        if candidate is not None and candidate.tzinfo is None:
            candidate = candidate.replace(tzinfo=_local_tz())
        next_run_dt = candidate

    params = (
        user_id,
        run_id,
        cfg.get("strategy"),
        cfg.get("sip_amount"),
        sip_value,
        sip_unit,
        cfg.get("mode"),
        cfg.get("broker"),
        cfg.get("active"),
        frequency,
        cfg.get("frequency_days"),
        cfg.get("unit"),
        next_run_dt,
    )
    with db_connection() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO strategy_config (
                        user_id,
                        run_id,
                        strategy,
                        sip_amount,
                        sip_frequency_value,
                        sip_frequency_unit,
                        mode,
                        broker,
                        active,
                        frequency,
                        frequency_days,
                        unit,
                        next_run
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (user_id, run_id) DO UPDATE
                    SET strategy = EXCLUDED.strategy,
                        sip_amount = EXCLUDED.sip_amount,
                        sip_frequency_value = EXCLUDED.sip_frequency_value,
                        sip_frequency_unit = EXCLUDED.sip_frequency_unit,
                        mode = EXCLUDED.mode,
                        broker = EXCLUDED.broker,
                        active = EXCLUDED.active,
                        frequency = EXCLUDED.frequency,
                        frequency_days = EXCLUDED.frequency_days,
                        unit = EXCLUDED.unit,
                        next_run = EXCLUDED.next_run
                    """,
                    params,
                )
|
|
|
|
def save_strategy_config(cfg, user_id: str, run_id: str):
    """Public entry point that persists ``cfg`` by delegating to ``_save_config``."""
    _save_config(cfg, user_id, run_id)
|
|
|
|
def deactivate_strategy_config(user_id: str, run_id: str):
    """Mark the stored strategy config of a run as inactive.

    Loads the config, sets ``active`` to ``False`` and saves it back. When no
    config row exists for ``(user_id, run_id)`` nothing is written: saving an
    otherwise empty dict would upsert a row whose remaining columns are all
    NULL. This mirrors the guard in ``reactivate_strategy_config``.
    """
    cfg = _load_config(user_id, run_id)
    if not cfg:
        # Nothing stored for this run, so there is nothing to deactivate.
        return
    cfg["active"] = False
    _save_config(cfg, user_id, run_id)
|
|
|
|
|
|
def reactivate_strategy_config(user_id: str, run_id: str):
    """Mark the stored strategy config of a run as active again.

    Returns the updated config dict, or ``{}`` (without writing anything)
    when no config is stored for the run.
    """
    cfg = _load_config(user_id, run_id)
    if cfg:
        cfg["active"] = True
        _save_config(cfg, user_id, run_id)
        return cfg
    return {}
|
|
|
|
def _write_status(user_id: str, run_id: str, status):
    """Upsert the ``engine_status`` row of a run.

    Stores ``status`` together with ``market_now()`` as ``last_updated``,
    replacing both columns when a row for ``(user_id, run_id)`` already exists.
    """
    stamp = market_now()
    params = (user_id, run_id, status, stamp)
    with db_connection() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO engine_status (user_id, run_id, status, last_updated)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (user_id, run_id) DO UPDATE
                    SET status = EXCLUDED.status,
                        last_updated = EXCLUDED.last_updated
                    """,
                    params,
                )
|
|
|
|
|
|
def _get_engine_status_row(user_id: str, run_id: str):
    """Fetch the ``engine_status`` row of a run.

    Returns ``{"status": ..., "last_updated": ...}``, or ``None`` when either
    identifier is falsy (no query is issued) or no row exists.
    """
    if not (user_id and run_id):
        return None
    with db_connection() as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT status, last_updated
            FROM engine_status
            WHERE user_id = %s AND run_id = %s
            LIMIT 1
            """,
            (user_id, run_id),
        )
        found = cur.fetchone()
    if found:
        return {"status": found[0], "last_updated": found[1]}
    return None
|
|
|
|
|
|
def _effective_running_run_id(user_id: str):
    """Return the user's running run id, reconciling it with the engine status.

    When ``get_running_run_id`` reports a run but its ``engine_status`` row
    says ``STOPPED`` or ``ERROR``, the run is marked ``STOPPED`` (reason
    ``engine_inactive``) and ``None`` is returned. ``None`` is also returned
    when no run is reported as running.
    """
    run_id = get_running_run_id(user_id)
    if not run_id:
        return None

    row = _get_engine_status_row(user_id, run_id) or {}
    engine_state = str(row.get("status") or "").strip().upper()
    if engine_state in ("STOPPED", "ERROR"):
        # The run table is out of date with respect to the engine: sync it.
        update_run_status(
            user_id,
            run_id,
            "STOPPED",
            meta={"reason": "engine_inactive", "engine_state": engine_state},
        )
        return None
    return run_id
|
|
|
|
def validate_frequency(freq: dict, mode: str):
    """Validate a SIP frequency payload of the form ``{"value": ..., "unit": ...}``.

    Args:
        freq: Payload dict. ``value`` must be convertible with ``int()`` and be
            at least 1; ``unit`` must be ``"minutes"`` or ``"days"``.
        mode: Accepted for interface compatibility; not used by the checks.

    Raises:
        ValueError: If ``freq`` is not a dict, the value is missing, ``None``
            or not numeric, the unit is unsupported, or the value is below 1.
    """
    if not isinstance(freq, dict):
        raise ValueError("Frequency payload is required")

    raw_value = freq.get("value", 0)
    try:
        value = int(raw_value)
    except (TypeError, ValueError, OverflowError) as exc:
        # int(None) raises TypeError; surface it as the ValueError that every
        # other branch of this validator raises.
        raise ValueError(f"Invalid frequency value: {raw_value!r}") from exc
    unit = freq.get("unit")

    # A tuple keeps the membership test safe for unhashable units (e.g. a list).
    if unit not in ("minutes", "days"):
        raise ValueError(f"Unsupported frequency unit: {unit}")

    if unit == "minutes" and value < 1:
        raise ValueError("Minimum frequency is 1 minute")

    if unit == "days" and value < 1:
        raise ValueError("Minimum frequency is 1 day")
|
|
|
|
|
|
def _validate_live_broker_session(user_id: str):
    """Check that the user's live broker session is usable.

    Returns a ``(is_valid, broker_state, reason)`` tuple where ``reason`` is
    ``"ok"``, ``"broker_not_connected"`` or ``"broker_auth_required"``.

    Only ZERODHA and GROWW are handled. The stored session is loaded, its
    credentials are checked for presence, and a funds request is made as a
    probe. Any failure after the connection check sets the broker auth state
    to ``EXPIRED``; success sets it to ``VALID``.
    """
    broker_state = get_user_broker(user_id) or {}
    broker_name = (broker_state.get("broker") or "").strip().upper()
    supported = broker_name in {"ZERODHA", "GROWW"}
    if not (broker_state.get("connected") and supported):
        return False, broker_state, "broker_not_connected"

    def _auth_required(log_message=None):
        # Shared failure tail: optional log line, mark expired, report.
        if log_message is not None:
            print(log_message, flush=True)
        set_broker_auth_state(user_id, "EXPIRED")
        return False, broker_state, "broker_auth_required"

    def _validated():
        set_broker_auth_state(user_id, "VALID")
        return True, broker_state, "ok"

    if broker_name == "ZERODHA":
        try:
            session = get_zerodha_session(user_id)
        except Exception as exc:
            return _auth_required(f"[STRATEGY] failed to load Zerodha session for {user_id}: {exc}")

        if not session:
            return _auth_required()

        api_key = str(session.get("api_key") or "").strip()
        access_token = str(session.get("access_token") or "").strip()
        if not (api_key and access_token):
            return _auth_required()

        try:
            fetch_zerodha_funds(api_key, access_token)
        except KiteTokenError:
            return _auth_required()
        except Exception as exc:
            return _auth_required(f"[STRATEGY] failed to validate Zerodha session for {user_id}: {exc}")

        return _validated()

    # Remaining supported broker: GROWW.
    try:
        session = get_groww_session(user_id)
    except Exception as exc:
        return _auth_required(f"[STRATEGY] failed to load Groww session for {user_id}: {exc}")

    if not session:
        return _auth_required()

    access_token = str(session.get("access_token") or "").strip()
    if not access_token:
        return _auth_required()

    try:
        fetch_groww_funds(access_token)
    except GrowwTokenError:
        return _auth_required()
    except GrowwApiError as exc:
        return _auth_required(f"[STRATEGY] failed to validate Groww session for {user_id}: {exc}")
    except Exception as exc:
        return _auth_required(f"[STRATEGY] failed to validate Groww session for {user_id}: {exc}")

    return _validated()
|
|
|
|
def compute_next_eligible(last_run: str | None, sip_frequency: dict | None):
|
|
if not last_run or not sip_frequency:
|
|
return None
|
|
try:
|
|
last_dt = datetime.fromisoformat(last_run)
|
|
except ValueError:
|
|
return None
|
|
try:
|
|
delta = frequency_to_timedelta(sip_frequency)
|
|
except ValueError:
|
|
return None
|
|
next_dt = last_dt + delta
|
|
next_dt = align_to_market_open(next_dt)
|
|
return next_dt.isoformat()
|
|
|
|
|
|
def _last_execution_ts(state: dict, mode: str) -> str | None:
|
|
mode_key = (mode or "LIVE").strip().upper()
|
|
if mode_key == "LIVE":
|
|
return state.get("last_sip_ts")
|
|
return state.get("last_run") or state.get("last_sip_ts")
|
|
|
|
def start_strategy(req, user_id: str):
    """Start a new strategy run for the user, or re-attach to a running one.

    ``req`` is read for ``mode``, ``sip_frequency``, ``sip_amount``,
    ``initial_cash`` and ``strategy_name`` (presumably an API request model —
    confirm against the caller).

    Returns a dict whose ``status`` is one of:
      * ``"broker_auth_required"`` – LIVE mode and the broker session is not valid;
      * ``"already_running"`` – a run is already active (or run creation hit a
        unique violation);
      * ``"restarted"`` – a run was already active and its in-process engine
        was started again;
      * ``"unsupported_mode"`` – mode is neither PAPER nor LIVE;
      * ``"started"`` – a new run was created.

    Raises:
        ValueError: from ``validate_frequency`` when the frequency payload is invalid.
    """
    # When ENGINE_EXTERNAL is truthy this process never starts the engine itself.
    engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
    running_run_id = _effective_running_run_id(user_id)
    if running_run_id:
        # A run already exists: validate the broker for LIVE, then try to
        # (re)start the in-process engine for it.
        running_cfg = _load_config(user_id, running_run_id)
        running_mode = (running_cfg.get("mode") or req.mode or "PAPER").strip().upper()
        if running_mode == "LIVE":
            # NOTE(review): failure_reason is unpacked but never used.
            is_valid, broker_state, failure_reason = _validate_live_broker_session(user_id)
            if not is_valid:
                return {
                    "status": "broker_auth_required",
                    "redirect_url": "/api/broker/login",
                    "broker": broker_state.get("broker"),
                }
        if engine_external:
            return {"status": "already_running", "run_id": running_run_id}
        engine_config = _build_engine_config(user_id, running_run_id, req)
        if engine_config:
            started = start_engine(engine_config)
            if started:
                _write_status(user_id, running_run_id, "RUNNING")
                return {"status": "restarted", "run_id": running_run_id}
        return {"status": "already_running", "run_id": running_run_id}
    mode = (req.mode or "PAPER").strip().upper()
    # Accept either an object exposing .dict() or anything dict() can consume.
    frequency_payload = req.sip_frequency.dict() if hasattr(req.sip_frequency, "dict") else dict(req.sip_frequency)
    # Runs before the mode check, so an invalid frequency raises even for an unsupported mode.
    validate_frequency(frequency_payload, mode)
    if mode == "PAPER":
        initial_cash = float(req.initial_cash) if req.initial_cash is not None else 1_000_000.0
        broker_name = "paper"
    elif mode == "LIVE":
        is_valid, broker_state, failure_reason = _validate_live_broker_session(user_id)
        if not is_valid:
            return {
                "status": "broker_auth_required",
                "redirect_url": "/api/broker/login",
                "broker": broker_state.get("broker"),
            }
        initial_cash = None
        broker_name = ((broker_state.get("broker") or "ZERODHA").strip().lower())
    else:
        return {"status": "unsupported_mode"}

    meta = {
        "sip_amount": req.sip_amount,
        "sip_frequency": frequency_payload,
    }
    if initial_cash is not None:
        meta["initial_cash"] = initial_cash

    try:
        run_id = create_strategy_run(
            user_id,
            strategy=req.strategy_name,
            mode=mode,
            broker=broker_name,
            meta=meta,
        )
    except errors.UniqueViolation:
        # Presumably a concurrent request created the run first — confirm
        # against the constraint enforced by create_strategy_run.
        return {"status": "already_running"}

    # Initialise the per-run engine state inside the run's engine context.
    with engine_context(user_id, run_id):
        if mode == "PAPER":
            init_paper_state(initial_cash, frequency_payload)
            with db_connection() as conn:
                with conn:
                    with conn.cursor() as cur:
                        cur.execute(
                            """
                            INSERT INTO paper_broker_account (user_id, run_id, cash)
                            VALUES (%s, %s, %s)
                            ON CONFLICT (user_id, run_id) DO UPDATE
                            SET cash = EXCLUDED.cash
                            """,
                            (user_id, run_id, initial_cash),
                        )
            # The instance is discarded; presumably constructed for its
            # initialisation side effects — TODO confirm.
            PaperBroker(initial_cash=initial_cash)
        else:
            save_state(
                {
                    "total_invested": 0.0,
                    "nifty_units": 0.0,
                    "gold_units": 0.0,
                    "last_sip_ts": None,
                    "last_run": None,
                },
                mode="LIVE",
                emit_event=True,
                event_meta={"source": "live_start"},
            )
    config = {
        "strategy": req.strategy_name,
        "sip_amount": req.sip_amount,
        "sip_frequency": frequency_payload,
        "mode": mode,
        "broker": broker_name,
        "active": True,
    }
    save_strategy_config(config, user_id, run_id)
    start_new_run(user_id, run_id)
    _write_status(user_id, run_id, "RUNNING")
    if not engine_external:
        # Callback handed to the engine so its events are logged against this run.
        def emit_event_cb(*, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None):
            emit_event(
                user_id=user_id,
                run_id=run_id,
                event=event,
                message=message,
                level=level,
                category=category,
                meta=meta,
            )

        engine_config = dict(config)
        if initial_cash is not None:
            engine_config["initial_cash"] = initial_cash
        engine_config["run_id"] = run_id
        engine_config["user_id"] = user_id
        engine_config["emit_event"] = emit_event_cb
        # NOTE(review): the return value of start_engine is not checked here.
        start_engine(engine_config)

    # Best-effort notification: any failure while looking up the user or
    # queueing the e-mail is deliberately ignored.
    try:
        user = get_user_by_id(user_id)
        if user:
            body = (
                "Your strategy has been started.\n\n"
                f"Strategy: {req.strategy_name}\n"
                f"Mode: {mode}\n"
                f"Run ID: {run_id}\n"
            )
            send_email_async(user["username"], "Strategy started", body)
    except Exception:
        pass

    return {"status": "started", "run_id": run_id}
|
|
|
|
|
|
def _build_engine_config(user_id: str, run_id: str, req=None):
    """Assemble the config dict passed to ``start_engine`` for an existing run.

    Values come from the stored strategy config; when ``req`` is given it
    fills in a missing SIP frequency, SIP amount, mode and strategy name.
    Remaining gaps get defaults: frequency from ``frequency_days``/``unit``
    (1 / ``"days"``), mode ``"PAPER"``, broker ``"paper"``, strategy
    ``"Golden Nifty"``, SIP amount ``0`` and initial cash ``1_000_000.0``
    (unless the saved engine state carries ``initial_cash``).

    The result also contains an ``emit_event`` callback bound to this run.
    """
    cfg = _load_config(user_id, run_id)
    has_req = req is not None

    sip_frequency = cfg.get("sip_frequency")
    if has_req and not isinstance(sip_frequency, dict):
        sip_frequency = req.sip_frequency.dict() if hasattr(req.sip_frequency, "dict") else dict(req.sip_frequency)
    if not isinstance(sip_frequency, dict):
        sip_frequency = {
            "value": cfg.get("frequency_days") or 1,
            "unit": cfg.get("unit") or "days",
        }

    sip_amount = cfg.get("sip_amount")
    if has_req and sip_amount is None:
        sip_amount = req.sip_amount

    mode = cfg.get("mode")
    if not mode and has_req:
        mode = req.mode
    mode = (mode or "PAPER").strip().upper()

    broker = cfg.get("broker") or "paper"

    strategy_name = cfg.get("strategy") or cfg.get("strategy_name")
    if not strategy_name:
        strategy_name = req.strategy_name if has_req else None

    with engine_context(user_id, run_id):
        state = load_state(mode=mode)
    initial_cash = float(state.get("initial_cash") or 1_000_000.0)

    def emit_event_cb(*, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None):
        # Engine-side events are logged against this user/run.
        emit_event(
            user_id=user_id,
            run_id=run_id,
            event=event,
            message=message,
            level=level,
            category=category,
            meta=meta,
        )

    engine_config = {
        "strategy": strategy_name or "Golden Nifty",
        "sip_amount": sip_amount or 0,
        "sip_frequency": sip_frequency,
        "mode": mode,
        "broker": broker,
        "active": cfg.get("active", True),
        "initial_cash": initial_cash,
        "user_id": user_id,
        "run_id": run_id,
        "emit_event": emit_event_cb,
    }
    return engine_config
|
|
|
|
|
|
def resume_running_runs():
    """Restart the in-process engine for every run recorded as ``RUNNING``.

    Does nothing when the ``ENGINE_EXTERNAL`` environment variable is truthy
    (``1``/``true``/``yes``). Otherwise each running run, newest first, gets
    an engine config built and started; its ``engine_status`` is refreshed to
    ``RUNNING`` when ``start_engine`` reports success.
    """
    flag = os.getenv("ENGINE_EXTERNAL", "").strip().lower()
    if flag in {"1", "true", "yes"}:
        return

    with db_connection() as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT user_id, run_id
            FROM strategy_run
            WHERE status = 'RUNNING'
            ORDER BY created_at DESC
            """
        )
        active_runs = cur.fetchall()

    for user_id, run_id in active_runs:
        engine_config = _build_engine_config(user_id, run_id, None)
        if engine_config and start_engine(engine_config):
            _write_status(user_id, run_id, "RUNNING")
|
|
|
|
def stop_strategy(user_id: str):
    """Stop the user's running strategy.

    Returns ``{"status": "already_stopped", "run_id": <latest active run id>}``
    when nothing is running; otherwise ``{"status": "stopped", "run_id": ...}``
    with an additional ``"warning"`` key when stopping the engine or updating
    the engine status raised.
    """
    run_id = _effective_running_run_id(user_id)
    if not run_id:
        latest_run_id = get_active_run_id(user_id)
        return {"status": "already_stopped", "run_id": latest_run_id}

    engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
    stop_warning = None
    if not engine_external:
        # A failure to stop the engine is logged and reported as a warning;
        # the bookkeeping below still runs.
        try:
            stop_engine(user_id, run_id, timeout=15.0)
        except Exception as exc:
            print(f"[STRATEGY] stop_engine failed for {user_id}/{run_id}: {exc}", flush=True)
            stop_warning = str(exc)
    deactivate_strategy_config(user_id, run_id)
    stop_run(user_id, run_id, reason="user_request")
    try:
        _write_status(user_id, run_id, "STOPPED")
    except Exception as exc:
        print(f"[STRATEGY] engine status update failed during stop for {user_id}/{run_id}: {exc}", flush=True)
        # Keep the first warning (from stop_engine) if there already is one.
        if not stop_warning:
            stop_warning = str(exc)
    update_run_status(user_id, run_id, "STOPPED", meta={"reason": "user_request"})

    # Best-effort notification: any failure here is deliberately ignored.
    try:
        user = get_user_by_id(user_id)
        if user:
            body = "Your strategy has been stopped."
            send_email_async(user["username"], "Strategy stopped", body)
    except Exception:
        pass

    result = {"status": "stopped", "run_id": run_id}
    if stop_warning:
        result["warning"] = stop_warning
    return result
|
|
|
|
|
|
def resume_strategy(user_id: str):
    """Resume the user's most recent (non-running) strategy run.

    Returns a dict whose ``status`` is one of:
      * ``"already_running"`` – a run is currently active;
      * ``"no_resumable_run"`` – no stored config with a strategy name and a
        LIVE/PAPER mode was found;
      * ``"broker_auth_required"`` – LIVE mode and the broker session is not valid;
      * ``"resume_failed"`` – the engine config could not be built or the
        engine could not be started (carries a ``message``);
      * ``"resumed"`` – the run was reactivated.
    """
    engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
    running_run_id = _effective_running_run_id(user_id)
    if running_run_id:
        return {"status": "already_running", "run_id": running_run_id}

    run_id = get_active_run_id(user_id)
    cfg = _load_config(user_id, run_id)
    strategy_name = (cfg.get("strategy") or "").strip()
    mode = (cfg.get("mode") or "").strip().upper()
    if not strategy_name or mode not in {"LIVE", "PAPER"}:
        return {"status": "no_resumable_run"}

    if mode == "LIVE":
        is_valid, broker_state, _failure_reason = _validate_live_broker_session(user_id)
        if not is_valid:
            return {
                "status": "broker_auth_required",
                "redirect_url": "/api/broker/login",
                "broker": broker_state.get("broker"),
            }

    # Build the engine config before touching any persisted status, so a
    # failure here leaves the run exactly as it was.
    engine_config = None
    if not engine_external:
        try:
            engine_config = _build_engine_config(user_id, run_id, None)
        except Exception as exc:
            return {
                "status": "resume_failed",
                "run_id": run_id,
                "message": f"Unable to load the saved strategy state: {exc}",
            }
        if not engine_config:
            return {
                "status": "resume_failed",
                "run_id": run_id,
                "message": "Saved strategy configuration is incomplete.",
            }

    reactivate_strategy_config(user_id, run_id)
    update_run_status(user_id, run_id, "RUNNING", meta={"reason": "user_resume"})
    _write_status(user_id, run_id, "RUNNING")

    if not engine_external:
        # If the engine cannot be started, roll the three writes above back to STOPPED.
        try:
            started = start_engine(engine_config)
        except Exception as exc:
            deactivate_strategy_config(user_id, run_id)
            _write_status(user_id, run_id, "STOPPED")
            update_run_status(user_id, run_id, "STOPPED", meta={"reason": "resume_start_failed"})
            return {
                "status": "resume_failed",
                "run_id": run_id,
                "message": f"Unable to resume the strategy engine: {exc}",
            }
        if not started:
            deactivate_strategy_config(user_id, run_id)
            _write_status(user_id, run_id, "STOPPED")
            update_run_status(user_id, run_id, "STOPPED", meta={"reason": "resume_start_failed"})
            return {
                "status": "resume_failed",
                "run_id": run_id,
                "message": "Strategy engine could not be started.",
            }

    resume_run(user_id, run_id)

    # Best-effort notification: any failure here is deliberately ignored.
    try:
        user = get_user_by_id(user_id)
        if user:
            body = (
                "Your strategy has been resumed.\n\n"
                f"Strategy: {strategy_name}\n"
                f"Mode: {mode}\n"
                f"Run ID: {run_id}\n"
            )
            send_email_async(user["username"], "Strategy resumed", body)
    except Exception:
        pass

    return {"status": "resumed", "run_id": run_id}
|
|
|
|
def get_strategy_status(user_id: str):
    """Build the strategy status payload for a user.

    The payload contains ``status`` (``RUNNING``, ``WAITING``, ``STOPPED`` or
    ``IDLE``), ``last_updated`` (local ISO string or None), ``run_id``, a
    ``config`` summary and the ``can_resume`` / ``can_restart`` flags. While a
    run is active it additionally carries ``last_execution_ts`` and
    ``next_eligible_ts``; ``status`` becomes ``WAITING`` when the next
    eligible time lies in the future.

    The engine status row is read through ``_get_engine_status_row`` so the
    lookup lives in one place and is skipped entirely when there is no run.
    """
    running_run_id = _effective_running_run_id(user_id)
    run_id = running_run_id or get_active_run_id(user_id)
    cfg = _load_config(user_id, run_id) if run_id else {}
    if running_run_id:
        default_status = "RUNNING"
    elif run_id:
        default_status = "STOPPED"
    else:
        default_status = "IDLE"

    engine_row = _get_engine_status_row(user_id, run_id)
    last_updated = engine_row["last_updated"] if engine_row else None
    status = {
        "status": default_status,
        "last_updated": _format_local_ts(last_updated),
        "run_id": run_id,
    }

    # Safety net: _effective_running_run_id normally filters these states out,
    # but the engine row may have changed since that check.
    engine_state = str((engine_row or {}).get("status") or "").strip().upper()
    if running_run_id and engine_state in {"STOPPED", "ERROR"}:
        status["status"] = "STOPPED"

    # Fall back to the legacy frequency columns when sip_frequency is absent.
    sip_frequency = cfg.get("sip_frequency")
    if not isinstance(sip_frequency, dict):
        frequency = cfg.get("frequency")
        unit = cfg.get("unit")
        if isinstance(frequency, dict):
            unit = frequency.get("unit", unit)
            frequency = frequency.get("value")
        if frequency is None and cfg.get("frequency_days") is not None:
            frequency = cfg.get("frequency_days")
            unit = unit or "days"
        if frequency is not None and unit:
            sip_frequency = {"value": frequency, "unit": unit}

    status["config"] = {
        "strategy": cfg.get("strategy"),
        "sip_amount": cfg.get("sip_amount"),
        "sip_frequency": sip_frequency,
        "mode": cfg.get("mode"),
        "broker": cfg.get("broker"),
        "active": cfg.get("active"),
    }

    if running_run_id:
        mode = (cfg.get("mode") or "LIVE").strip().upper()
        with engine_context(user_id, run_id):
            state = load_state(mode=mode)
        last_execution_ts = _last_execution_ts(state, mode)
        next_eligible = compute_next_eligible(last_execution_ts, sip_frequency)
        status["last_execution_ts"] = last_execution_ts
        status["next_eligible_ts"] = next_eligible
        if next_eligible:
            try:
                parsed_next = datetime.fromisoformat(next_eligible)
                # Compare like with like: aware against aware, naive against naive.
                now_cmp = (
                    datetime.now(parsed_next.tzinfo)
                    if parsed_next.tzinfo
                    else market_now().replace(tzinfo=None)
                )
                if parsed_next > now_cmp:
                    status["status"] = "WAITING"
            except ValueError:
                pass

    status_key = (status.get("status") or "IDLE").upper()
    resumable = bool(cfg.get("strategy")) and bool(cfg.get("mode"))
    status["can_resume"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
    status["can_restart"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
    return status
|
|
|
|
def get_engine_status(user_id: str):
    """Build the engine status payload for a user.

    Always returns ``state``, ``run_id``, ``user_id`` and
    ``last_heartbeat_ts``. While a run is active, ``state`` and
    ``last_heartbeat_ts`` (UTC, ``Z`` suffix) come from ``engine_status`` and
    ``last_execution_ts`` / ``next_eligible_ts`` are added. Without an active
    run the state is reported as ``STOPPED``.
    """
    running_run_id = _effective_running_run_id(user_id)
    run_id = running_run_id or get_active_run_id(user_id)
    status = {
        "state": "STOPPED",
        "run_id": run_id,
        "user_id": user_id,
        "last_heartbeat_ts": None,
    }

    if running_run_id:
        with db_connection() as conn, conn.cursor() as cur:
            cur.execute(
                """
                SELECT status, last_updated
                FROM engine_status
                WHERE user_id = %s AND run_id = %s
                ORDER BY last_updated DESC
                LIMIT 1
                """,
                (user_id, run_id),
            )
            engine_row = cur.fetchone()

        if engine_row:
            engine_state, heartbeat = engine_row[0], engine_row[1]
            status["state"] = engine_state
            if heartbeat is not None:
                heartbeat_utc = heartbeat.astimezone(timezone.utc)
                status["last_heartbeat_ts"] = heartbeat_utc.isoformat().replace("+00:00", "Z")

        cfg = _load_config(user_id, run_id)
        mode = (cfg.get("mode") or "LIVE").strip().upper()
        with engine_context(user_id, run_id):
            state = load_state(mode=mode)
        last_execution_ts = _last_execution_ts(state, mode)

        sip_frequency = cfg.get("sip_frequency")
        if isinstance(sip_frequency, dict):
            # Keep only the two keys the schedule computation needs.
            sip_frequency = {key: sip_frequency.get(key) for key in ("value", "unit")}
        else:
            # Fall back to the legacy frequency columns.
            frequency, unit = cfg.get("frequency"), cfg.get("unit")
            if isinstance(frequency, dict):
                unit = frequency.get("unit", unit)
                frequency = frequency.get("value")
            if frequency is None and cfg.get("frequency_days") is not None:
                frequency = cfg.get("frequency_days")
                unit = unit or "days"
            if frequency is not None and unit:
                sip_frequency = {"value": frequency, "unit": unit}

        status["last_execution_ts"] = last_execution_ts
        status["next_eligible_ts"] = compute_next_eligible(last_execution_ts, sip_frequency)

    status["run_id"] = run_id
    return status
|
|
|
|
|
|
def get_strategy_logs(user_id: str, since_seq: int):
    """Return the log events of the user's active run newer than ``since_seq``.

    The result is ``{"events": [...], "latest_seq": n}``. Events are ordered by
    ``seq``; ``ts`` is rendered in UTC with a ``Z`` suffix and ``meta`` is
    replaced by ``{}`` unless it is a dict. ``latest_seq`` is the highest
    ``seq`` stored for the run (0 when there is none).
    """
    run_id = get_active_run_id(user_id)
    with db_connection() as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT seq, ts, level, category, event, message, run_id, meta
            FROM strategy_log
            WHERE user_id = %s AND run_id = %s AND seq > %s
            ORDER BY seq
            """,
            (user_id, run_id, since_seq),
        )
        rows = cur.fetchall()
        cur.execute(
            "SELECT COALESCE(MAX(seq), 0) FROM strategy_log WHERE user_id = %s AND run_id = %s",
            (user_id, run_id),
        )
        latest_seq = cur.fetchone()[0]

    def _as_event(row):
        seq, ts, level, category, event, message, row_run_id, meta = row
        ts_str = None
        if ts is not None:
            ts_str = ts.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
        return {
            "seq": seq,
            "ts": ts_str,
            "level": level,
            "category": category,
            "event": event,
            "message": message,
            "run_id": row_run_id,
            "meta": meta if isinstance(meta, dict) else {},
        }

    events = [_as_event(row) for row in rows]
    return {"events": events, "latest_seq": latest_seq}
|
|
|
|
|
|
def _humanize_reason(reason: str | None):
|
|
if not reason:
|
|
return None
|
|
return reason.replace("_", " ").strip().capitalize()
|
|
|
|
|
|
def _issue_message(event: str, message: str | None, data: dict | None, meta: dict | None):
|
|
payload = data if isinstance(data, dict) else {}
|
|
extra = meta if isinstance(meta, dict) else {}
|
|
reason = payload.get("reason") or extra.get("reason")
|
|
reason_key = str(reason or "").strip().lower()
|
|
|
|
if event == "SIP_NO_FILL":
|
|
if reason_key == "insufficient_funds":
|
|
return "Insufficient funds for this SIP."
|
|
if reason_key == "broker_auth_expired":
|
|
return "Broker session expired. Reconnect broker."
|
|
if reason_key == "no_fill":
|
|
return "Order was not filled."
|
|
return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}."
|
|
|
|
if event == "BROKER_AUTH_EXPIRED":
|
|
return "Broker session expired. Reconnect broker."
|
|
if event == "PRICE_FETCH_ERROR":
|
|
return "Could not fetch prices. Retrying."
|
|
if event == "HISTORY_LOAD_ERROR":
|
|
return "Could not load price history. Retrying."
|
|
if event == "ENGINE_ERROR":
|
|
return message or "Strategy engine hit an error."
|
|
if event == "EXECUTION_BLOCKED":
|
|
if reason_key == "market_closed":
|
|
return "Market is closed. Execution will resume next session."
|
|
return f"Execution blocked: {_humanize_reason(reason) or 'Unknown reason'}."
|
|
if event == "ORDER_REJECTED":
|
|
return message or payload.get("status_message") or "Broker rejected the order."
|
|
if event == "ORDER_CANCELLED":
|
|
return message or "Order was cancelled."
|
|
|
|
return message or _humanize_reason(reason) or "Strategy update available."
|
|
|
|
|
|
def _issue_is_stale_for_current_state(
|
|
user_id: str,
|
|
status: dict,
|
|
event: str,
|
|
data: dict | None,
|
|
meta: dict | None,
|
|
):
|
|
status_key = (status.get("status") or "IDLE").upper()
|
|
cfg = status.get("config") if isinstance(status.get("config"), dict) else {}
|
|
mode = str(cfg.get("mode") or "").strip().upper()
|
|
|
|
payload = data if isinstance(data, dict) else {}
|
|
extra = meta if isinstance(meta, dict) else {}
|
|
reason = payload.get("reason") or extra.get("reason")
|
|
reason_key = str(reason or "").strip().lower()
|
|
|
|
if status_key == "STOPPED" and event in {
|
|
"BROKER_AUTH_EXPIRED",
|
|
"EXECUTION_BLOCKED",
|
|
"SIP_NO_FILL",
|
|
"PRICE_FETCH_ERROR",
|
|
"HISTORY_LOAD_ERROR",
|
|
"ENGINE_ERROR",
|
|
"ORDER_REJECTED",
|
|
"ORDER_CANCELLED",
|
|
}:
|
|
return True
|
|
|
|
if event == "EXECUTION_BLOCKED" and reason_key == "market_closed":
|
|
return is_market_open(market_now())
|
|
|
|
if mode != "LIVE":
|
|
return False
|
|
|
|
auth_related_issue = event == "BROKER_AUTH_EXPIRED" or (
|
|
event == "SIP_NO_FILL" and reason_key == "broker_auth_expired"
|
|
) or (event == "EXECUTION_BLOCKED" and reason_key in {"broker_auth_expired", "auth_expired"})
|
|
if not auth_related_issue:
|
|
return False
|
|
|
|
broker_state = get_user_broker(user_id) or {}
|
|
auth_state = str(broker_state.get("auth_state") or "").strip().upper()
|
|
return auth_state == "VALID"
|
|
|
|
|
|
def get_strategy_summary(user_id: str):
    """Build a one-line summary (tone + message) of the user's strategy.

    The most recent issue event of the active run takes precedence unless
    ``_issue_is_stale_for_current_state`` says it no longer applies; its tone
    is ``error`` for ``ENGINE_ERROR``/``ORDER_REJECTED`` and ``warning``
    otherwise. Without a relevant issue the summary reflects the current
    status (WAITING, RUNNING, STOPPED), defaulting to "No active strategy.".
    """
    run_id = get_active_run_id(user_id)
    status = get_strategy_status(user_id)
    next_eligible_ts = status.get("next_eligible_ts")

    summary = {
        "run_id": run_id,
        "status": status.get("status"),
        "tone": "neutral",
        "message": "No active strategy.",
        "event": None,
        "ts": None,
    }

    latest_issue = None
    if run_id:
        with db_connection() as conn, conn.cursor() as cur:
            cur.execute(
                """
                SELECT event, message, data, meta, ts
                FROM engine_event
                WHERE user_id = %s
                  AND run_id = %s
                  AND event IN (
                    'SIP_NO_FILL',
                    'BROKER_AUTH_EXPIRED',
                    'PRICE_FETCH_ERROR',
                    'HISTORY_LOAD_ERROR',
                    'ENGINE_ERROR',
                    'EXECUTION_BLOCKED',
                    'ORDER_REJECTED',
                    'ORDER_CANCELLED'
                  )
                ORDER BY ts DESC
                LIMIT 1
                """,
                (user_id, run_id),
            )
            latest_issue = cur.fetchone()

    if latest_issue:
        event, message, data, meta, ts = latest_issue
        stale = _issue_is_stale_for_current_state(user_id, status, event, data, meta)
        if not stale:
            issue_fields = {
                "tone": "error" if event in {"ENGINE_ERROR", "ORDER_REJECTED"} else "warning",
                "message": _issue_message(event, message, data, meta),
                "event": event,
                "ts": _format_local_ts(ts),
            }
            summary.update(issue_fields)
            return summary

    status_key = (status.get("status") or "IDLE").upper()
    if status_key == "WAITING" and next_eligible_ts:
        summary.update({"tone": "warning", "message": f"Waiting until {next_eligible_ts}."})
    elif status_key == "RUNNING":
        summary.update({"tone": "success", "message": "Strategy is running."})
    elif status_key == "STOPPED":
        summary.update({"tone": "neutral", "message": "Strategy is stopped."})
    return summary
|
|
|
|
def get_market_status():
    """Report whether the market is open at the current market time.

    Returns ``{"status": "OPEN" | "CLOSED", "checked_at": <ISO timestamp>}``,
    where both values are derived from a single ``market_now()`` reading.
    """
    checked = market_now()
    label = "OPEN" if is_market_open(checked) else "CLOSED"
    return {"status": label, "checked_at": checked.isoformat()}
|