1296 lines
46 KiB
Python
1296 lines
46 KiB
Python
import json
|
|
import os
|
|
import sys
|
|
import threading
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
ENGINE_ROOT = Path(__file__).resolve().parents[3]
|
|
if str(ENGINE_ROOT) not in sys.path:
|
|
sys.path.append(str(ENGINE_ROOT))
|
|
|
|
from indian_paper_trading_strategy.engine.market import (
|
|
align_to_market_open,
|
|
market_now,
|
|
market_session,
|
|
next_market_open_after,
|
|
)
|
|
from indian_paper_trading_strategy.engine.market_calendar import UnsupportedCalendarYearError
|
|
from indian_paper_trading_strategy.engine.runner import RunLeaseNotAcquiredError, start_engine, stop_engine
|
|
from indian_paper_trading_strategy.engine.state import init_paper_state, load_state, save_state
|
|
from indian_paper_trading_strategy.engine.broker import PaperBroker
|
|
from indian_paper_trading_strategy.engine.time_utils import (
|
|
UTC,
|
|
frequency_to_timedelta,
|
|
parse_market_timestamp,
|
|
parse_persisted_timestamp,
|
|
serialize_timestamp,
|
|
)
|
|
from indian_paper_trading_strategy.engine.db import engine_context
|
|
|
|
from app.broker_store import get_user_broker, set_broker_auth_state
|
|
from app.services.db import db_connection
|
|
from app.services.run_service import (
|
|
create_strategy_run,
|
|
get_active_run_id,
|
|
get_running_run_id,
|
|
update_run_status,
|
|
)
|
|
from app.services.auth_service import get_user_by_id
|
|
from app.services.email_service import send_email_async
|
|
from app.services.groww_service import GrowwApiError, GrowwTokenError, fetch_funds as fetch_groww_funds
|
|
from app.services.groww_storage import get_session as get_groww_session
|
|
from app.services.zerodha_service import (
|
|
KiteTokenError,
|
|
fetch_funds as fetch_zerodha_funds,
|
|
)
|
|
from app.services.zerodha_storage import get_session as get_zerodha_session
|
|
from psycopg2.extras import Json
|
|
from psycopg2 import errors
|
|
|
|
SEQ_LOCK = threading.Lock()
|
|
SEQ = 0
|
|
LAST_WAIT_LOG_TS = {}
|
|
WAIT_LOG_INTERVAL = timedelta(seconds=60)
|
|
|
|
def init_log_state():
|
|
global SEQ
|
|
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT COALESCE(MAX(seq), 0) FROM strategy_log")
|
|
row = cur.fetchone()
|
|
SEQ = row[0] if row and row[0] is not None else 0
|
|
|
|
def start_new_run(user_id: str, run_id: str):
|
|
LAST_WAIT_LOG_TS.pop(run_id, None)
|
|
emit_event(
|
|
user_id=user_id,
|
|
run_id=run_id,
|
|
event="STRATEGY_STARTED",
|
|
message="Strategy started",
|
|
meta={},
|
|
)
|
|
|
|
|
|
def stop_run(user_id: str, run_id: str, reason="user_request"):
|
|
emit_event(
|
|
user_id=user_id,
|
|
run_id=run_id,
|
|
event="STRATEGY_STOPPED",
|
|
message="Strategy stopped",
|
|
meta={"reason": reason},
|
|
)
|
|
|
|
|
|
def resume_run(user_id: str, run_id: str):
|
|
emit_event(
|
|
user_id=user_id,
|
|
run_id=run_id,
|
|
event="STRATEGY_RESUMED",
|
|
message="Strategy resumed",
|
|
meta={},
|
|
)
|
|
|
|
|
|
def emit_event(
|
|
*,
|
|
user_id: str,
|
|
run_id: str,
|
|
event: str,
|
|
message: str,
|
|
level: str = "INFO",
|
|
category: str = "ENGINE",
|
|
meta: dict | None = None
|
|
):
|
|
global SEQ, LAST_WAIT_LOG_TS
|
|
if not user_id or not run_id:
|
|
return
|
|
|
|
now = datetime.now(timezone.utc)
|
|
if event == "SIP_WAITING":
|
|
last_ts = LAST_WAIT_LOG_TS.get(run_id)
|
|
if last_ts and (now - last_ts) < WAIT_LOG_INTERVAL:
|
|
return
|
|
LAST_WAIT_LOG_TS[run_id] = now
|
|
|
|
with SEQ_LOCK:
|
|
SEQ += 1
|
|
seq = SEQ
|
|
|
|
evt = {
|
|
"seq": seq,
|
|
"ts": serialize_timestamp(now),
|
|
"level": level,
|
|
"category": category,
|
|
"event": event,
|
|
"message": message,
|
|
"run_id": run_id,
|
|
"meta": meta or {}
|
|
}
|
|
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO strategy_log (
|
|
seq, ts, level, category, event, message, user_id, run_id, meta
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (seq) DO NOTHING
|
|
""",
|
|
(
|
|
evt["seq"],
|
|
now,
|
|
evt["level"],
|
|
evt["category"],
|
|
evt["event"],
|
|
evt["message"],
|
|
user_id,
|
|
evt["run_id"],
|
|
Json(evt["meta"]),
|
|
),
|
|
)
|
|
|
|
def _maybe_parse_json(value):
|
|
if value is None:
|
|
return None
|
|
if not isinstance(value, str):
|
|
return value
|
|
text = value.strip()
|
|
if not text:
|
|
return None
|
|
try:
|
|
return json.loads(text)
|
|
except Exception:
|
|
return value
|
|
|
|
|
|
def _utc_now():
|
|
return datetime.now(UTC)
|
|
|
|
|
|
def _load_config(user_id: str, run_id: str):
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
|
|
mode, broker, active, frequency, frequency_days, unit, next_run
|
|
FROM strategy_config
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return {}
|
|
cfg = {
|
|
"strategy": row[0],
|
|
"sip_amount": float(row[1]) if row[1] is not None else None,
|
|
"mode": row[4],
|
|
"broker": row[5],
|
|
"active": row[6],
|
|
"frequency": _maybe_parse_json(row[7]),
|
|
"frequency_days": row[8],
|
|
"unit": row[9],
|
|
"next_run": serialize_timestamp(row[10]),
|
|
}
|
|
if row[2] is not None or row[3] is not None:
|
|
cfg["sip_frequency"] = {
|
|
"value": row[2],
|
|
"unit": row[3],
|
|
}
|
|
return cfg
|
|
|
|
|
|
def _save_config(cfg, user_id: str, run_id: str):
|
|
sip_frequency = cfg.get("sip_frequency")
|
|
sip_value = None
|
|
sip_unit = None
|
|
if isinstance(sip_frequency, dict):
|
|
sip_value = sip_frequency.get("value")
|
|
sip_unit = sip_frequency.get("unit")
|
|
|
|
frequency = cfg.get("frequency")
|
|
if not isinstance(frequency, str) and frequency is not None:
|
|
frequency = json.dumps(frequency)
|
|
|
|
next_run = cfg.get("next_run")
|
|
next_run_dt = None
|
|
if isinstance(next_run, str):
|
|
next_run_dt = parse_persisted_timestamp(next_run)
|
|
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO strategy_config (
|
|
user_id,
|
|
run_id,
|
|
strategy,
|
|
sip_amount,
|
|
sip_frequency_value,
|
|
sip_frequency_unit,
|
|
mode,
|
|
broker,
|
|
active,
|
|
frequency,
|
|
frequency_days,
|
|
unit,
|
|
next_run
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (user_id, run_id) DO UPDATE
|
|
SET strategy = EXCLUDED.strategy,
|
|
sip_amount = EXCLUDED.sip_amount,
|
|
sip_frequency_value = EXCLUDED.sip_frequency_value,
|
|
sip_frequency_unit = EXCLUDED.sip_frequency_unit,
|
|
mode = EXCLUDED.mode,
|
|
broker = EXCLUDED.broker,
|
|
active = EXCLUDED.active,
|
|
frequency = EXCLUDED.frequency,
|
|
frequency_days = EXCLUDED.frequency_days,
|
|
unit = EXCLUDED.unit,
|
|
next_run = EXCLUDED.next_run
|
|
""",
|
|
(
|
|
user_id,
|
|
run_id,
|
|
cfg.get("strategy"),
|
|
cfg.get("sip_amount"),
|
|
sip_value,
|
|
sip_unit,
|
|
cfg.get("mode"),
|
|
cfg.get("broker"),
|
|
cfg.get("active"),
|
|
frequency,
|
|
cfg.get("frequency_days"),
|
|
cfg.get("unit"),
|
|
next_run_dt,
|
|
),
|
|
)
|
|
|
|
def save_strategy_config(cfg, user_id: str, run_id: str):
|
|
_save_config(cfg, user_id, run_id)
|
|
|
|
def deactivate_strategy_config(user_id: str, run_id: str):
|
|
cfg = _load_config(user_id, run_id)
|
|
cfg["active"] = False
|
|
_save_config(cfg, user_id, run_id)
|
|
|
|
|
|
def reactivate_strategy_config(user_id: str, run_id: str):
|
|
cfg = _load_config(user_id, run_id)
|
|
if not cfg:
|
|
return {}
|
|
cfg["active"] = True
|
|
_save_config(cfg, user_id, run_id)
|
|
return cfg
|
|
|
|
def _write_status(user_id: str, run_id: str, status):
|
|
now_local = _utc_now()
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO engine_status (user_id, run_id, status, last_updated)
|
|
VALUES (%s, %s, %s, %s)
|
|
ON CONFLICT (user_id, run_id) DO UPDATE
|
|
SET status = EXCLUDED.status,
|
|
last_updated = EXCLUDED.last_updated
|
|
""",
|
|
(user_id, run_id, status, now_local),
|
|
)
|
|
|
|
|
|
def _get_engine_status_row(user_id: str, run_id: str):
|
|
if not user_id or not run_id:
|
|
return None
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT status, last_updated
|
|
FROM engine_status
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
return {"status": row[0], "last_updated": row[1]}
|
|
|
|
|
|
def _effective_running_run_id(user_id: str):
|
|
run_id = get_running_run_id(user_id)
|
|
if not run_id:
|
|
return None
|
|
engine_row = _get_engine_status_row(user_id, run_id)
|
|
engine_state = str((engine_row or {}).get("status") or "").strip().upper()
|
|
if engine_state not in {"STOPPED", "ERROR", "PAUSED_AUTH_EXPIRED"}:
|
|
return run_id
|
|
if engine_state == "PAUSED_AUTH_EXPIRED":
|
|
update_run_status(
|
|
user_id,
|
|
run_id,
|
|
"PAUSED_AUTH_EXPIRED",
|
|
meta={"reason": "broker_auth_expired", "engine_state": engine_state},
|
|
)
|
|
return None
|
|
update_run_status(
|
|
user_id,
|
|
run_id,
|
|
"STOPPED",
|
|
meta={"reason": "engine_inactive", "engine_state": engine_state},
|
|
)
|
|
return None
|
|
|
|
|
|
def _set_run_status_or_raise(user_id: str, run_id: str, status: str, meta: dict | None = None):
|
|
updated = update_run_status(user_id, run_id, status, meta=meta)
|
|
if not updated:
|
|
raise RuntimeError(f"Run {run_id} for user {user_id} no longer exists")
|
|
|
|
|
|
def _get_run_row(user_id: str, run_id: str | None):
|
|
if not user_id or not run_id:
|
|
return None
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT status, meta, started_at, stopped_at
|
|
FROM strategy_run
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"status": row[0],
|
|
"meta": row[1] if isinstance(row[1], dict) else {},
|
|
"started_at": row[2],
|
|
"stopped_at": row[3],
|
|
}
|
|
|
|
|
|
def _broker_block_state(user_id: str, cfg: dict | None):
|
|
config = cfg or {}
|
|
mode = str(config.get("mode") or "").strip().upper()
|
|
if mode != "LIVE":
|
|
return {"blocked": False, "reason": None, "broker_state": None}
|
|
|
|
broker_state = get_user_broker(user_id) or {}
|
|
auth_state = str(broker_state.get("auth_state") or "").strip().upper()
|
|
connected = bool(broker_state.get("connected"))
|
|
broker_name = str(broker_state.get("broker") or "").strip().upper() or None
|
|
|
|
if not connected or auth_state == "DISCONNECTED":
|
|
return {
|
|
"blocked": True,
|
|
"reason": "broker_disconnected",
|
|
"broker_state": "DISCONNECTED",
|
|
"broker": broker_name,
|
|
}
|
|
if auth_state in {"EXPIRED", "PENDING"}:
|
|
return {
|
|
"blocked": True,
|
|
"reason": "broker_auth_required",
|
|
"broker_state": auth_state or "EXPIRED",
|
|
"broker": broker_name,
|
|
}
|
|
return {
|
|
"blocked": False,
|
|
"reason": None,
|
|
"broker_state": auth_state or "VALID",
|
|
"broker": broker_name,
|
|
}
|
|
|
|
|
|
def block_live_strategy_for_broker_disconnect(user_id: str, *, reason: str = "broker_disconnected"):
|
|
running_run_id = _effective_running_run_id(user_id)
|
|
run_id = running_run_id or get_active_run_id(user_id)
|
|
if not run_id:
|
|
return None
|
|
cfg = _load_config(user_id, run_id)
|
|
if str(cfg.get("mode") or "").strip().upper() != "LIVE":
|
|
return None
|
|
|
|
stop_warning = None
|
|
try:
|
|
stop_engine(user_id, run_id, timeout=10.0)
|
|
except Exception as exc:
|
|
stop_warning = str(exc)
|
|
print(f"[STRATEGY] stop_engine failed during broker disconnect for {user_id}/{run_id}: {exc}", flush=True)
|
|
|
|
deactivate_strategy_config(user_id, run_id)
|
|
stop_run(user_id, run_id, reason=reason)
|
|
_write_status(user_id, run_id, "STOPPED")
|
|
_set_run_status_or_raise(
|
|
user_id,
|
|
run_id,
|
|
"STOPPED",
|
|
meta={"reason": reason, "broker_blocked": True},
|
|
)
|
|
result = {"run_id": run_id, "status": "STOPPED", "reason": reason}
|
|
if stop_warning:
|
|
result["warning"] = stop_warning
|
|
return result
|
|
|
|
def validate_frequency(freq: dict, mode: str):
|
|
if not isinstance(freq, dict):
|
|
raise ValueError("Frequency payload is required")
|
|
value = int(freq.get("value", 0))
|
|
unit = freq.get("unit")
|
|
|
|
if unit not in {"minutes", "days"}:
|
|
raise ValueError(f"Unsupported frequency unit: {unit}")
|
|
|
|
if unit == "minutes":
|
|
if value < 1:
|
|
raise ValueError("Minimum frequency is 1 minute")
|
|
|
|
if unit == "days" and value < 1:
|
|
raise ValueError("Minimum frequency is 1 day")
|
|
|
|
|
|
def _validate_live_broker_session(user_id: str):
|
|
broker_state = get_user_broker(user_id) or {}
|
|
broker_name = (broker_state.get("broker") or "").strip().upper()
|
|
if not broker_state.get("connected") or broker_name not in {"ZERODHA", "GROWW"}:
|
|
return False, broker_state, "broker_not_connected"
|
|
|
|
if broker_name == "ZERODHA":
|
|
try:
|
|
session = get_zerodha_session(user_id)
|
|
except Exception as exc:
|
|
print(f"[STRATEGY] failed to load Zerodha session for {user_id}: {exc}", flush=True)
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
if not session:
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
api_key = str(session.get("api_key") or "").strip()
|
|
access_token = str(session.get("access_token") or "").strip()
|
|
if not api_key or not access_token:
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
try:
|
|
fetch_zerodha_funds(api_key, access_token)
|
|
except KiteTokenError:
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
except Exception as exc:
|
|
print(f"[STRATEGY] failed to validate Zerodha session for {user_id}: {exc}", flush=True)
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
set_broker_auth_state(user_id, "VALID")
|
|
return True, broker_state, "ok"
|
|
|
|
try:
|
|
session = get_groww_session(user_id)
|
|
except Exception as exc:
|
|
print(f"[STRATEGY] failed to load Groww session for {user_id}: {exc}", flush=True)
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
if not session:
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
access_token = str(session.get("access_token") or "").strip()
|
|
if not access_token:
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
try:
|
|
fetch_groww_funds(access_token)
|
|
except GrowwTokenError:
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
except GrowwApiError as exc:
|
|
print(f"[STRATEGY] failed to validate Groww session for {user_id}: {exc}", flush=True)
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
except Exception as exc:
|
|
print(f"[STRATEGY] failed to validate Groww session for {user_id}: {exc}", flush=True)
|
|
set_broker_auth_state(user_id, "EXPIRED")
|
|
return False, broker_state, "broker_auth_required"
|
|
|
|
set_broker_auth_state(user_id, "VALID")
|
|
return True, broker_state, "ok"
|
|
|
|
def compute_next_eligible(last_run: str | None, sip_frequency: dict | None):
|
|
if not last_run or not sip_frequency:
|
|
return None
|
|
last_dt = parse_market_timestamp(last_run)
|
|
if last_dt is None:
|
|
return None
|
|
try:
|
|
delta = frequency_to_timedelta(sip_frequency)
|
|
except ValueError:
|
|
return None
|
|
next_dt = last_dt + delta
|
|
next_dt = align_to_market_open(next_dt)
|
|
return serialize_timestamp(next_dt)
|
|
|
|
|
|
def _last_execution_ts(state: dict, mode: str) -> str | None:
|
|
mode_key = (mode or "LIVE").strip().upper()
|
|
if mode_key == "LIVE":
|
|
return state.get("last_sip_ts")
|
|
return state.get("last_run") or state.get("last_sip_ts")
|
|
|
|
def start_strategy(req, user_id: str):
|
|
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
|
running_run_id = _effective_running_run_id(user_id)
|
|
if running_run_id:
|
|
running_cfg = _load_config(user_id, running_run_id)
|
|
running_mode = (running_cfg.get("mode") or req.mode or "PAPER").strip().upper()
|
|
if running_mode == "LIVE":
|
|
is_valid, broker_state, failure_reason = _validate_live_broker_session(user_id)
|
|
if not is_valid:
|
|
return {
|
|
"status": "broker_auth_required",
|
|
"redirect_url": "/api/broker/login",
|
|
"broker": broker_state.get("broker"),
|
|
}
|
|
if engine_external:
|
|
return {"status": "already_running", "run_id": running_run_id}
|
|
engine_config = _build_engine_config(user_id, running_run_id, req)
|
|
if engine_config:
|
|
try:
|
|
started = start_engine(engine_config)
|
|
except RunLeaseNotAcquiredError:
|
|
return {"status": "already_running", "run_id": running_run_id}
|
|
if started:
|
|
_write_status(user_id, running_run_id, "RUNNING")
|
|
return {"status": "restarted", "run_id": running_run_id}
|
|
return {"status": "already_running", "run_id": running_run_id}
|
|
mode = (req.mode or "PAPER").strip().upper()
|
|
frequency_payload = req.sip_frequency.dict() if hasattr(req.sip_frequency, "dict") else dict(req.sip_frequency)
|
|
validate_frequency(frequency_payload, mode)
|
|
if mode == "PAPER":
|
|
initial_cash = float(req.initial_cash) if req.initial_cash is not None else 1_000_000.0
|
|
broker_name = "paper"
|
|
elif mode == "LIVE":
|
|
is_valid, broker_state, failure_reason = _validate_live_broker_session(user_id)
|
|
if not is_valid:
|
|
return {
|
|
"status": "broker_auth_required",
|
|
"redirect_url": "/api/broker/login",
|
|
"broker": broker_state.get("broker"),
|
|
}
|
|
initial_cash = None
|
|
broker_name = ((broker_state.get("broker") or "ZERODHA").strip().lower())
|
|
else:
|
|
return {"status": "unsupported_mode"}
|
|
|
|
meta = {
|
|
"sip_amount": req.sip_amount,
|
|
"sip_frequency": frequency_payload,
|
|
}
|
|
if initial_cash is not None:
|
|
meta["initial_cash"] = initial_cash
|
|
|
|
try:
|
|
run_id = create_strategy_run(
|
|
user_id,
|
|
strategy=req.strategy_name,
|
|
mode=mode,
|
|
broker=broker_name,
|
|
meta=meta,
|
|
)
|
|
except errors.UniqueViolation:
|
|
return {"status": "already_running"}
|
|
|
|
with engine_context(user_id, run_id):
|
|
if mode == "PAPER":
|
|
init_paper_state(initial_cash, frequency_payload)
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO paper_broker_account (user_id, run_id, cash)
|
|
VALUES (%s, %s, %s)
|
|
ON CONFLICT (user_id, run_id) DO UPDATE
|
|
SET cash = EXCLUDED.cash
|
|
""",
|
|
(user_id, run_id, initial_cash),
|
|
)
|
|
PaperBroker(initial_cash=initial_cash)
|
|
else:
|
|
save_state(
|
|
{
|
|
"total_invested": 0.0,
|
|
"nifty_units": 0.0,
|
|
"gold_units": 0.0,
|
|
"last_sip_ts": None,
|
|
"last_run": None,
|
|
},
|
|
mode="LIVE",
|
|
emit_event=True,
|
|
event_meta={"source": "live_start"},
|
|
)
|
|
config = {
|
|
"strategy": req.strategy_name,
|
|
"sip_amount": req.sip_amount,
|
|
"sip_frequency": frequency_payload,
|
|
"mode": mode,
|
|
"broker": broker_name,
|
|
"active": True,
|
|
}
|
|
save_strategy_config(config, user_id, run_id)
|
|
start_new_run(user_id, run_id)
|
|
_write_status(user_id, run_id, "RUNNING")
|
|
if not engine_external:
|
|
def emit_event_cb(*, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None):
|
|
emit_event(
|
|
user_id=user_id,
|
|
run_id=run_id,
|
|
event=event,
|
|
message=message,
|
|
level=level,
|
|
category=category,
|
|
meta=meta,
|
|
)
|
|
|
|
engine_config = dict(config)
|
|
if initial_cash is not None:
|
|
engine_config["initial_cash"] = initial_cash
|
|
engine_config["run_id"] = run_id
|
|
engine_config["user_id"] = user_id
|
|
engine_config["emit_event"] = emit_event_cb
|
|
try:
|
|
start_engine(engine_config)
|
|
except RunLeaseNotAcquiredError:
|
|
pass
|
|
|
|
try:
|
|
user = get_user_by_id(user_id)
|
|
if user:
|
|
body = (
|
|
"Your strategy has been started.\n\n"
|
|
f"Strategy: {req.strategy_name}\n"
|
|
f"Mode: {mode}\n"
|
|
f"Run ID: {run_id}\n"
|
|
)
|
|
send_email_async(user["username"], "Strategy started", body)
|
|
except Exception:
|
|
pass
|
|
|
|
return {"status": "started", "run_id": run_id}
|
|
|
|
|
|
def _build_engine_config(user_id: str, run_id: str, req=None):
|
|
cfg = _load_config(user_id, run_id)
|
|
sip_frequency = cfg.get("sip_frequency")
|
|
if not isinstance(sip_frequency, dict) and req is not None:
|
|
sip_frequency = req.sip_frequency.dict() if hasattr(req.sip_frequency, "dict") else dict(req.sip_frequency)
|
|
if not isinstance(sip_frequency, dict):
|
|
sip_frequency = {"value": cfg.get("frequency_days") or 1, "unit": cfg.get("unit") or "days"}
|
|
|
|
sip_amount = cfg.get("sip_amount")
|
|
if sip_amount is None and req is not None:
|
|
sip_amount = req.sip_amount
|
|
|
|
mode = (cfg.get("mode") or (req.mode if req is not None else "PAPER") or "PAPER").strip().upper()
|
|
broker = cfg.get("broker") or "paper"
|
|
strategy_name = cfg.get("strategy") or cfg.get("strategy_name") or (req.strategy_name if req is not None else None)
|
|
|
|
with engine_context(user_id, run_id):
|
|
state = load_state(mode=mode)
|
|
initial_cash = float(state.get("initial_cash") or 1_000_000.0)
|
|
|
|
def emit_event_cb(*, event: str, message: str, level: str = "INFO", category: str = "ENGINE", meta: dict | None = None):
|
|
emit_event(
|
|
user_id=user_id,
|
|
run_id=run_id,
|
|
event=event,
|
|
message=message,
|
|
level=level,
|
|
category=category,
|
|
meta=meta,
|
|
)
|
|
|
|
return {
|
|
"strategy": strategy_name or "Golden Nifty",
|
|
"sip_amount": sip_amount or 0,
|
|
"sip_frequency": sip_frequency,
|
|
"mode": mode,
|
|
"broker": broker,
|
|
"active": cfg.get("active", True),
|
|
"initial_cash": initial_cash,
|
|
"user_id": user_id,
|
|
"run_id": run_id,
|
|
"emit_event": emit_event_cb,
|
|
}
|
|
|
|
|
|
def resume_running_runs():
|
|
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
|
if engine_external:
|
|
return
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT user_id, run_id
|
|
FROM strategy_run
|
|
WHERE status = 'RUNNING'
|
|
ORDER BY created_at DESC
|
|
"""
|
|
)
|
|
runs = cur.fetchall()
|
|
for user_id, run_id in runs:
|
|
engine_config = _build_engine_config(user_id, run_id, None)
|
|
if not engine_config:
|
|
continue
|
|
try:
|
|
started = start_engine(engine_config)
|
|
except RunLeaseNotAcquiredError:
|
|
started = False
|
|
if started:
|
|
_write_status(user_id, run_id, "RUNNING")
|
|
|
|
def stop_strategy(user_id: str):
|
|
run_id = _effective_running_run_id(user_id)
|
|
if not run_id:
|
|
latest_run_id = get_running_run_id(user_id) or get_active_run_id(user_id)
|
|
return {"status": "already_stopped", "run_id": latest_run_id}
|
|
|
|
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
|
stop_warning = None
|
|
if not engine_external:
|
|
try:
|
|
stop_engine(user_id, run_id, timeout=15.0)
|
|
except Exception as exc:
|
|
print(f"[STRATEGY] stop_engine failed for {user_id}/{run_id}: {exc}", flush=True)
|
|
stop_warning = str(exc)
|
|
deactivate_strategy_config(user_id, run_id)
|
|
stop_run(user_id, run_id, reason="user_request")
|
|
try:
|
|
_write_status(user_id, run_id, "STOPPED")
|
|
except Exception as exc:
|
|
print(f"[STRATEGY] engine status update failed during stop for {user_id}/{run_id}: {exc}", flush=True)
|
|
if not stop_warning:
|
|
stop_warning = str(exc)
|
|
try:
|
|
_set_run_status_or_raise(user_id, run_id, "STOPPED", meta={"reason": "user_request"})
|
|
except RuntimeError as exc:
|
|
return {
|
|
"status": "stop_failed",
|
|
"run_id": run_id,
|
|
"message": str(exc),
|
|
}
|
|
|
|
try:
|
|
user = get_user_by_id(user_id)
|
|
if user:
|
|
body = "Your strategy has been stopped."
|
|
send_email_async(user["username"], "Strategy stopped", body)
|
|
except Exception:
|
|
pass
|
|
|
|
result = {"status": "stopped", "run_id": run_id}
|
|
if stop_warning:
|
|
result["warning"] = stop_warning
|
|
return result
|
|
|
|
|
|
def resume_strategy(user_id: str):
|
|
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
|
running_run_id = _effective_running_run_id(user_id)
|
|
if running_run_id:
|
|
return {"status": "already_running", "run_id": running_run_id}
|
|
|
|
run_id = get_active_run_id(user_id)
|
|
if not run_id:
|
|
return {"status": "no_resumable_run"}
|
|
cfg = _load_config(user_id, run_id)
|
|
strategy_name = (cfg.get("strategy") or "").strip()
|
|
mode = (cfg.get("mode") or "").strip().upper()
|
|
if not strategy_name or mode not in {"LIVE", "PAPER"}:
|
|
return {"status": "no_resumable_run"}
|
|
|
|
if mode == "LIVE":
|
|
is_valid, broker_state, _failure_reason = _validate_live_broker_session(user_id)
|
|
if not is_valid:
|
|
return {
|
|
"status": "broker_auth_required",
|
|
"redirect_url": "/api/broker/login",
|
|
"broker": broker_state.get("broker"),
|
|
}
|
|
|
|
engine_config = None
|
|
if not engine_external:
|
|
try:
|
|
engine_config = _build_engine_config(user_id, run_id, None)
|
|
except Exception as exc:
|
|
return {
|
|
"status": "resume_failed",
|
|
"run_id": run_id,
|
|
"message": f"Unable to load the saved strategy state: {exc}",
|
|
}
|
|
if not engine_config:
|
|
return {
|
|
"status": "resume_failed",
|
|
"run_id": run_id,
|
|
"message": "Saved strategy configuration is incomplete.",
|
|
}
|
|
|
|
reactivate_strategy_config(user_id, run_id)
|
|
try:
|
|
_set_run_status_or_raise(user_id, run_id, "RUNNING", meta={"reason": "user_resume"})
|
|
except RuntimeError as exc:
|
|
deactivate_strategy_config(user_id, run_id)
|
|
return {
|
|
"status": "resume_failed",
|
|
"run_id": run_id,
|
|
"message": str(exc),
|
|
}
|
|
_write_status(user_id, run_id, "RUNNING")
|
|
|
|
if not engine_external:
|
|
try:
|
|
started = start_engine(engine_config)
|
|
except RunLeaseNotAcquiredError:
|
|
return {"status": "already_running", "run_id": run_id}
|
|
except Exception as exc:
|
|
deactivate_strategy_config(user_id, run_id)
|
|
_write_status(user_id, run_id, "STOPPED")
|
|
_set_run_status_or_raise(user_id, run_id, "STOPPED", meta={"reason": "resume_start_failed"})
|
|
return {
|
|
"status": "resume_failed",
|
|
"run_id": run_id,
|
|
"message": f"Unable to resume the strategy engine: {exc}",
|
|
}
|
|
if not started:
|
|
deactivate_strategy_config(user_id, run_id)
|
|
_write_status(user_id, run_id, "STOPPED")
|
|
_set_run_status_or_raise(user_id, run_id, "STOPPED", meta={"reason": "resume_start_failed"})
|
|
return {
|
|
"status": "resume_failed",
|
|
"run_id": run_id,
|
|
"message": "Strategy engine could not be started.",
|
|
}
|
|
|
|
resume_run(user_id, run_id)
|
|
|
|
try:
|
|
user = get_user_by_id(user_id)
|
|
if user:
|
|
body = (
|
|
"Your strategy has been resumed.\n\n"
|
|
f"Strategy: {strategy_name}\n"
|
|
f"Mode: {mode}\n"
|
|
f"Run ID: {run_id}\n"
|
|
)
|
|
send_email_async(user["username"], "Strategy resumed", body)
|
|
except Exception:
|
|
pass
|
|
|
|
return {"status": "resumed", "run_id": run_id}
|
|
|
|
def get_strategy_status(user_id: str):
|
|
running_run_id = _effective_running_run_id(user_id)
|
|
run_id = running_run_id or get_active_run_id(user_id)
|
|
cfg = _load_config(user_id, run_id) if run_id else {}
|
|
run_row = _get_run_row(user_id, run_id)
|
|
default_status = (run_row or {}).get("status") or ("RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE"))
|
|
engine_row = None
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT status, last_updated FROM engine_status WHERE user_id = %s AND run_id = %s",
|
|
(user_id, run_id),
|
|
)
|
|
engine_row = cur.fetchone()
|
|
if not engine_row:
|
|
status = {"status": default_status, "last_updated": None}
|
|
else:
|
|
status = {
|
|
"status": default_status,
|
|
"last_updated": serialize_timestamp(engine_row[1]),
|
|
}
|
|
status["run_id"] = run_id
|
|
engine_state = str((engine_row or [None])[0] or "").strip().upper()
|
|
if running_run_id and engine_state in {"STOPPED", "ERROR"}:
|
|
status["status"] = "STOPPED"
|
|
elif engine_state == "PAUSED_AUTH_EXPIRED":
|
|
status["status"] = "PAUSED_AUTH_EXPIRED"
|
|
sip_frequency = cfg.get("sip_frequency")
|
|
if not isinstance(sip_frequency, dict):
|
|
frequency = cfg.get("frequency")
|
|
unit = cfg.get("unit")
|
|
if isinstance(frequency, dict):
|
|
unit = frequency.get("unit", unit)
|
|
frequency = frequency.get("value")
|
|
if frequency is None and cfg.get("frequency_days") is not None:
|
|
frequency = cfg.get("frequency_days")
|
|
unit = unit or "days"
|
|
if frequency is not None and unit:
|
|
sip_frequency = {"value": frequency, "unit": unit}
|
|
status["config"] = {
|
|
"strategy": cfg.get("strategy"),
|
|
"sip_amount": cfg.get("sip_amount"),
|
|
"sip_frequency": sip_frequency,
|
|
"mode": cfg.get("mode"),
|
|
"broker": cfg.get("broker"),
|
|
"active": cfg.get("active"),
|
|
}
|
|
if running_run_id:
|
|
mode = (cfg.get("mode") or "LIVE").strip().upper()
|
|
with engine_context(user_id, run_id):
|
|
state = load_state(mode=mode)
|
|
last_execution_ts = _last_execution_ts(state, mode)
|
|
next_eligible = compute_next_eligible(last_execution_ts, sip_frequency)
|
|
status["last_execution_ts"] = last_execution_ts
|
|
status["next_eligible_ts"] = next_eligible
|
|
if next_eligible:
|
|
parsed_next = parse_persisted_timestamp(next_eligible)
|
|
if parsed_next and parsed_next > _utc_now():
|
|
status["status"] = "WAITING"
|
|
broker_block = _broker_block_state(user_id, cfg)
|
|
status["broker_state"] = broker_block.get("broker_state")
|
|
status["strategy_blocked"] = bool(broker_block.get("blocked"))
|
|
status["strategy_block_reason"] = broker_block.get("reason")
|
|
status["broker"] = broker_block.get("broker") or cfg.get("broker")
|
|
status_key = (status.get("status") or "IDLE").upper()
|
|
resumable = bool(cfg.get("strategy")) and bool(cfg.get("mode"))
|
|
status["can_resume"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
|
|
status["can_restart"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
|
|
return status
|
|
|
|
def get_engine_status(user_id: str):
|
|
running_run_id = _effective_running_run_id(user_id)
|
|
run_id = running_run_id or get_active_run_id(user_id)
|
|
status = {
|
|
"state": "STOPPED",
|
|
"run_id": run_id,
|
|
"user_id": user_id,
|
|
"last_heartbeat_ts": None,
|
|
}
|
|
if running_run_id:
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT status, last_updated
|
|
FROM engine_status
|
|
WHERE user_id = %s AND run_id = %s
|
|
ORDER BY last_updated DESC
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
row = cur.fetchone()
|
|
if row:
|
|
status["state"] = row[0]
|
|
last_updated = row[1]
|
|
if last_updated is not None:
|
|
status["last_heartbeat_ts"] = serialize_timestamp(last_updated)
|
|
cfg = _load_config(user_id, run_id)
|
|
mode = (cfg.get("mode") or "LIVE").strip().upper()
|
|
with engine_context(user_id, run_id):
|
|
state = load_state(mode=mode)
|
|
last_execution_ts = _last_execution_ts(state, mode)
|
|
sip_frequency = cfg.get("sip_frequency")
|
|
if isinstance(sip_frequency, dict):
|
|
sip_frequency = {
|
|
"value": sip_frequency.get("value"),
|
|
"unit": sip_frequency.get("unit"),
|
|
}
|
|
else:
|
|
frequency = cfg.get("frequency")
|
|
unit = cfg.get("unit")
|
|
if isinstance(frequency, dict):
|
|
unit = frequency.get("unit", unit)
|
|
frequency = frequency.get("value")
|
|
if frequency is None and cfg.get("frequency_days") is not None:
|
|
frequency = cfg.get("frequency_days")
|
|
unit = unit or "days"
|
|
if frequency is not None and unit:
|
|
sip_frequency = {"value": frequency, "unit": unit}
|
|
status["last_execution_ts"] = last_execution_ts
|
|
status["next_eligible_ts"] = compute_next_eligible(last_execution_ts, sip_frequency)
|
|
status["run_id"] = run_id
|
|
return status
|
|
|
|
|
|
def get_strategy_logs(user_id: str, since_seq: int):
|
|
run_id = get_active_run_id(user_id)
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT seq, ts, level, category, event, message, run_id, meta
|
|
FROM strategy_log
|
|
WHERE user_id = %s AND run_id = %s AND seq > %s
|
|
ORDER BY seq
|
|
""",
|
|
(user_id, run_id, since_seq),
|
|
)
|
|
rows = cur.fetchall()
|
|
events = []
|
|
for row in rows:
|
|
ts = row[1]
|
|
ts_str = serialize_timestamp(ts)
|
|
events.append(
|
|
{
|
|
"seq": row[0],
|
|
"ts": ts_str,
|
|
"level": row[2],
|
|
"category": row[3],
|
|
"event": row[4],
|
|
"message": row[5],
|
|
"run_id": row[6],
|
|
"meta": row[7] if isinstance(row[7], dict) else {},
|
|
}
|
|
)
|
|
cur.execute(
|
|
"SELECT COALESCE(MAX(seq), 0) FROM strategy_log WHERE user_id = %s AND run_id = %s",
|
|
(user_id, run_id),
|
|
)
|
|
latest_seq = cur.fetchone()[0]
|
|
return {"events": events, "latest_seq": latest_seq}
|
|
|
|
|
|
def _humanize_reason(reason: str | None):
|
|
if not reason:
|
|
return None
|
|
return reason.replace("_", " ").strip().capitalize()
|
|
|
|
|
|
def _issue_message(event: str, message: str | None, data: dict | None, meta: dict | None):
|
|
payload = data if isinstance(data, dict) else {}
|
|
extra = meta if isinstance(meta, dict) else {}
|
|
reason = payload.get("reason") or extra.get("reason")
|
|
reason_key = str(reason or "").strip().lower()
|
|
|
|
if event == "SIP_NO_FILL":
|
|
if reason_key == "insufficient_funds":
|
|
return "Insufficient funds for this SIP."
|
|
if reason_key == "broker_auth_expired":
|
|
return "Broker session expired. Reconnect broker."
|
|
if reason_key == "no_fill":
|
|
return "Order was not filled."
|
|
return f"SIP not executed: {_humanize_reason(reason) or 'Unknown reason'}."
|
|
|
|
if event == "BROKER_AUTH_EXPIRED":
|
|
return "Broker session expired. Reconnect broker."
|
|
if event == "PRICE_FETCH_ERROR":
|
|
return "Could not fetch prices. Retrying."
|
|
if event == "HISTORY_LOAD_ERROR":
|
|
return "Could not load price history. Retrying."
|
|
if event == "ENGINE_ERROR":
|
|
return message or "Strategy engine hit an error."
|
|
if event == "EXECUTION_BLOCKED":
|
|
if reason_key == "market_holiday":
|
|
return "Exchange holiday. Execution will resume next session."
|
|
if reason_key == "market_weekend":
|
|
return "Weekend closure. Execution will resume next session."
|
|
if reason_key == "market_pre_open":
|
|
return "Market has not opened yet. Execution will begin after 9:15 AM IST."
|
|
if reason_key == "market_post_close":
|
|
return "Market is closed for the day. Execution will resume next session."
|
|
if reason_key == "market_calendar_unavailable":
|
|
return "Market calendar unavailable. Execution paused for safety."
|
|
if reason_key == "market_closed":
|
|
return "Market is closed. Execution will resume next session."
|
|
return f"Execution blocked: {_humanize_reason(reason) or 'Unknown reason'}."
|
|
if event == "ORDER_REJECTED":
|
|
return message or payload.get("status_message") or "Broker rejected the order."
|
|
if event == "ORDER_CANCELLED":
|
|
return message or "Order was cancelled."
|
|
|
|
return message or _humanize_reason(reason) or "Strategy update available."
|
|
|
|
|
|
def _issue_is_stale_for_current_state(
|
|
user_id: str,
|
|
status: dict,
|
|
event: str,
|
|
data: dict | None,
|
|
meta: dict | None,
|
|
):
|
|
status_key = (status.get("status") or "IDLE").upper()
|
|
cfg = status.get("config") if isinstance(status.get("config"), dict) else {}
|
|
mode = str(cfg.get("mode") or "").strip().upper()
|
|
|
|
payload = data if isinstance(data, dict) else {}
|
|
extra = meta if isinstance(meta, dict) else {}
|
|
reason = payload.get("reason") or extra.get("reason")
|
|
reason_key = str(reason or "").strip().lower()
|
|
|
|
if status_key == "STOPPED" and event in {
|
|
"BROKER_AUTH_EXPIRED",
|
|
"EXECUTION_BLOCKED",
|
|
"SIP_NO_FILL",
|
|
"PRICE_FETCH_ERROR",
|
|
"HISTORY_LOAD_ERROR",
|
|
"ENGINE_ERROR",
|
|
"ORDER_REJECTED",
|
|
"ORDER_CANCELLED",
|
|
}:
|
|
return True
|
|
|
|
if event == "EXECUTION_BLOCKED" and reason_key.startswith("market_"):
|
|
current_session = market_session(market_now())
|
|
current_reason = str(current_session.get("reason") or "").strip().lower()
|
|
current_status = str(current_session.get("status") or "").strip().upper()
|
|
if reason_key == "market_holiday":
|
|
return current_status != "HOLIDAY"
|
|
if reason_key == "market_calendar_unavailable":
|
|
return current_reason != "calendar_unavailable"
|
|
if reason_key in {"market_weekend", "market_pre_open", "market_post_close", "market_closed"}:
|
|
return current_status == "OPEN"
|
|
return False
|
|
|
|
if mode != "LIVE":
|
|
return False
|
|
|
|
auth_related_issue = event == "BROKER_AUTH_EXPIRED" or (
|
|
event == "SIP_NO_FILL" and reason_key == "broker_auth_expired"
|
|
) or (event == "EXECUTION_BLOCKED" and reason_key in {"broker_auth_expired", "auth_expired"})
|
|
if not auth_related_issue:
|
|
return False
|
|
|
|
broker_state = get_user_broker(user_id) or {}
|
|
auth_state = str(broker_state.get("auth_state") or "").strip().upper()
|
|
return auth_state == "VALID"
|
|
|
|
|
|
def get_strategy_summary(user_id: str):
|
|
run_id = get_active_run_id(user_id)
|
|
status = get_strategy_status(user_id)
|
|
next_eligible_ts = status.get("next_eligible_ts")
|
|
|
|
summary = {
|
|
"run_id": run_id,
|
|
"status": status.get("status"),
|
|
"tone": "neutral",
|
|
"message": "No active strategy.",
|
|
"event": None,
|
|
"ts": None,
|
|
}
|
|
|
|
issue_row = None
|
|
block_reason = str(status.get("strategy_block_reason") or "").strip().lower()
|
|
if block_reason == "broker_disconnected":
|
|
summary.update(
|
|
{
|
|
"tone": "warning",
|
|
"message": "Broker disconnected. Live strategy is stopped until you reconnect.",
|
|
"event": "BROKER_DISCONNECTED",
|
|
}
|
|
)
|
|
return summary
|
|
if block_reason == "broker_auth_required" or (status.get("status") or "").upper() == "PAUSED_AUTH_EXPIRED":
|
|
summary.update(
|
|
{
|
|
"tone": "warning",
|
|
"message": "Broker session expired. Reconnect broker to resume the strategy.",
|
|
"event": "BROKER_AUTH_EXPIRED",
|
|
}
|
|
)
|
|
return summary
|
|
|
|
if run_id:
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT event, message, data, meta, ts
|
|
FROM engine_event
|
|
WHERE user_id = %s
|
|
AND run_id = %s
|
|
AND event IN (
|
|
'SIP_NO_FILL',
|
|
'BROKER_AUTH_EXPIRED',
|
|
'PRICE_FETCH_ERROR',
|
|
'HISTORY_LOAD_ERROR',
|
|
'ENGINE_ERROR',
|
|
'EXECUTION_BLOCKED',
|
|
'ORDER_REJECTED',
|
|
'ORDER_CANCELLED'
|
|
)
|
|
ORDER BY ts DESC
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
issue_row = cur.fetchone()
|
|
|
|
if issue_row:
|
|
event, message, data, meta, ts = issue_row
|
|
if not _issue_is_stale_for_current_state(user_id, status, event, data, meta):
|
|
summary.update(
|
|
{
|
|
"tone": "error" if event in {"ENGINE_ERROR", "ORDER_REJECTED"} else "warning",
|
|
"message": _issue_message(event, message, data, meta),
|
|
"event": event,
|
|
"ts": serialize_timestamp(ts),
|
|
}
|
|
)
|
|
return summary
|
|
|
|
status_key = (status.get("status") or "IDLE").upper()
|
|
if status_key == "WAITING" and next_eligible_ts:
|
|
summary.update(
|
|
{
|
|
"tone": "warning",
|
|
"message": f"Waiting until {next_eligible_ts}.",
|
|
}
|
|
)
|
|
return summary
|
|
if status_key == "RUNNING":
|
|
summary.update(
|
|
{
|
|
"tone": "success",
|
|
"message": "Strategy is running.",
|
|
}
|
|
)
|
|
return summary
|
|
if status_key == "STOPPED":
|
|
summary.update(
|
|
{
|
|
"tone": "neutral",
|
|
"message": "Strategy is stopped.",
|
|
}
|
|
)
|
|
return summary
|
|
return summary
|
|
|
|
def get_market_status():
|
|
now = market_now()
|
|
session = market_session(now)
|
|
status = str(session.get("status") or "CLOSED")
|
|
reason = str(session.get("reason") or "")
|
|
next_open_at = None
|
|
try:
|
|
next_open_at = serialize_timestamp(next_market_open_after(now))
|
|
except UnsupportedCalendarYearError:
|
|
next_open_at = None
|
|
return {
|
|
"status": status,
|
|
"reason": reason,
|
|
"checked_at": serialize_timestamp(now),
|
|
"next_open_at": next_open_at,
|
|
}
|