import threading
import time
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlparse

import requests

from app.services.crypto_service import decrypt_value, encrypt_value
from app.services.db import db_transaction
from app.services.email_service import send_email_async
from app.services.zerodha_service import exchange_request_token
from app.services.zerodha_storage import set_session
from app.broker_store import expire_user_broker_session

# Fixed UTC+05:30 offset used for all schedule and timestamp formatting.
IST = timezone(timedelta(hours=5, minutes=30))
KITE_LOGIN_ENDPOINT = "https://kite.zerodha.com/api/login"
KITE_TWOFA_ENDPOINT = "https://kite.zerodha.com/api/twofa"


class AutoLoginError(Exception):
    """Raised when the automated Zerodha login flow cannot be completed."""


# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------

def save_auto_login_credentials(
    user_id: str,
    zerodha_login_id: str,
    password: str,
    totp_secret: str,
) -> None:
    """Store a user's Zerodha login credentials and enable auto-login.

    The password and TOTP secret are passed through ``encrypt_value`` before
    being written. The login id is stripped of surrounding whitespace and the
    TOTP secret has all spaces removed.

    Args:
        user_id: Key of the ``user_broker`` row to update.
        zerodha_login_id: Zerodha login id.
        password: Plain-text Zerodha password.
        totp_secret: TOTP secret; may contain spaces, which are removed.
    """
    # NOTE(review): this is an UPDATE only -- if no user_broker row exists for
    # user_id nothing is stored and no error is raised. Confirm callers
    # guarantee the row exists.
    with db_transaction() as cur:
        cur.execute(
            """
            UPDATE user_broker
            SET zerodha_login_id = %s,
                zerodha_password = %s,
                totp_secret = %s,
                auto_login_enabled = TRUE
            WHERE user_id = %s
            """,
            (
                zerodha_login_id.strip(),
                encrypt_value(password),
                encrypt_value(totp_secret.strip().replace(" ", "")),
                user_id,
            ),
        )


def get_auto_login_credentials(user_id: str) -> dict | None:
    """Return the decrypted auto-login credentials for one user.

    Args:
        user_id: Key of the ``user_broker`` row to read.

    Returns:
        ``None`` when the user has no row, auto-login is disabled, or any of
        login id / password / TOTP secret is missing. Otherwise a dict with
        keys ``zerodha_login_id``, ``password``, ``totp_secret``, ``api_key``,
        ``api_secret`` (``None`` when not stored), ``last_refreshed_at``
        (ISO-8601 string or ``None``) and ``last_error``.
    """
    with db_transaction() as cur:
        cur.execute(
            """
            SELECT zerodha_login_id, zerodha_password, totp_secret,
                   auto_login_enabled, auto_login_last_at, auto_login_last_err,
                   api_key, api_secret
            FROM user_broker
            WHERE user_id = %s
            """,
            (user_id,),
        )
        row = cur.fetchone()

    if not row:
        return None

    (
        login_id,
        enc_password,
        enc_totp,
        enabled,
        last_at,
        last_err,
        api_key,
        enc_api_secret,
    ) = row
    if not enabled or not login_id or not enc_password or not enc_totp:
        return None

    return {
        "zerodha_login_id": login_id,
        "password": decrypt_value(enc_password),
        "totp_secret": decrypt_value(enc_totp),
        "api_key": api_key,
        "api_secret": decrypt_value(enc_api_secret) if enc_api_secret else None,
        "last_refreshed_at": last_at.isoformat() if last_at else None,
        "last_error": last_err,
    }


def get_auto_login_status(user_id: str) -> dict:
    """Return auto-login status for a user without exposing any secrets.

    Returns:
        ``{"configured": False}`` when the user has no ``user_broker`` row,
        otherwise a dict with ``configured`` (auto-login enabled and a login
        id present), ``last_refreshed_at`` (ISO-8601 string or ``None``) and
        ``last_error``.
    """
    with db_transaction() as cur:
        cur.execute(
            """
            SELECT auto_login_enabled, auto_login_last_at,
                   auto_login_last_err, zerodha_login_id
            FROM user_broker
            WHERE user_id = %s
            """,
            (user_id,),
        )
        row = cur.fetchone()

    if not row:
        return {"configured": False}

    enabled, last_at, last_err, login_id = row
    return {
        "configured": bool(enabled and login_id),
        "last_refreshed_at": last_at.isoformat() if last_at else None,
        "last_error": last_err,
    }


def delete_auto_login_credentials(user_id: str) -> None:
    """Clear the stored credentials and disable auto-login for a user.

    Also resets the last-run timestamp and last-error columns.
    """
    with db_transaction() as cur:
        cur.execute(
            """
            UPDATE user_broker
            SET zerodha_login_id = NULL,
                zerodha_password = NULL,
                totp_secret = NULL,
                auto_login_enabled = FALSE,
                auto_login_last_at = NULL,
                auto_login_last_err = NULL
            WHERE user_id = %s
            """,
            (user_id,),
        )


def _update_auto_login_result(user_id: str, error: str | None) -> None:
    """Record the outcome of an auto-login attempt.

    Sets ``auto_login_last_at`` to the database's ``NOW()`` and stores
    ``error`` (``None`` on success) in ``auto_login_last_err``.
    """
    with db_transaction() as cur:
        cur.execute(
            """
            UPDATE user_broker
            SET auto_login_last_at = NOW(),
                auto_login_last_err = %s
            WHERE user_id = %s
            """,
            (error, user_id),
        )


def _get_all_auto_login_users() -> list[dict]:
    """Return decrypted credentials for every user with auto-login enabled.

    Only rows with a login id, password and TOTP secret are selected. The
    ``email`` key is filled from ``app_user.username`` (presumably usernames
    are email addresses -- verify against the user model).

    A user whose stored secrets cannot be decrypted is skipped and logged, so
    that one corrupt row does not prevent every other user from being
    refreshed.

    Returns:
        A list of dicts with keys ``user_id``, ``email``,
        ``zerodha_login_id``, ``password``, ``totp_secret``, ``api_key`` and
        ``api_secret`` (``None`` when not stored).
    """
    with db_transaction() as cur:
        cur.execute(
            """
            SELECT ub.user_id, au.username, ub.zerodha_login_id,
                   ub.zerodha_password, ub.totp_secret,
                   ub.api_key, ub.api_secret
            FROM user_broker ub
            JOIN app_user au ON au.id = ub.user_id
            WHERE ub.auto_login_enabled = TRUE
              AND ub.zerodha_login_id IS NOT NULL
              AND ub.zerodha_password IS NOT NULL
              AND ub.totp_secret IS NOT NULL
            """
        )
        rows = cur.fetchall()

    results = []
    for user_id, email, login_id, enc_pw, enc_totp, api_key, enc_secret in rows:
        try:
            password = decrypt_value(enc_pw)
            totp_secret = decrypt_value(enc_totp)
            api_secret = decrypt_value(enc_secret) if enc_secret else None
        except Exception as exc:
            # Log only the exception type: the message could echo key material.
            print(
                f"[AUTO-LOGIN] Skipping user {user_id}: stored credentials "
                f"could not be decrypted ({type(exc).__name__})",
                flush=True,
            )
            continue
        results.append({
            "user_id": user_id,
            "email": email,
            "zerodha_login_id": login_id,
            "password": password,
            "totp_secret": totp_secret,
            "api_key": api_key,
            "api_secret": api_secret,
        })
    return results
--------------------------------------------------------------------------- # Core login flow # --------------------------------------------------------------------------- def _perform_zerodha_login( zerodha_login_id: str, password: str, totp_secret: str, api_key: str, api_secret: str, ) -> dict: """Automates Zerodha login and returns session data with access_token.""" session = requests.Session() session.headers.update({ "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), "Accept": "application/json, text/plain, */*", "Accept-Language": "en-US,en;q=0.9", "Origin": "https://kite.zerodha.com", }) # Step 1: Initialize OAuth session and capture sess_id. # sess_id is the OAuth session identifier that must be passed to twofa. # Without it, Zerodha cannot link the twofa request to the OAuth app and # returns profile:{} instead of redirect_url with request_token. connect_resp = session.get( f"https://kite.zerodha.com/connect/login?v=3&api_key={api_key}", timeout=15, allow_redirects=False, ) # Extract sess_id from the Location header redirect URL connect_location = connect_resp.headers.get("Location", "") sess_id = None if "sess_id=" in connect_location: sess_id = parse_qs(urlparse(connect_location).query).get("sess_id", [None])[0] # Follow the redirect so the server registers the sess_id against kf_session if connect_location: session.get(connect_location, timeout=15, allow_redirects=True) # Step 2: Username + password login_resp = session.post( KITE_LOGIN_ENDPOINT, data={"user_id": zerodha_login_id, "password": password}, timeout=15, ) try: login_data = login_resp.json() except Exception: raise AutoLoginError(f"Invalid response from Zerodha login: {login_resp.text[:200]}") if login_data.get("status") != "success": raise AutoLoginError(f"Zerodha login failed: {login_data.get('message', 'Unknown error')}") request_id = login_data["data"]["request_id"] # Step 3: TOTP — include sess_id so 
Zerodha links this to the OAuth app try: import pyotp except ImportError: raise AutoLoginError("pyotp is not installed on the server. Run: pip install pyotp==2.9.0") totp_value = pyotp.TOTP(totp_secret).now() twofa_data = { "user_id": zerodha_login_id, "request_id": request_id, "twofa_value": totp_value, "twofa_type": "totp", } if sess_id: twofa_data["sess_id"] = sess_id twofa_resp = session.post( KITE_TWOFA_ENDPOINT, data=twofa_data, timeout=15, allow_redirects=False, ) # Step 4: Extract request_token from twofa response (some Zerodha versions # return redirect_url directly in the JSON body). request_token = None try: twofa_json = twofa_resp.json() redirect_url_body = twofa_json.get("data", {}).get("redirect_url", "") if redirect_url_body and "request_token" in redirect_url_body: parsed = urlparse(redirect_url_body) params = parse_qs(parsed.query) request_token = params.get("request_token", [None])[0] except Exception: pass # Also check twofa Location header if not request_token: location = twofa_resp.headers.get("Location", "") if "request_token" in location: parsed = urlparse(location) params = parse_qs(parsed.query) request_token = params.get("request_token", [None])[0] # Step 5: Complete OAuth flow. # In a browser, after twofa succeeds, Zerodha's JavaScript re-visits the # connect/login URL as an authenticated user. Zerodha detects the valid # session and redirects to the registered callback URL with request_token. # We follow the same redirect chain here to capture the token. 
if not request_token: next_url = f"https://kite.zerodha.com/connect/login?v=3&api_key={api_key}" for attempt in range(10): step_resp = session.get(next_url, timeout=15, allow_redirects=False) location = step_resp.headers.get("Location", "") if "request_token" in location: parsed = urlparse(location) params = parse_qs(parsed.query) request_token = params.get("request_token", [None])[0] break if not location or step_resp.status_code not in (301, 302, 303, 307, 308): break next_url = location if not request_token: raise AutoLoginError( "Could not extract request_token from Zerodha redirect. " "Check TOTP secret and credentials." ) # Step 5: Exchange request_token for access_token using existing service session_data = exchange_request_token(api_key, api_secret, request_token) return { "api_key": api_key, "access_token": session_data.get("access_token"), "request_token": request_token, "user_name": session_data.get("user_name"), "broker_user_id": session_data.get("user_id"), } def _reconnect_broker_after_auto_login(user_id: str) -> None: """Marks broker as connected after successful auto-login.""" with db_transaction() as cur: cur.execute( """ UPDATE user_broker SET connected = TRUE, auth_state = 'VALID' WHERE user_id = %s """, (user_id,), ) # --------------------------------------------------------------------------- # Public: execute for one user # --------------------------------------------------------------------------- def execute_auto_login(user_id: str, email: str | None = None) -> dict: """Run auto-login for a single user. 
Returns result dict.""" creds = get_auto_login_credentials(user_id) if not creds: return {"success": False, "error": "Auto-login not configured"} try: session_data = _perform_zerodha_login( zerodha_login_id=creds["zerodha_login_id"], password=creds["password"], totp_secret=creds["totp_secret"], api_key=creds["api_key"], api_secret=creds["api_secret"], ) set_session(user_id, session_data) _reconnect_broker_after_auto_login(user_id) _update_auto_login_result(user_id, error=None) if email: send_email_async( email, "Zerodha session refreshed automatically", ( "Your Zerodha session has been refreshed automatically by QuantFortune.\n\n" "Your strategy will continue running without any interruption.\n\n" f"Refreshed at: {datetime.now(IST).strftime('%d %b %Y, %I:%M %p IST')}" ), ) print(f"[AUTO-LOGIN] Successfully refreshed session for user {user_id}", flush=True) return {"success": True} except Exception as exc: err_msg = str(exc) _update_auto_login_result(user_id, error=err_msg) if email: send_email_async( email, "Action required: Zerodha auto-login failed", ( f"QuantFortune could not automatically refresh your Zerodha session.\n\n" f"Error: {err_msg}\n\n" "Please log in to QuantFortune and reconnect your Zerodha account manually.\n\n" "If your strategy was running, it has been paused until you reconnect." 
), ) print(f"[AUTO-LOGIN] Failed for user {user_id}: {exc}", flush=True) return {"success": False, "error": err_msg} # --------------------------------------------------------------------------- # Refresh all users (called by daily scheduler) # --------------------------------------------------------------------------- def refresh_all_auto_login_sessions() -> None: users = _get_all_auto_login_users() print(f"[AUTO-LOGIN] Starting daily refresh for {len(users)} user(s)", flush=True) for user in users: execute_auto_login(user_id=user["user_id"], email=user["email"]) def refresh_expired_auto_login_sessions() -> None: """Run on startup: re-login any users whose broker session is expired/disconnected.""" from app.broker_store import get_user_broker users = _get_all_auto_login_users() if not users: return print(f"[AUTO-LOGIN] Startup check: {len(users)} auto-login user(s)", flush=True) for user in users: entry = get_user_broker(user["user_id"]) or {} auth_state = str(entry.get("auth_state") or "").strip().upper() connected = bool(entry.get("connected")) if not connected or auth_state in {"EXPIRED", "DISCONNECTED", "PENDING", ""}: print(f"[AUTO-LOGIN] Startup: refreshing expired session for {user['user_id']}", flush=True) execute_auto_login(user_id=user["user_id"], email=user["email"]) # --------------------------------------------------------------------------- # Daily scheduler — runs at 6:05 AM IST every day # --------------------------------------------------------------------------- def _next_refresh_time() -> datetime: now = datetime.now(IST) target = now.replace(hour=6, minute=5, second=0, microsecond=0) if now >= target: target += timedelta(days=1) return target def _scheduler_loop() -> None: while True: next_run = _next_refresh_time() sleep_seconds = (_next_refresh_time() - datetime.now(IST)).total_seconds() print( f"[AUTO-LOGIN] Next scheduled refresh at " f"{next_run.strftime('%d %b %Y %I:%M %p IST')}, " f"sleeping {sleep_seconds:.0f}s", flush=True, ) 
time.sleep(sleep_seconds) try: refresh_all_auto_login_sessions() except Exception as exc: print(f"[AUTO-LOGIN] Scheduler error: {exc}", flush=True) def start_auto_login_scheduler() -> None: thread = threading.Thread(target=_scheduler_loop, daemon=True, name="auto-login-scheduler") thread.start() print("[AUTO-LOGIN] Daily scheduler started (fires at 6:05 AM IST)", flush=True)