"""Zerodha (Kite) broker endpoints.

Exposes the authenticated ``/api/zerodha`` routes (login URL, session
linking, status, holdings, funds, equity curve, request-token capture) and a
public ``/login`` page that captures the request token passed as a query
parameter.
"""

from datetime import datetime, timedelta, timezone
from typing import NoReturn

from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import HTMLResponse

from app.broker_store import clear_user_broker
from app.services.auth_service import get_user_for_session
from app.services.live_equity_service import (
    capture_live_equity_snapshot,
    get_live_equity_curve,
)
from app.services.zerodha_service import (
    KiteApiError,
    KiteTokenError,
    build_login_url,
    exchange_request_token,
    fetch_funds,
    fetch_holdings,
    normalize_holding,
)
from app.services.zerodha_storage import (
    clear_session,
    consume_request_token,
    get_session,
    set_session,
    store_request_token,
)

router = APIRouter(prefix="/api/zerodha")
public_router = APIRouter()


def _require_user(request: Request):
    """Return the user bound to the ``session_id`` cookie.

    Raises:
        HTTPException: 401 when the cookie is missing or does not resolve to
            a user.
    """
    session_id = request.cookies.get("session_id")
    if not session_id:
        raise HTTPException(status_code=401, detail="Not authenticated")
    user = get_user_for_session(session_id)
    if not user:
        raise HTTPException(status_code=401, detail="Not authenticated")
    return user


def _payload_str(payload: dict, key: str) -> str:
    """Return ``payload[key]`` stripped of surrounding whitespace.

    Missing keys and non-string values yield ``""`` so callers can answer
    with their usual 400 instead of crashing on ``.strip()``.
    """
    value = payload.get(key)
    return value.strip() if isinstance(value, str) else ""


def _require_zerodha_session(user_id: str):
    """Return the stored Zerodha session for ``user_id``.

    Raises:
        HTTPException: 400 when no session is stored for the user.
    """
    session = get_session(user_id)
    if not session:
        raise HTTPException(status_code=400, detail="Zerodha is not connected")
    return session


def _capture_request_token(request: Request, request_token: str) -> None:
    """Store a non-empty request token for the authenticated user.

    Raises:
        HTTPException: 401 when unauthenticated, 400 when the token is blank.
    """
    user = _require_user(request)
    token = request_token.strip()
    if not token:
        raise HTTPException(status_code=400, detail="Missing request_token")
    store_request_token(user["id"], token)


def _clear_broker_session(user_id: str) -> None:
    """Clear the user's broker record and stored Zerodha session."""
    clear_user_broker(user_id)
    clear_session(user_id)


def _raise_kite_error(user_id: str, exc: KiteApiError) -> NoReturn:
    """Translate a Kite error into an ``HTTPException``; never returns.

    A ``KiteTokenError`` clears the stored broker session and becomes a 401;
    any other ``KiteApiError`` becomes a 502 carrying the error text.
    """
    if isinstance(exc, KiteTokenError):
        _clear_broker_session(user_id)
        raise HTTPException(
            status_code=401,
            detail="Zerodha session expired. Please reconnect.",
        ) from exc
    raise HTTPException(status_code=502, detail=str(exc)) from exc


@router.post("/login-url")
async def login_url(payload: dict, request: Request):
    """Build the Zerodha login URL for the API key given in the payload."""
    _require_user(request)
    api_key = _payload_str(payload, "apiKey")
    if not api_key:
        raise HTTPException(status_code=400, detail="API key is required")
    return {"loginUrl": build_login_url(api_key)}


@router.post("/session")
async def create_session(payload: dict, request: Request):
    """Exchange a request token and persist the resulting session.

    Any failure of the exchange is reported to the client as a 400 carrying
    the error text.
    """
    user = _require_user(request)
    api_key = _payload_str(payload, "apiKey")
    api_secret = _payload_str(payload, "apiSecret")
    request_token = _payload_str(payload, "requestToken")
    if not api_key or not api_secret or not request_token:
        raise HTTPException(
            status_code=400,
            detail="API key, secret, and request token are required",
        )

    try:
        session_data = exchange_request_token(api_key, api_secret, request_token)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    saved = set_session(
        user["id"],
        {
            "api_key": api_key,
            "access_token": session_data.get("access_token"),
            "request_token": session_data.get("request_token", request_token),
            "user_name": session_data.get("user_name"),
            "broker_user_id": session_data.get("user_id"),
        },
    )
    return {
        "connected": True,
        "userName": saved.get("user_name"),
        "brokerUserId": saved.get("broker_user_id"),
        "accessToken": saved.get("access_token"),
    }


@router.get("/status")
async def status(request: Request):
    """Report whether the authenticated user has a stored Zerodha session."""
    user = _require_user(request)
    session = get_session(user["id"])
    if not session:
        return {"connected": False}
    return {
        "connected": True,
        "broker": "zerodha",
        "userName": session.get("user_name"),
        "linkedAt": session.get("linked_at"),
    }


@router.get("/request-token")
async def request_token(request: Request):
    """Consume and return the request token stored for the user, or 404."""
    user = _require_user(request)
    token = consume_request_token(user["id"])
    if not token:
        raise HTTPException(status_code=404, detail="No request token available.")
    return {"requestToken": token}


@router.get("/holdings")
async def holdings(request: Request):
    """Return the user's holdings, each passed through ``normalize_holding``."""
    user = _require_user(request)
    session = _require_zerodha_session(user["id"])
    try:
        data = fetch_holdings(session["api_key"], session["access_token"])
    except KiteApiError as exc:
        _raise_kite_error(user["id"], exc)
    return {"holdings": [normalize_holding(item) for item in data]}


@router.get("/funds")
async def funds(request: Request):
    """Return the ``equity`` section of the funds data plus the raw payload."""
    user = _require_user(request)
    session = _require_zerodha_session(user["id"])
    try:
        data = fetch_funds(session["api_key"], session["access_token"])
    except KiteApiError as exc:
        _raise_kite_error(user["id"], exc)
    # Only a dict payload can carry an "equity" section; anything else is
    # still returned verbatim under "raw".
    equity = data.get("equity", {}) if isinstance(data, dict) else {}
    return {"funds": {**equity, "raw": data}}


@router.get("/equity-curve")
async def equity_curve(request: Request, from_: str = Query("", alias="from")):
    """Capture a fresh equity snapshot and return the curve from a start date.

    ``from`` is an ISO date/datetime string. A blank or unparseable value
    falls back to 90 days before today (UTC); a future date is clamped to
    today.
    """
    user = _require_user(request)
    session = _require_zerodha_session(user["id"])
    try:
        holdings_data = fetch_holdings(session["api_key"], session["access_token"])
        funds_data = fetch_funds(session["api_key"], session["access_token"])
    except KiteApiError as exc:
        _raise_kite_error(user["id"], exc)

    # Snapshot capture may also surface Kite errors; map them the same way.
    try:
        capture_live_equity_snapshot(
            user["id"],
            holdings=holdings_data,
            funds_data=funds_data,
        )
    except KiteApiError as exc:
        _raise_kite_error(user["id"], exc)

    today = datetime.now(timezone.utc).date()
    default_start = today - timedelta(days=90)
    start_date = default_start
    if from_:
        try:
            start_date = datetime.fromisoformat(from_).date()
        except ValueError:
            # Invalid input is tolerated: use the default window instead.
            start_date = default_start
    start_date = min(start_date, today)
    return get_live_equity_curve(user["id"], start_date=start_date)


@router.get("/callback")
async def callback(request: Request, request_token: str = ""):
    """Capture the ``request_token`` query parameter and acknowledge as JSON."""
    _capture_request_token(request, request_token)
    return {
        "status": "ok",
        "message": "Request token captured. You can close this tab.",
    }


@router.get("/login")
async def login_redirect(request: Request, request_token: str = ""):
    """Alias of ``callback`` served at ``/api/zerodha/login``."""
    return await callback(request, request_token=request_token)


@public_router.get("/login", response_class=HTMLResponse)
async def login_capture(request: Request, request_token: str = ""):
    """Capture the ``request_token`` query parameter and return an HTML page."""
    _capture_request_token(request, request_token)
    # NOTE(review): the tags below are a minimal reconstruction around the
    # original page text -- confirm against the intended markup/styling.
    return (
        "<html><body>"
        "<h2>Request token captured</h2>"
        "<p>You can close this tab and return to QuantFortune.</p>"
        "</body></html>"
    )