Add resume strategy functionality and update run status handling
This commit is contained in:
parent
5f29d8c757
commit
79c036ca8b
@ -2,6 +2,7 @@ from fastapi import APIRouter, Query, Request
|
|||||||
from app.models import StrategyStartRequest
|
from app.models import StrategyStartRequest
|
||||||
from app.services.strategy_service import (
|
from app.services.strategy_service import (
|
||||||
start_strategy,
|
start_strategy,
|
||||||
|
resume_strategy,
|
||||||
stop_strategy,
|
stop_strategy,
|
||||||
get_strategy_status,
|
get_strategy_status,
|
||||||
get_strategy_summary,
|
get_strategy_summary,
|
||||||
@ -23,6 +24,11 @@ def stop(request: Request):
|
|||||||
user_id = get_request_user_id(request)
|
user_id = get_request_user_id(request)
|
||||||
return stop_strategy(user_id)
|
return stop_strategy(user_id)
|
||||||
|
|
||||||
|
@router.post("/strategy/resume")
|
||||||
|
def resume(request: Request):
|
||||||
|
user_id = get_request_user_id(request)
|
||||||
|
return resume_strategy(user_id)
|
||||||
|
|
||||||
@router.get("/strategy/status")
|
@router.get("/strategy/status")
|
||||||
def status(request: Request):
|
def status(request: Request):
|
||||||
user_id = get_request_user_id(request)
|
user_id = get_request_user_id(request)
|
||||||
|
|||||||
@ -157,7 +157,8 @@ def update_run_status(user_id: str, run_id: str, status: str, meta: dict | None
|
|||||||
cur.execute(
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
UPDATE strategy_run
|
UPDATE strategy_run
|
||||||
SET status = %s, started_at = COALESCE(started_at, %s), meta = COALESCE(meta, '{}'::jsonb) || %s
|
SET status = %s, started_at = COALESCE(started_at, %s), stopped_at = NULL,
|
||||||
|
meta = COALESCE(meta, '{}'::jsonb) || %s
|
||||||
WHERE run_id = %s AND user_id = %s
|
WHERE run_id = %s AND user_id = %s
|
||||||
""",
|
""",
|
||||||
(status, now, Json(meta or {}), run_id, user_id),
|
(status, now, Json(meta or {}), run_id, user_id),
|
||||||
|
|||||||
@ -69,6 +69,16 @@ def stop_run(user_id: str, run_id: str, reason="user_request"):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def resume_run(user_id: str, run_id: str):
|
||||||
|
emit_event(
|
||||||
|
user_id=user_id,
|
||||||
|
run_id=run_id,
|
||||||
|
event="STRATEGY_RESUMED",
|
||||||
|
message="Strategy resumed",
|
||||||
|
meta={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def emit_event(
|
def emit_event(
|
||||||
*,
|
*,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
@ -270,6 +280,15 @@ def deactivate_strategy_config(user_id: str, run_id: str):
|
|||||||
cfg["active"] = False
|
cfg["active"] = False
|
||||||
_save_config(cfg, user_id, run_id)
|
_save_config(cfg, user_id, run_id)
|
||||||
|
|
||||||
|
|
||||||
|
def reactivate_strategy_config(user_id: str, run_id: str):
|
||||||
|
cfg = _load_config(user_id, run_id)
|
||||||
|
if not cfg:
|
||||||
|
return {}
|
||||||
|
cfg["active"] = True
|
||||||
|
_save_config(cfg, user_id, run_id)
|
||||||
|
return cfg
|
||||||
|
|
||||||
def _write_status(user_id: str, run_id: str, status):
|
def _write_status(user_id: str, run_id: str, status):
|
||||||
now_local = datetime.now().astimezone()
|
now_local = datetime.now().astimezone()
|
||||||
with db_connection() as conn:
|
with db_connection() as conn:
|
||||||
@ -569,8 +588,61 @@ def stop_strategy(user_id: str):
|
|||||||
|
|
||||||
return {"status": "stopped"}
|
return {"status": "stopped"}
|
||||||
|
|
||||||
|
|
||||||
|
def resume_strategy(user_id: str):
|
||||||
|
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
||||||
|
running_run_id = get_running_run_id(user_id)
|
||||||
|
if running_run_id:
|
||||||
|
return {"status": "already_running", "run_id": running_run_id}
|
||||||
|
|
||||||
|
run_id = get_active_run_id(user_id)
|
||||||
|
cfg = _load_config(user_id, run_id)
|
||||||
|
strategy_name = (cfg.get("strategy") or "").strip()
|
||||||
|
mode = (cfg.get("mode") or "").strip().upper()
|
||||||
|
if not strategy_name or mode not in {"LIVE", "PAPER"}:
|
||||||
|
return {"status": "no_resumable_run"}
|
||||||
|
|
||||||
|
if mode == "LIVE":
|
||||||
|
is_valid, broker_state, _failure_reason = _validate_live_broker_session(user_id)
|
||||||
|
if not is_valid:
|
||||||
|
return {
|
||||||
|
"status": "broker_auth_required",
|
||||||
|
"redirect_url": "/api/broker/login",
|
||||||
|
"broker": broker_state.get("broker"),
|
||||||
|
}
|
||||||
|
|
||||||
|
reactivate_strategy_config(user_id, run_id)
|
||||||
|
resume_run(user_id, run_id)
|
||||||
|
_write_status(user_id, run_id, "RUNNING")
|
||||||
|
update_run_status(user_id, run_id, "RUNNING", meta={"reason": "user_resume"})
|
||||||
|
|
||||||
|
if not engine_external:
|
||||||
|
engine_config = _build_engine_config(user_id, run_id, None)
|
||||||
|
if not engine_config:
|
||||||
|
return {"status": "resume_failed", "run_id": run_id}
|
||||||
|
started = start_engine(engine_config)
|
||||||
|
if not started:
|
||||||
|
return {"status": "resume_failed", "run_id": run_id}
|
||||||
|
_write_status(user_id, run_id, "RUNNING")
|
||||||
|
|
||||||
|
try:
|
||||||
|
user = get_user_by_id(user_id)
|
||||||
|
if user:
|
||||||
|
body = (
|
||||||
|
"Your strategy has been resumed.\n\n"
|
||||||
|
f"Strategy: {strategy_name}\n"
|
||||||
|
f"Mode: {mode}\n"
|
||||||
|
f"Run ID: {run_id}\n"
|
||||||
|
)
|
||||||
|
send_email_async(user["username"], "Strategy resumed", body)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return {"status": "resumed", "run_id": run_id}
|
||||||
|
|
||||||
def get_strategy_status(user_id: str):
|
def get_strategy_status(user_id: str):
|
||||||
run_id = get_active_run_id(user_id)
|
run_id = get_active_run_id(user_id)
|
||||||
|
cfg = _load_config(user_id, run_id) if run_id else {}
|
||||||
with db_connection() as conn:
|
with db_connection() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
@ -585,24 +657,32 @@ def get_strategy_status(user_id: str):
|
|||||||
"status": row[0],
|
"status": row[0],
|
||||||
"last_updated": _format_local_ts(row[1]),
|
"last_updated": _format_local_ts(row[1]),
|
||||||
}
|
}
|
||||||
|
status["run_id"] = run_id
|
||||||
|
sip_frequency = cfg.get("sip_frequency")
|
||||||
|
if not isinstance(sip_frequency, dict):
|
||||||
|
frequency = cfg.get("frequency")
|
||||||
|
unit = cfg.get("unit")
|
||||||
|
if isinstance(frequency, dict):
|
||||||
|
unit = frequency.get("unit", unit)
|
||||||
|
frequency = frequency.get("value")
|
||||||
|
if frequency is None and cfg.get("frequency_days") is not None:
|
||||||
|
frequency = cfg.get("frequency_days")
|
||||||
|
unit = unit or "days"
|
||||||
|
if frequency is not None and unit:
|
||||||
|
sip_frequency = {"value": frequency, "unit": unit}
|
||||||
|
status["config"] = {
|
||||||
|
"strategy": cfg.get("strategy"),
|
||||||
|
"sip_amount": cfg.get("sip_amount"),
|
||||||
|
"sip_frequency": sip_frequency,
|
||||||
|
"mode": cfg.get("mode"),
|
||||||
|
"broker": cfg.get("broker"),
|
||||||
|
"active": cfg.get("active"),
|
||||||
|
}
|
||||||
if status.get("status") == "RUNNING":
|
if status.get("status") == "RUNNING":
|
||||||
cfg = _load_config(user_id, run_id)
|
|
||||||
mode = (cfg.get("mode") or "LIVE").strip().upper()
|
mode = (cfg.get("mode") or "LIVE").strip().upper()
|
||||||
with engine_context(user_id, run_id):
|
with engine_context(user_id, run_id):
|
||||||
state = load_state(mode=mode)
|
state = load_state(mode=mode)
|
||||||
last_execution_ts = _last_execution_ts(state, mode)
|
last_execution_ts = _last_execution_ts(state, mode)
|
||||||
sip_frequency = cfg.get("sip_frequency")
|
|
||||||
if not isinstance(sip_frequency, dict):
|
|
||||||
frequency = cfg.get("frequency")
|
|
||||||
unit = cfg.get("unit")
|
|
||||||
if isinstance(frequency, dict):
|
|
||||||
unit = frequency.get("unit", unit)
|
|
||||||
frequency = frequency.get("value")
|
|
||||||
if frequency is None and cfg.get("frequency_days") is not None:
|
|
||||||
frequency = cfg.get("frequency_days")
|
|
||||||
unit = unit or "days"
|
|
||||||
if frequency is not None and unit:
|
|
||||||
sip_frequency = {"value": frequency, "unit": unit}
|
|
||||||
next_eligible = compute_next_eligible(last_execution_ts, sip_frequency)
|
next_eligible = compute_next_eligible(last_execution_ts, sip_frequency)
|
||||||
status["last_execution_ts"] = last_execution_ts
|
status["last_execution_ts"] = last_execution_ts
|
||||||
status["next_eligible_ts"] = next_eligible
|
status["next_eligible_ts"] = next_eligible
|
||||||
@ -614,6 +694,10 @@ def get_strategy_status(user_id: str):
|
|||||||
status["status"] = "WAITING"
|
status["status"] = "WAITING"
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
status_key = (status.get("status") or "IDLE").upper()
|
||||||
|
resumable = bool(cfg.get("strategy")) and bool(cfg.get("mode"))
|
||||||
|
status["can_resume"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
|
||||||
|
status["can_restart"] = resumable and status_key in {"STOPPED", "PAUSED_AUTH_EXPIRED"}
|
||||||
return status
|
return status
|
||||||
|
|
||||||
def get_engine_status(user_id: str):
|
def get_engine_status(user_id: str):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user