Compare commits

..

2 Commits

Author SHA1 Message Date
Thigazhezhilan J
1b14e7b23e Fix broker session showing connected after Zerodha token expiry
- Set connected=FALSE (was TRUE) when expiring broker session so the
  dashboard correctly reflects disconnected state
- Notify user by email when their Zerodha session expires so they know
  to reconnect before the next SIP execution

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 13:38:45 +05:30
Thigazhezhilan J
247a1c5107 Make engine event payloads JSON safe 2026-04-15 09:54:35 +05:30
4 changed files with 84 additions and 19 deletions

View File

@ -162,7 +162,7 @@ def expire_user_broker_session(user_id: str):
cur.execute(
"""
UPDATE user_broker
SET connected = TRUE,
SET connected = FALSE,
access_token = NULL,
auth_state = 'EXPIRED'
WHERE user_id = %s

View File

@ -5,6 +5,7 @@ from fastapi.responses import HTMLResponse
from app.broker_store import expire_user_broker_session
from app.services.auth_service import get_user_for_session
from app.services.email_service import send_email_async
from app.services.live_equity_service import (
capture_live_equity_snapshot,
get_live_equity_curve,
@ -48,14 +49,24 @@ def _capture_request_token(request: Request, request_token: str):
store_request_token(user["id"], token)
def _clear_broker_session(user_id: str):
def _clear_broker_session(user_id: str, email: str | None = None):
expire_user_broker_session(user_id)
clear_session(user_id)
if email:
try:
body = (
"Your Zerodha session has expired and your broker connection has been disconnected.\n\n"
"Please log in to QuantFortune and reconnect your Zerodha account to resume your strategy.\n\n"
"If your strategy was running, it has been paused until you reconnect."
)
send_email_async(email, "Action required: Zerodha session expired", body)
except Exception:
pass
def _raise_kite_error(user_id: str, exc: KiteApiError):
def _raise_kite_error(user_id: str, exc: KiteApiError, email: str | None = None):
if isinstance(exc, KiteTokenError):
_clear_broker_session(user_id)
_clear_broker_session(user_id, email=email)
raise HTTPException(
status_code=401, detail="Zerodha session expired. Please reconnect."
) from exc
@ -139,7 +150,7 @@ async def holdings(request: Request):
try:
data = fetch_holdings(session["api_key"], session["access_token"])
except KiteApiError as exc:
_raise_kite_error(user["id"], exc)
_raise_kite_error(user["id"], exc, email=user["username"])
return {"holdings": [normalize_holding(item) for item in data]}
@ -152,7 +163,7 @@ async def funds(request: Request):
try:
data = fetch_funds(session["api_key"], session["access_token"])
except KiteApiError as exc:
_raise_kite_error(user["id"], exc)
_raise_kite_error(user["id"], exc, email=user["username"])
equity = data.get("equity", {}) if isinstance(data, dict) else {}
return {"funds": {**equity, "raw": data}}
@ -168,7 +179,7 @@ async def equity_curve(request: Request, from_: str = Query("", alias="from")):
holdings = fetch_holdings(session["api_key"], session["access_token"])
funds_data = fetch_funds(session["api_key"], session["access_token"])
except KiteApiError as exc:
_raise_kite_error(user["id"], exc)
_raise_kite_error(user["id"], exc, email=user["username"])
try:
capture_live_equity_snapshot(
@ -177,7 +188,7 @@ async def equity_curve(request: Request, from_: str = Query("", alias="from")):
funds_data=funds_data,
)
except KiteApiError as exc:
_raise_kite_error(user["id"], exc)
_raise_kite_error(user["id"], exc, email=user["username"])
now = datetime.utcnow()
default_start = (now - timedelta(days=90)).date()

View File

@ -112,3 +112,39 @@ def test_bootstrap_schema_contains_migrated_core_columns_and_tables():
for snippet in required_snippets:
assert snippet in schema_sql
def test_insert_engine_event_serializes_nested_datetimes():
from indian_paper_trading_strategy.engine.db import insert_engine_event
class FakeCursor:
def __init__(self):
self.params = None
def execute(self, _sql, params):
self.params = params
cursor = FakeCursor()
event_ts = datetime(2026, 4, 15, 9, 0, tzinfo=timezone.utc)
payload_ts = datetime(2026, 4, 15, 9, 1, tzinfo=timezone.utc)
insert_engine_event(
cursor,
"ORDER_PLACED",
data={
"order_id": "order-1",
"timestamp": payload_ts,
"history": [payload_ts],
},
meta={"checked_at": payload_ts},
ts=event_ts,
user_id="user-1",
run_id="run-1",
)
data_json = cursor.params[4]
meta_json = cursor.params[6]
assert data_json.adapted["timestamp"] == payload_ts.isoformat()
assert data_json.adapted["history"] == [payload_ts.isoformat()]
assert meta_json.adapted["checked_at"] == payload_ts.isoformat()

View File

@ -2,8 +2,10 @@ import os
import threading
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from datetime import date, datetime, timedelta, timezone
from contextvars import ContextVar
from decimal import Decimal
from uuid import UUID
import psycopg2
from psycopg2 import pool
@ -142,8 +144,24 @@ def db_transaction():
raise
def _utc_now():
return datetime.utcnow().replace(tzinfo=timezone.utc)
def _utc_now():
return datetime.utcnow().replace(tzinfo=timezone.utc)
def _json_safe(value):
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, UUID):
return str(value)
if isinstance(value, dict):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple, set)):
return [_json_safe(item) for item in value]
return value
def set_context(user_id: str | None, run_id: str | None):
@ -323,14 +341,14 @@ def insert_engine_event(
INSERT INTO engine_event (user_id, run_id, ts, event, data, message, meta)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(
scope_user,
scope_run,
when,
event,
Json(data) if data is not None else None,
message,
Json(meta) if meta is not None else None,
(
scope_user,
scope_run,
when,
event,
Json(_json_safe(data)) if data is not None else None,
message,
Json(_json_safe(meta)) if meta is not None else None,
),
)