Reconcile stale running strategy state
This commit is contained in:
parent
760eb6ea1a
commit
c41f6f2411
@ -309,6 +309,43 @@ def _write_status(user_id: str, run_id: str, status):
|
||||
(user_id, run_id, status, now_local),
|
||||
)
|
||||
|
||||
|
||||
def _get_engine_status_row(user_id: str, run_id: str):
|
||||
if not user_id or not run_id:
|
||||
return None
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT status, last_updated
|
||||
FROM engine_status
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
LIMIT 1
|
||||
""",
|
||||
(user_id, run_id),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return {"status": row[0], "last_updated": row[1]}
|
||||
|
||||
|
||||
def _effective_running_run_id(user_id: str):
|
||||
run_id = get_running_run_id(user_id)
|
||||
if not run_id:
|
||||
return None
|
||||
engine_row = _get_engine_status_row(user_id, run_id)
|
||||
engine_state = str((engine_row or {}).get("status") or "").strip().upper()
|
||||
if engine_state not in {"STOPPED", "ERROR"}:
|
||||
return run_id
|
||||
update_run_status(
|
||||
user_id,
|
||||
run_id,
|
||||
"STOPPED",
|
||||
meta={"reason": "engine_inactive", "engine_state": engine_state},
|
||||
)
|
||||
return None
|
||||
|
||||
def validate_frequency(freq: dict, mode: str):
|
||||
if not isinstance(freq, dict):
|
||||
raise ValueError("Frequency payload is required")
|
||||
@ -420,7 +457,7 @@ def _last_execution_ts(state: dict, mode: str) -> str | None:
|
||||
|
||||
def start_strategy(req, user_id: str):
|
||||
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
||||
running_run_id = get_running_run_id(user_id)
|
||||
running_run_id = _effective_running_run_id(user_id)
|
||||
if running_run_id:
|
||||
running_cfg = _load_config(user_id, running_run_id)
|
||||
running_mode = (running_cfg.get("mode") or req.mode or "PAPER").strip().upper()
|
||||
@ -623,7 +660,7 @@ def resume_running_runs():
|
||||
_write_status(user_id, run_id, "RUNNING")
|
||||
|
||||
def stop_strategy(user_id: str):
|
||||
run_id = get_running_run_id(user_id)
|
||||
run_id = _effective_running_run_id(user_id)
|
||||
if not run_id:
|
||||
latest_run_id = get_active_run_id(user_id)
|
||||
return {"status": "already_stopped", "run_id": latest_run_id}
|
||||
@ -662,7 +699,7 @@ def stop_strategy(user_id: str):
|
||||
|
||||
def resume_strategy(user_id: str):
|
||||
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
||||
running_run_id = get_running_run_id(user_id)
|
||||
running_run_id = _effective_running_run_id(user_id)
|
||||
if running_run_id:
|
||||
return {"status": "already_running", "run_id": running_run_id}
|
||||
|
||||
@ -743,7 +780,7 @@ def resume_strategy(user_id: str):
|
||||
return {"status": "resumed", "run_id": run_id}
|
||||
|
||||
def get_strategy_status(user_id: str):
|
||||
running_run_id = get_running_run_id(user_id)
|
||||
running_run_id = _effective_running_run_id(user_id)
|
||||
run_id = running_run_id or get_active_run_id(user_id)
|
||||
cfg = _load_config(user_id, run_id) if run_id else {}
|
||||
default_status = "RUNNING" if running_run_id else ("STOPPED" if run_id else "IDLE")
|
||||
@ -813,7 +850,7 @@ def get_strategy_status(user_id: str):
|
||||
return status
|
||||
|
||||
def get_engine_status(user_id: str):
|
||||
running_run_id = get_running_run_id(user_id)
|
||||
running_run_id = _effective_running_run_id(user_id)
|
||||
run_id = running_run_id or get_active_run_id(user_id)
|
||||
status = {
|
||||
"state": "STOPPED",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user