763 lines
28 KiB
Python
763 lines
28 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
import hashlib
|
|
import os
|
|
|
|
from psycopg2.extras import Json
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
from app.services.db import db_connection
|
|
from app.services.run_service import get_running_run_id
|
|
from indian_paper_trading_strategy.engine.runner import stop_engine
|
|
|
|
|
|
def _paginate(page: int, page_size: int):
|
|
page = max(page, 1)
|
|
page_size = max(min(page_size, 200), 1)
|
|
offset = (page - 1) * page_size
|
|
return page, page_size, offset
|
|
|
|
|
|
def get_overview():
|
|
now = datetime.now(timezone.utc)
|
|
since = now - timedelta(hours=24)
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT COUNT(*) FROM app_user")
|
|
total_users = cur.fetchone()[0]
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(DISTINCT user_id)
|
|
FROM app_session
|
|
WHERE COALESCE(last_seen_at, created_at) >= %s
|
|
""",
|
|
(since,),
|
|
)
|
|
users_logged_in_last_24h = cur.fetchone()[0]
|
|
cur.execute(
|
|
"""
|
|
SELECT
|
|
COUNT(*) AS total_runs,
|
|
COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_runs,
|
|
COUNT(*) FILTER (WHERE status = 'STOPPED') AS stopped_runs,
|
|
COUNT(*) FILTER (WHERE status = 'ERROR') AS error_runs,
|
|
COUNT(*) FILTER (WHERE mode = 'LIVE') AS live_runs_count,
|
|
COUNT(*) FILTER (WHERE mode = 'PAPER') AS paper_runs_count
|
|
FROM strategy_run
|
|
"""
|
|
)
|
|
run_row = cur.fetchone()
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM paper_order WHERE "timestamp" >= %s
|
|
""",
|
|
(since,),
|
|
)
|
|
orders_last_24h = cur.fetchone()[0]
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM paper_trade WHERE "timestamp" >= %s
|
|
""",
|
|
(since,),
|
|
)
|
|
trades_last_24h = cur.fetchone()[0]
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*)
|
|
FROM event_ledger
|
|
WHERE event = 'SIP_EXECUTED' AND "timestamp" >= %s
|
|
""",
|
|
(since,),
|
|
)
|
|
sip_executed_last_24h = cur.fetchone()[0]
|
|
cur.execute(
|
|
"""
|
|
SELECT ts, event, message, source, user_id, run_id
|
|
FROM (
|
|
SELECT ts, event, message, 'engine_event' AS source, user_id, run_id
|
|
FROM engine_event
|
|
WHERE event ILIKE '%ERROR%'
|
|
UNION ALL
|
|
SELECT ts, event, message, 'strategy_log' AS source, user_id, run_id
|
|
FROM strategy_log
|
|
WHERE level = 'ERROR'
|
|
) t
|
|
ORDER BY ts DESC NULLS LAST
|
|
LIMIT 10
|
|
"""
|
|
)
|
|
top_errors = [
|
|
{
|
|
"ts": row[0],
|
|
"event": row[1],
|
|
"message": row[2],
|
|
"source": row[3],
|
|
"user_id": row[4],
|
|
"run_id": row[5],
|
|
}
|
|
for row in cur.fetchall()
|
|
]
|
|
return {
|
|
"total_users": total_users,
|
|
"users_logged_in_last_24h": users_logged_in_last_24h,
|
|
"total_runs": run_row[0],
|
|
"running_runs": run_row[1],
|
|
"stopped_runs": run_row[2],
|
|
"error_runs": run_row[3],
|
|
"live_runs_count": run_row[4],
|
|
"paper_runs_count": run_row[5],
|
|
"orders_last_24h": orders_last_24h,
|
|
"trades_last_24h": trades_last_24h,
|
|
"sip_executed_last_24h": sip_executed_last_24h,
|
|
"top_errors": top_errors,
|
|
}
|
|
|
|
|
|
def get_users(page: int, page_size: int, query: str | None):
|
|
page, page_size, offset = _paginate(page, page_size)
|
|
params = []
|
|
where = ""
|
|
if query:
|
|
where = "WHERE username ILIKE %s OR user_id = %s"
|
|
params = [f"%{query}%", query]
|
|
with db_connection() as conn:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
|
cur.execute(f"SELECT COUNT(*) FROM admin_user_metrics {where}", params)
|
|
total = cur.fetchone()["count"]
|
|
cur.execute(
|
|
f"""
|
|
SELECT *
|
|
FROM admin_user_metrics
|
|
{where}
|
|
ORDER BY created_at DESC NULLS LAST
|
|
LIMIT %s OFFSET %s
|
|
""",
|
|
(*params, page_size, offset),
|
|
)
|
|
rows = cur.fetchall()
|
|
return {
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"total": total,
|
|
"users": rows,
|
|
}
|
|
|
|
|
|
def _get_active_run_id(cur, user_id: str):
|
|
cur.execute(
|
|
"""
|
|
SELECT run_id
|
|
FROM strategy_run
|
|
WHERE user_id = %s AND status = 'RUNNING'
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
""",
|
|
(user_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if row:
|
|
return row[0]
|
|
cur.execute(
|
|
"""
|
|
SELECT run_id
|
|
FROM strategy_run
|
|
WHERE user_id = %s
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
""",
|
|
(user_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return row[0] if row else None
|
|
|
|
|
|
def get_user_detail(user_id: str):
|
|
with db_connection() as conn:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
|
cur.execute("SELECT * FROM admin_user_metrics WHERE user_id = %s", (user_id,))
|
|
user = cur.fetchone()
|
|
if not user:
|
|
return None
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT * FROM admin_run_metrics
|
|
WHERE user_id = %s
|
|
ORDER BY created_at DESC NULLS LAST
|
|
LIMIT 20
|
|
""",
|
|
(user_id,),
|
|
)
|
|
runs = cur.fetchall()
|
|
|
|
active_run_id = _get_active_run_id(cur, user_id)
|
|
config = None
|
|
if active_run_id:
|
|
cur.execute(
|
|
"""
|
|
SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
|
|
mode, broker, active, frequency, frequency_days, unit, next_run
|
|
FROM strategy_config
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, active_run_id),
|
|
)
|
|
cfg_row = cur.fetchone()
|
|
if cfg_row:
|
|
config = dict(cfg_row)
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT ts, event, message, level, run_id, meta, 'strategy_log' AS source
|
|
FROM strategy_log
|
|
WHERE user_id = %s
|
|
UNION ALL
|
|
SELECT ts, event, message, NULL AS level, run_id, meta, 'engine_event' AS source
|
|
FROM engine_event
|
|
WHERE user_id = %s
|
|
ORDER BY ts DESC NULLS LAST
|
|
LIMIT 50
|
|
""",
|
|
(user_id, user_id),
|
|
)
|
|
events = [
|
|
{
|
|
"ts": row[0],
|
|
"event": row[1],
|
|
"message": row[2],
|
|
"level": row[3],
|
|
"run_id": row[4],
|
|
"meta": row[5],
|
|
"source": row[6],
|
|
}
|
|
for row in cur.fetchall()
|
|
]
|
|
|
|
capital_summary = {
|
|
"cash": None,
|
|
"invested": None,
|
|
"mtm": None,
|
|
"equity": None,
|
|
"pnl": None,
|
|
}
|
|
if active_run_id:
|
|
cur.execute(
|
|
"""
|
|
SELECT
|
|
(SELECT cash FROM paper_broker_account WHERE user_id = %s AND run_id = %s LIMIT 1) AS cash,
|
|
(SELECT total_invested FROM engine_state_paper WHERE user_id = %s AND run_id = %s LIMIT 1) AS invested,
|
|
(SELECT portfolio_value FROM mtm_ledger WHERE user_id = %s AND run_id = %s ORDER BY "timestamp" DESC LIMIT 1) AS mtm,
|
|
(SELECT equity FROM paper_equity_curve WHERE user_id = %s AND run_id = %s ORDER BY "timestamp" DESC LIMIT 1) AS equity,
|
|
(SELECT pnl FROM paper_equity_curve WHERE user_id = %s AND run_id = %s ORDER BY "timestamp" DESC LIMIT 1) AS pnl
|
|
""",
|
|
(
|
|
user_id,
|
|
active_run_id,
|
|
user_id,
|
|
active_run_id,
|
|
user_id,
|
|
active_run_id,
|
|
user_id,
|
|
active_run_id,
|
|
user_id,
|
|
active_run_id,
|
|
),
|
|
)
|
|
row = cur.fetchone()
|
|
if row:
|
|
capital_summary = {
|
|
"cash": row[0],
|
|
"invested": row[1],
|
|
"mtm": row[2],
|
|
"equity": row[3],
|
|
"pnl": row[4],
|
|
}
|
|
|
|
return {
|
|
"user": user,
|
|
"runs": runs,
|
|
"current_config": config,
|
|
"events": events,
|
|
"capital_summary": capital_summary,
|
|
}
|
|
|
|
|
|
def get_runs(page: int, page_size: int, status: str | None, mode: str | None, user_id: str | None):
|
|
page, page_size, offset = _paginate(page, page_size)
|
|
filters = []
|
|
params = []
|
|
if status:
|
|
filters.append("status = %s")
|
|
params.append(status)
|
|
if mode:
|
|
filters.append("mode = %s")
|
|
params.append(mode)
|
|
if user_id:
|
|
filters.append("user_id = %s")
|
|
params.append(user_id)
|
|
where = f"WHERE {' AND '.join(filters)}" if filters else ""
|
|
|
|
with db_connection() as conn:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
|
cur.execute(f"SELECT COUNT(*) FROM admin_run_metrics {where}", params)
|
|
total = cur.fetchone()["count"]
|
|
cur.execute(
|
|
f"""
|
|
SELECT *
|
|
FROM admin_run_metrics
|
|
{where}
|
|
ORDER BY created_at DESC NULLS LAST
|
|
LIMIT %s OFFSET %s
|
|
""",
|
|
(*params, page_size, offset),
|
|
)
|
|
runs = cur.fetchall()
|
|
return {
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"total": total,
|
|
"runs": runs,
|
|
}
|
|
|
|
|
|
def get_run_detail(run_id: str):
|
|
with db_connection() as conn:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
|
cur.execute("SELECT * FROM admin_run_metrics WHERE run_id = %s", (run_id,))
|
|
run = cur.fetchone()
|
|
if not run:
|
|
return None
|
|
|
|
user_id = run["user_id"]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
|
|
mode, broker, active, frequency, frequency_days, unit, next_run
|
|
FROM strategy_config
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
config = cur.fetchone()
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT status, last_updated
|
|
FROM engine_status
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
engine_status = cur.fetchone()
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT initial_cash, cash, total_invested, nifty_units, gold_units,
|
|
last_sip_ts, last_run, sip_frequency_value, sip_frequency_unit
|
|
FROM engine_state_paper
|
|
WHERE user_id = %s AND run_id = %s
|
|
LIMIT 1
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
state = cur.fetchone()
|
|
state_snapshot = dict(state) if state else None
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT event, "timestamp", logical_time, nifty_units, gold_units, nifty_price, gold_price, amount
|
|
FROM event_ledger
|
|
WHERE user_id = %s AND run_id = %s
|
|
ORDER BY "timestamp" DESC
|
|
LIMIT 100
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
ledger_events = cur.fetchall()
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT id, symbol, side, qty, price, status, "timestamp"
|
|
FROM paper_order
|
|
WHERE user_id = %s AND run_id = %s
|
|
ORDER BY "timestamp" DESC
|
|
LIMIT 50
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
orders = cur.fetchall()
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT id, order_id, symbol, side, qty, price, "timestamp"
|
|
FROM paper_trade
|
|
WHERE user_id = %s AND run_id = %s
|
|
ORDER BY "timestamp" DESC
|
|
LIMIT 50
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
trades = cur.fetchall()
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM (
|
|
SELECT logical_time FROM event_ledger
|
|
WHERE user_id = %s AND run_id = %s
|
|
GROUP BY logical_time, event
|
|
HAVING COUNT(*) > 1
|
|
) t
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
dup_event = cur.fetchone()["count"]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM (
|
|
SELECT logical_time FROM mtm_ledger
|
|
WHERE user_id = %s AND run_id = %s
|
|
GROUP BY logical_time
|
|
HAVING COUNT(*) > 1
|
|
) t
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
dup_mtm = cur.fetchone()["count"]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM paper_broker_account
|
|
WHERE user_id = %s AND run_id = %s AND cash < 0
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
neg_cash = cur.fetchone()["count"]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM paper_order
|
|
WHERE user_id = %s AND run_id = %s AND qty <= 0
|
|
""",
|
|
(user_id, run_id),
|
|
)
|
|
bad_qty = cur.fetchone()["count"]
|
|
|
|
invariants = {
|
|
"duplicate_event_logical_time": dup_event,
|
|
"duplicate_mtm_logical_time": dup_mtm,
|
|
"negative_cash": neg_cash,
|
|
"invalid_qty": bad_qty,
|
|
}
|
|
|
|
return {
|
|
"run": run,
|
|
"config": dict(config) if config else None,
|
|
"engine_status": dict(engine_status) if engine_status else None,
|
|
"state_snapshot": state_snapshot,
|
|
"ledger_events": ledger_events,
|
|
"orders": orders,
|
|
"trades": trades,
|
|
"invariants": invariants,
|
|
}
|
|
|
|
|
|
def get_invariants(stale_minutes: int = 30):
|
|
cutoff = datetime.now(timezone.utc) - timedelta(minutes=stale_minutes)
|
|
with db_connection() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM (
|
|
SELECT user_id FROM strategy_run
|
|
WHERE status = 'RUNNING'
|
|
GROUP BY user_id
|
|
HAVING COUNT(*) > 1
|
|
) t
|
|
"""
|
|
)
|
|
running_runs_per_user_violations = cur.fetchone()[0]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM (
|
|
SELECT user_id, run_id FROM engine_state
|
|
UNION ALL
|
|
SELECT user_id, run_id FROM engine_status
|
|
UNION ALL
|
|
SELECT user_id, run_id FROM paper_order
|
|
UNION ALL
|
|
SELECT user_id, run_id FROM paper_trade
|
|
) t
|
|
LEFT JOIN strategy_run sr
|
|
ON sr.user_id = t.user_id AND sr.run_id = t.run_id
|
|
WHERE sr.run_id IS NULL
|
|
"""
|
|
)
|
|
orphan_rows = cur.fetchone()[0]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM (
|
|
SELECT user_id, run_id, logical_time, event
|
|
FROM event_ledger
|
|
GROUP BY user_id, run_id, logical_time, event
|
|
HAVING COUNT(*) > 1
|
|
) t
|
|
"""
|
|
)
|
|
dup_event = cur.fetchone()[0]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM (
|
|
SELECT user_id, run_id, logical_time
|
|
FROM mtm_ledger
|
|
GROUP BY user_id, run_id, logical_time
|
|
HAVING COUNT(*) > 1
|
|
) t
|
|
"""
|
|
)
|
|
dup_mtm = cur.fetchone()[0]
|
|
|
|
cur.execute(
|
|
"SELECT COUNT(*) FROM paper_broker_account WHERE cash < 0"
|
|
)
|
|
negative_cash = cur.fetchone()[0]
|
|
|
|
cur.execute(
|
|
"SELECT COUNT(*) FROM paper_order WHERE qty <= 0"
|
|
)
|
|
invalid_qty = cur.fetchone()[0]
|
|
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM strategy_run sr
|
|
LEFT JOIN (
|
|
SELECT user_id, run_id, MAX(ts) AS last_ts
|
|
FROM (
|
|
SELECT user_id, run_id, ts FROM engine_event
|
|
UNION ALL
|
|
SELECT user_id, run_id, ts FROM strategy_log
|
|
UNION ALL
|
|
SELECT user_id, run_id, "timestamp" AS ts FROM event_ledger
|
|
) t
|
|
GROUP BY user_id, run_id
|
|
) activity
|
|
ON activity.user_id = sr.user_id AND activity.run_id = sr.run_id
|
|
WHERE sr.status = 'RUNNING' AND (activity.last_ts IS NULL OR activity.last_ts < %s)
|
|
""",
|
|
(cutoff,),
|
|
)
|
|
stale_running_runs = cur.fetchone()[0]
|
|
|
|
return {
|
|
"running_runs_per_user_violations": running_runs_per_user_violations,
|
|
"orphan_rows": orphan_rows,
|
|
"duplicate_logical_time": dup_event + dup_mtm,
|
|
"negative_cash": negative_cash,
|
|
"invalid_qty": invalid_qty,
|
|
"stale_running_runs": stale_running_runs,
|
|
}
|
|
|
|
|
|
def get_support_tickets(page: int, page_size: int):
|
|
page, page_size, offset = _paginate(page, page_size)
|
|
with db_connection() as conn:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
|
cur.execute("SELECT COUNT(*) FROM support_ticket")
|
|
total = cur.fetchone()["count"]
|
|
cur.execute(
|
|
"""
|
|
SELECT id AS ticket_id, name, email, subject, message, status, created_at, updated_at
|
|
FROM support_ticket
|
|
ORDER BY created_at DESC NULLS LAST
|
|
LIMIT %s OFFSET %s
|
|
""",
|
|
(page_size, offset),
|
|
)
|
|
rows = cur.fetchall()
|
|
tickets = []
|
|
for row in rows:
|
|
ticket = dict(row)
|
|
ticket["ticket_id"] = str(ticket.get("ticket_id"))
|
|
if ticket.get("created_at"):
|
|
ticket["created_at"] = ticket["created_at"]
|
|
if ticket.get("updated_at"):
|
|
ticket["updated_at"] = ticket["updated_at"]
|
|
tickets.append(ticket)
|
|
return {
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"total": total,
|
|
"tickets": tickets,
|
|
}
|
|
|
|
|
|
def delete_support_ticket(ticket_id: str) -> dict | None:
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("DELETE FROM support_ticket WHERE id = %s", (ticket_id,))
|
|
if cur.rowcount == 0:
|
|
return None
|
|
return {"ticket_id": ticket_id, "deleted": True}
|
|
|
|
|
|
def _hash_value(value: str | None) -> str | None:
|
|
if value is None:
|
|
return None
|
|
return hashlib.sha256(value.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def delete_user_hard(user_id: str, admin_user: dict):
|
|
table_counts = [
|
|
("app_user", "SELECT COUNT(*) FROM app_user WHERE id = %s"),
|
|
("app_session", "SELECT COUNT(*) FROM app_session WHERE user_id = %s"),
|
|
("user_broker", "SELECT COUNT(*) FROM user_broker WHERE user_id = %s"),
|
|
("zerodha_session", "SELECT COUNT(*) FROM zerodha_session WHERE user_id = %s"),
|
|
("zerodha_request_token", "SELECT COUNT(*) FROM zerodha_request_token WHERE user_id = %s"),
|
|
("strategy_run", "SELECT COUNT(*) FROM strategy_run WHERE user_id = %s"),
|
|
("strategy_config", "SELECT COUNT(*) FROM strategy_config WHERE user_id = %s"),
|
|
("strategy_log", "SELECT COUNT(*) FROM strategy_log WHERE user_id = %s"),
|
|
("engine_status", "SELECT COUNT(*) FROM engine_status WHERE user_id = %s"),
|
|
("engine_state", "SELECT COUNT(*) FROM engine_state WHERE user_id = %s"),
|
|
("engine_state_paper", "SELECT COUNT(*) FROM engine_state_paper WHERE user_id = %s"),
|
|
("engine_event", "SELECT COUNT(*) FROM engine_event WHERE user_id = %s"),
|
|
("paper_broker_account", "SELECT COUNT(*) FROM paper_broker_account WHERE user_id = %s"),
|
|
("paper_position", "SELECT COUNT(*) FROM paper_position WHERE user_id = %s"),
|
|
("paper_order", "SELECT COUNT(*) FROM paper_order WHERE user_id = %s"),
|
|
("paper_trade", "SELECT COUNT(*) FROM paper_trade WHERE user_id = %s"),
|
|
("paper_equity_curve", "SELECT COUNT(*) FROM paper_equity_curve WHERE user_id = %s"),
|
|
("mtm_ledger", "SELECT COUNT(*) FROM mtm_ledger WHERE user_id = %s"),
|
|
("event_ledger", "SELECT COUNT(*) FROM event_ledger WHERE user_id = %s"),
|
|
]
|
|
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT id, username FROM app_user WHERE id = %s",
|
|
(user_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
target_username = row[1]
|
|
|
|
counts = {}
|
|
for name, query in table_counts:
|
|
cur.execute(query, (user_id,))
|
|
counts[name] = cur.fetchone()[0]
|
|
|
|
cur.execute("DELETE FROM app_user WHERE id = %s", (user_id,))
|
|
if cur.rowcount == 0:
|
|
return None
|
|
|
|
audit_meta = {"deleted": counts, "hard": True}
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO admin_audit_log
|
|
(actor_user_hash, target_user_hash, target_username_hash, action, meta)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""",
|
|
(
|
|
_hash_value(admin_user["id"]),
|
|
_hash_value(user_id),
|
|
_hash_value(target_username),
|
|
"HARD_DELETE_USER",
|
|
Json(audit_meta),
|
|
),
|
|
)
|
|
audit_id = cur.fetchone()[0]
|
|
|
|
return {
|
|
"user_id": user_id,
|
|
"deleted": counts,
|
|
"audit_id": audit_id,
|
|
}
|
|
|
|
|
|
def hard_reset_user_data(user_id: str, admin_user: dict):
|
|
table_counts = [
|
|
("strategy_run", "SELECT COUNT(*) FROM strategy_run WHERE user_id = %s"),
|
|
("strategy_config", "SELECT COUNT(*) FROM strategy_config WHERE user_id = %s"),
|
|
("strategy_log", "SELECT COUNT(*) FROM strategy_log WHERE user_id = %s"),
|
|
("engine_status", "SELECT COUNT(*) FROM engine_status WHERE user_id = %s"),
|
|
("engine_state", "SELECT COUNT(*) FROM engine_state WHERE user_id = %s"),
|
|
("engine_state_paper", "SELECT COUNT(*) FROM engine_state_paper WHERE user_id = %s"),
|
|
("engine_event", "SELECT COUNT(*) FROM engine_event WHERE user_id = %s"),
|
|
("paper_broker_account", "SELECT COUNT(*) FROM paper_broker_account WHERE user_id = %s"),
|
|
("paper_position", "SELECT COUNT(*) FROM paper_position WHERE user_id = %s"),
|
|
("paper_order", "SELECT COUNT(*) FROM paper_order WHERE user_id = %s"),
|
|
("paper_trade", "SELECT COUNT(*) FROM paper_trade WHERE user_id = %s"),
|
|
("paper_equity_curve", "SELECT COUNT(*) FROM paper_equity_curve WHERE user_id = %s"),
|
|
("mtm_ledger", "SELECT COUNT(*) FROM mtm_ledger WHERE user_id = %s"),
|
|
("event_ledger", "SELECT COUNT(*) FROM event_ledger WHERE user_id = %s"),
|
|
]
|
|
|
|
engine_external = os.getenv("ENGINE_EXTERNAL", "").strip().lower() in {"1", "true", "yes"}
|
|
running_run_id = get_running_run_id(user_id)
|
|
if running_run_id and not engine_external:
|
|
stop_engine(user_id, timeout=15.0)
|
|
|
|
with db_connection() as conn:
|
|
with conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT id, username FROM app_user WHERE id = %s",
|
|
(user_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
target_username = row[1]
|
|
|
|
counts = {}
|
|
for name, query in table_counts:
|
|
cur.execute(query, (user_id,))
|
|
counts[name] = cur.fetchone()[0]
|
|
|
|
cur.execute("DELETE FROM strategy_log WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM engine_event WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM paper_equity_curve WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM paper_trade WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM paper_order WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM paper_position WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM paper_broker_account WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM mtm_ledger WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM event_ledger WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM engine_state_paper WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM engine_state WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM engine_status WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM strategy_config WHERE user_id = %s", (user_id,))
|
|
cur.execute("DELETE FROM strategy_run WHERE user_id = %s", (user_id,))
|
|
|
|
audit_meta = {"reset": counts, "hard": True}
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO admin_audit_log
|
|
(actor_user_hash, target_user_hash, target_username_hash, action, meta)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""",
|
|
(
|
|
_hash_value(admin_user["id"]),
|
|
_hash_value(user_id),
|
|
_hash_value(target_username),
|
|
"HARD_RESET_USER",
|
|
Json(audit_meta),
|
|
),
|
|
)
|
|
audit_id = cur.fetchone()[0]
|
|
|
|
return {
|
|
"user_id": user_id,
|
|
"deleted": counts,
|
|
"audit_id": audit_id,
|
|
}
|