Compare commits

..

2 Commits

Author SHA1 Message Date
Thigazhezhilan J
1b14e7b23e Fix broker session showing connected after Zerodha token expiry
- Set connected=FALSE (was TRUE) when expiring broker session so the
  dashboard correctly reflects disconnected state
- Notify user by email when their Zerodha session expires so they know
  to reconnect before the next SIP execution

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 13:38:45 +05:30
Thigazhezhilan J
247a1c5107 Make engine event payloads JSON safe 2026-04-15 09:54:35 +05:30
4 changed files with 84 additions and 19 deletions

View File

@ -162,7 +162,7 @@ def expire_user_broker_session(user_id: str):
cur.execute( cur.execute(
""" """
UPDATE user_broker UPDATE user_broker
SET connected = TRUE, SET connected = FALSE,
access_token = NULL, access_token = NULL,
auth_state = 'EXPIRED' auth_state = 'EXPIRED'
WHERE user_id = %s WHERE user_id = %s

View File

@ -5,6 +5,7 @@ from fastapi.responses import HTMLResponse
from app.broker_store import expire_user_broker_session from app.broker_store import expire_user_broker_session
from app.services.auth_service import get_user_for_session from app.services.auth_service import get_user_for_session
from app.services.email_service import send_email_async
from app.services.live_equity_service import ( from app.services.live_equity_service import (
capture_live_equity_snapshot, capture_live_equity_snapshot,
get_live_equity_curve, get_live_equity_curve,
@ -48,14 +49,24 @@ def _capture_request_token(request: Request, request_token: str):
store_request_token(user["id"], token) store_request_token(user["id"], token)
def _clear_broker_session(user_id: str): def _clear_broker_session(user_id: str, email: str | None = None):
expire_user_broker_session(user_id) expire_user_broker_session(user_id)
clear_session(user_id) clear_session(user_id)
if email:
try:
body = (
"Your Zerodha session has expired and your broker connection has been disconnected.\n\n"
"Please log in to QuantFortune and reconnect your Zerodha account to resume your strategy.\n\n"
"If your strategy was running, it has been paused until you reconnect."
)
send_email_async(email, "Action required: Zerodha session expired", body)
except Exception:
pass
def _raise_kite_error(user_id: str, exc: KiteApiError): def _raise_kite_error(user_id: str, exc: KiteApiError, email: str | None = None):
if isinstance(exc, KiteTokenError): if isinstance(exc, KiteTokenError):
_clear_broker_session(user_id) _clear_broker_session(user_id, email=email)
raise HTTPException( raise HTTPException(
status_code=401, detail="Zerodha session expired. Please reconnect." status_code=401, detail="Zerodha session expired. Please reconnect."
) from exc ) from exc
@ -139,7 +150,7 @@ async def holdings(request: Request):
try: try:
data = fetch_holdings(session["api_key"], session["access_token"]) data = fetch_holdings(session["api_key"], session["access_token"])
except KiteApiError as exc: except KiteApiError as exc:
_raise_kite_error(user["id"], exc) _raise_kite_error(user["id"], exc, email=user["username"])
return {"holdings": [normalize_holding(item) for item in data]} return {"holdings": [normalize_holding(item) for item in data]}
@ -152,7 +163,7 @@ async def funds(request: Request):
try: try:
data = fetch_funds(session["api_key"], session["access_token"]) data = fetch_funds(session["api_key"], session["access_token"])
except KiteApiError as exc: except KiteApiError as exc:
_raise_kite_error(user["id"], exc) _raise_kite_error(user["id"], exc, email=user["username"])
equity = data.get("equity", {}) if isinstance(data, dict) else {} equity = data.get("equity", {}) if isinstance(data, dict) else {}
return {"funds": {**equity, "raw": data}} return {"funds": {**equity, "raw": data}}
@ -168,7 +179,7 @@ async def equity_curve(request: Request, from_: str = Query("", alias="from")):
holdings = fetch_holdings(session["api_key"], session["access_token"]) holdings = fetch_holdings(session["api_key"], session["access_token"])
funds_data = fetch_funds(session["api_key"], session["access_token"]) funds_data = fetch_funds(session["api_key"], session["access_token"])
except KiteApiError as exc: except KiteApiError as exc:
_raise_kite_error(user["id"], exc) _raise_kite_error(user["id"], exc, email=user["username"])
try: try:
capture_live_equity_snapshot( capture_live_equity_snapshot(
@ -177,7 +188,7 @@ async def equity_curve(request: Request, from_: str = Query("", alias="from")):
funds_data=funds_data, funds_data=funds_data,
) )
except KiteApiError as exc: except KiteApiError as exc:
_raise_kite_error(user["id"], exc) _raise_kite_error(user["id"], exc, email=user["username"])
now = datetime.utcnow() now = datetime.utcnow()
default_start = (now - timedelta(days=90)).date() default_start = (now - timedelta(days=90)).date()

View File

@ -112,3 +112,39 @@ def test_bootstrap_schema_contains_migrated_core_columns_and_tables():
for snippet in required_snippets: for snippet in required_snippets:
assert snippet in schema_sql assert snippet in schema_sql
def test_insert_engine_event_serializes_nested_datetimes():
from indian_paper_trading_strategy.engine.db import insert_engine_event
class FakeCursor:
def __init__(self):
self.params = None
def execute(self, _sql, params):
self.params = params
cursor = FakeCursor()
event_ts = datetime(2026, 4, 15, 9, 0, tzinfo=timezone.utc)
payload_ts = datetime(2026, 4, 15, 9, 1, tzinfo=timezone.utc)
insert_engine_event(
cursor,
"ORDER_PLACED",
data={
"order_id": "order-1",
"timestamp": payload_ts,
"history": [payload_ts],
},
meta={"checked_at": payload_ts},
ts=event_ts,
user_id="user-1",
run_id="run-1",
)
data_json = cursor.params[4]
meta_json = cursor.params[6]
assert data_json.adapted["timestamp"] == payload_ts.isoformat()
assert data_json.adapted["history"] == [payload_ts.isoformat()]
assert meta_json.adapted["checked_at"] == payload_ts.isoformat()

View File

@ -2,8 +2,10 @@ import os
import threading import threading
import time import time
from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from contextvars import ContextVar from contextvars import ContextVar
from decimal import Decimal
from uuid import UUID
import psycopg2 import psycopg2
from psycopg2 import pool from psycopg2 import pool
@ -146,6 +148,22 @@ def _utc_now():
return datetime.utcnow().replace(tzinfo=timezone.utc) return datetime.utcnow().replace(tzinfo=timezone.utc)
def _json_safe(value):
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, UUID):
return str(value)
if isinstance(value, dict):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple, set)):
return [_json_safe(item) for item in value]
return value
def set_context(user_id: str | None, run_id: str | None): def set_context(user_id: str | None, run_id: str | None):
token_user = _USER_ID.set(user_id) token_user = _USER_ID.set(user_id)
token_run = _RUN_ID.set(run_id) token_run = _RUN_ID.set(run_id)
@ -328,9 +346,9 @@ def insert_engine_event(
scope_run, scope_run,
when, when,
event, event,
Json(data) if data is not None else None, Json(_json_safe(data)) if data is not None else None,
message, message,
Json(meta) if meta is not None else None, Json(_json_safe(meta)) if meta is not None else None,
), ),
) )