# SIP_GoldBees_Backend/backend/app/services/auto_login_service.py
#
# File-viewer metadata captured with this copy: 359 lines, 12 KiB, Python.

import threading
import time
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlparse
import requests
from app.services.crypto_service import decrypt_value, encrypt_value
from app.services.db import db_transaction
from app.services.email_service import send_email_async
from app.services.zerodha_service import exchange_request_token
from app.services.zerodha_storage import set_session
from app.broker_store import expire_user_broker_session
# Fixed UTC+05:30 offset used for the daily schedule and for timestamps shown to users.
IST = timezone(timedelta(hours=5, minutes=30))
# Kite web endpoint that receives the login id and password (step 1 of the login flow).
KITE_LOGIN_ENDPOINT = "https://kite.zerodha.com/api/login"
# Kite web endpoint that receives the TOTP code (step 2 of the login flow).
KITE_TWOFA_ENDPOINT = "https://kite.zerodha.com/api/twofa"
class AutoLoginError(Exception):
    """Raised when the automated Zerodha login flow cannot be completed."""
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def save_auto_login_credentials(
    user_id: str,
    zerodha_login_id: str,
    password: str,
    totp_secret: str,
) -> None:
    """Store a user's Zerodha auto-login credentials and switch auto-login on.

    The login id is stored trimmed. The password and the TOTP secret (with
    surrounding whitespace and inner spaces removed) go through
    ``encrypt_value`` before being written. The statement is an UPDATE keyed
    on ``user_id``; it does not insert a row.
    """
    statement = """
UPDATE user_broker
SET zerodha_login_id = %s,
zerodha_password = %s,
totp_secret = %s,
auto_login_enabled = TRUE
WHERE user_id = %s
"""
    with db_transaction() as cursor:
        trimmed_login_id = zerodha_login_id.strip()
        password_cipher = encrypt_value(password)
        # Authenticator apps often display the secret in space-separated groups.
        compact_secret = totp_secret.strip().replace(" ", "")
        totp_cipher = encrypt_value(compact_secret)
        cursor.execute(
            statement,
            (trimmed_login_id, password_cipher, totp_cipher, user_id),
        )
def get_auto_login_credentials(user_id: str) -> dict | None:
    """Load and decrypt the auto-login credentials stored for ``user_id``.

    Returns ``None`` when there is no ``user_broker`` row, when auto-login is
    disabled, or when the login id, password or TOTP secret is missing.
    Otherwise returns a dict with the decrypted password and TOTP secret, the
    API key, the decrypted API secret (``None`` if not stored) and the
    last-run timestamp (ISO string) and error.
    """
    query = """
SELECT zerodha_login_id, zerodha_password, totp_secret,
auto_login_enabled, auto_login_last_at, auto_login_last_err,
api_key, api_secret
FROM user_broker
WHERE user_id = %s
"""
    with db_transaction() as cursor:
        cursor.execute(query, (user_id,))
        record = cursor.fetchone()

    if not record:
        return None

    (
        login_id,
        password_cipher,
        totp_cipher,
        is_enabled,
        refreshed_at,
        failure,
        api_key,
        api_secret_cipher,
    ) = record

    # All four pieces are required before an automated login can be attempted.
    if not (is_enabled and login_id and password_cipher and totp_cipher):
        return None

    credentials = {
        "zerodha_login_id": login_id,
        "password": decrypt_value(password_cipher),
        "totp_secret": decrypt_value(totp_cipher),
        "api_key": api_key,
    }
    if api_secret_cipher:
        credentials["api_secret"] = decrypt_value(api_secret_cipher)
    else:
        credentials["api_secret"] = None
    credentials["last_refreshed_at"] = refreshed_at.isoformat() if refreshed_at else None
    credentials["last_error"] = failure
    return credentials
def get_auto_login_status(user_id: str) -> dict:
    """Summarise the stored auto-login state for ``user_id`` without secrets.

    Returns ``{"configured": False}`` when the user has no ``user_broker``
    row. Otherwise ``configured`` is true only if auto-login is enabled and a
    login id is stored, and the dict also carries the last-run timestamp
    (ISO string or ``None``) and the last recorded error.
    """
    lookup = """
SELECT auto_login_enabled, auto_login_last_at, auto_login_last_err,
zerodha_login_id
FROM user_broker
WHERE user_id = %s
"""
    with db_transaction() as cursor:
        cursor.execute(lookup, (user_id,))
        record = cursor.fetchone()

    if record:
        is_enabled, refreshed_at, failure, login_id = record
        status = {"configured": bool(is_enabled and login_id)}
        status["last_refreshed_at"] = refreshed_at.isoformat() if refreshed_at else None
        status["last_error"] = failure
        return status

    return {"configured": False}
def delete_auto_login_credentials(user_id: str) -> None:
    """Disable auto-login for ``user_id`` and blank everything stored for it.

    Clears the login id, password, TOTP secret, last-run timestamp and last
    error, and sets ``auto_login_enabled`` to FALSE. The ``user_broker`` row
    itself is kept.
    """
    wipe_statement = """
UPDATE user_broker
SET zerodha_login_id = NULL,
zerodha_password = NULL,
totp_secret = NULL,
auto_login_enabled = FALSE,
auto_login_last_at = NULL,
auto_login_last_err = NULL
WHERE user_id = %s
"""
    params = (user_id,)
    with db_transaction() as cursor:
        cursor.execute(wipe_statement, params)
def _update_auto_login_result(user_id: str, error: str | None) -> None:
    """Record the outcome of an auto-login attempt for ``user_id``.

    Stamps ``auto_login_last_at`` with the database's ``NOW()`` and stores
    ``error`` in ``auto_login_last_err``; passing ``None`` clears the error.
    """
    outcome_statement = """
UPDATE user_broker
SET auto_login_last_at = NOW(),
auto_login_last_err = %s
WHERE user_id = %s
"""
    params = (error, user_id)
    with db_transaction() as cursor:
        cursor.execute(outcome_statement, params)
def _get_all_auto_login_users() -> list[dict]:
    """Return decrypted auto-login credentials for every eligible user.

    A user is eligible when auto-login is enabled and the login id, password
    and TOTP secret are all stored. Each entry holds ``user_id``, ``email``
    (read from ``app_user.username``), ``zerodha_login_id``, the decrypted
    ``password`` and ``totp_secret``, ``api_key`` and the decrypted
    ``api_secret`` (``None`` when not stored).

    Rows whose stored values cannot be decrypted are skipped and logged, so
    that one bad row does not prevent the remaining users from being
    returned.
    """
    with db_transaction() as cur:
        cur.execute(
            """
SELECT ub.user_id, au.username,
ub.zerodha_login_id, ub.zerodha_password, ub.totp_secret,
ub.api_key, ub.api_secret
FROM user_broker ub
JOIN app_user au ON au.id = ub.user_id
WHERE ub.auto_login_enabled = TRUE
AND ub.zerodha_login_id IS NOT NULL
AND ub.zerodha_password IS NOT NULL
AND ub.totp_secret IS NOT NULL
"""
        )
        rows = cur.fetchall()

    results = []
    for user_id, email, login_id, enc_pw, enc_totp, api_key, enc_secret in rows:
        try:
            password = decrypt_value(enc_pw)
            totp_secret = decrypt_value(enc_totp)
            api_secret = decrypt_value(enc_secret) if enc_secret else None
        except Exception as exc:
            # Only the exception type is logged, in case the message echoes
            # part of the stored value.
            print(
                f"[AUTO-LOGIN] Skipping user {user_id}: stored credentials "
                f"could not be decrypted ({type(exc).__name__})",
                flush=True,
            )
            continue
        results.append({
            "user_id": user_id,
            "email": email,
            "zerodha_login_id": login_id,
            "password": password,
            "totp_secret": totp_secret,
            "api_key": api_key,
            "api_secret": api_secret,
        })
    return results
# ---------------------------------------------------------------------------
# Core login flow
# ---------------------------------------------------------------------------
def _perform_zerodha_login(
    zerodha_login_id: str,
    password: str,
    totp_secret: str,
    api_key: str,
    api_secret: str,
) -> dict:
    """Automates Zerodha login and returns session data with access_token.

    Flow:
      1. POST the login id and password to ``KITE_LOGIN_ENDPOINT`` to obtain
         a ``request_id``.
      2. POST a freshly generated TOTP code to ``KITE_TWOFA_ENDPOINT``.
      3. Walk the redirect chain by hand (at most 10 hops) until a
         ``Location`` header carries ``request_token``.
      4. Pass the token to ``exchange_request_token``.

    Returns:
        A dict with ``api_key``, ``access_token``, ``request_token``,
        ``user_name`` and ``broker_user_id``.

    Raises:
        AutoLoginError: if the API key/secret is missing, ``pyotp`` is not
            installed, the TOTP secret cannot be used, Zerodha rejects the
            password or the TOTP code, a response is malformed, or no
            ``request_token`` can be found in the redirects.

    Transport errors raised by ``requests`` and anything raised by
    ``exchange_request_token`` propagate unchanged.
    """
    # Local import: needed only here, to resolve relative redirect targets.
    from urllib.parse import urljoin

    redirect_statuses = (301, 302, 303, 307, 308)

    # Fail before any network traffic when the final token exchange could
    # never succeed.
    if not api_key or not api_secret:
        raise AutoLoginError(
            "Zerodha API key/secret is not configured; cannot exchange the request token."
        )

    # Checked up front so a missing dependency does not cost a real login attempt.
    try:
        import pyotp
    except ImportError as exc:
        raise AutoLoginError(
            "pyotp is not installed on the server. Run: pip install pyotp==2.9.0"
        ) from exc

    # The context manager closes the session's pooled connections when done.
    with requests.Session() as session:
        session.headers.update({
            "X-Kite-Version": "3",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        })

        # Step 1: Username + password
        login_resp = session.post(
            KITE_LOGIN_ENDPOINT,
            data={"user_id": zerodha_login_id, "password": password},
            timeout=15,
        )
        try:
            login_data = login_resp.json()
        except ValueError as exc:
            raise AutoLoginError(
                f"Invalid response from Zerodha login: {login_resp.text[:200]}"
            ) from exc
        if not isinstance(login_data, dict):
            raise AutoLoginError(
                f"Invalid response from Zerodha login: {login_resp.text[:200]}"
            )
        if login_data.get("status") != "success":
            raise AutoLoginError(
                f"Zerodha login failed: {login_data.get('message', 'Unknown error')}"
            )
        login_payload = login_data.get("data")
        request_id = (
            login_payload.get("request_id") if isinstance(login_payload, dict) else None
        )
        if not request_id:
            raise AutoLoginError("Zerodha login response did not include a request_id.")

        # Step 2: TOTP — don't follow redirect automatically.
        # The code is generated right before it is sent because it is time-limited.
        try:
            totp_value = pyotp.TOTP(totp_secret).now()
        except (TypeError, ValueError) as exc:
            raise AutoLoginError(
                "Stored TOTP secret is invalid (expected a base32 string)."
            ) from exc
        twofa_resp = session.post(
            KITE_TWOFA_ENDPOINT,
            data={
                "user_id": zerodha_login_id,
                "request_id": request_id,
                "twofa_value": totp_value,
                "twofa_type": "totp",
            },
            timeout=15,
            allow_redirects=False,
        )
        # Surface an explicit 2FA rejection instead of the generic
        # "could not extract request_token" message further down.
        if twofa_resp.status_code not in redirect_statuses:
            try:
                twofa_data = twofa_resp.json()
            except ValueError:
                twofa_data = None
            if isinstance(twofa_data, dict) and twofa_data.get("status") == "error":
                raise AutoLoginError(
                    f"Zerodha 2FA failed: {twofa_data.get('message', 'Unknown error')}"
                )

        # Step 3: Follow redirects manually to intercept request_token.
        # NOTE(review): this relies on the 2FA response (or a hop after it)
        # answering with a redirect whose URL carries request_token — confirm
        # against the current Kite login behaviour.
        request_token = None
        response = twofa_resp
        location = response.headers.get("Location", "")
        for _ in range(10):
            if "request_token" in location:
                query = parse_qs(urlparse(location).query)
                request_token = query.get("request_token", [None])[0]
                break
            if not location or response.status_code not in redirect_statuses:
                break
            # Location may be relative; resolve it against the URL just requested.
            response = session.get(
                urljoin(response.url, location),
                allow_redirects=False,
                timeout=15,
            )
            location = response.headers.get("Location", "")

    if not request_token:
        raise AutoLoginError(
            "Could not extract request_token from Zerodha redirect. "
            "Check TOTP secret and credentials."
        )

    # Step 4: Exchange request_token for access_token using existing service
    session_data = exchange_request_token(api_key, api_secret, request_token)
    return {
        "api_key": api_key,
        "access_token": session_data.get("access_token"),
        "request_token": request_token,
        "user_name": session_data.get("user_name"),
        "broker_user_id": session_data.get("user_id"),
    }
def _reconnect_broker_after_auto_login(user_id: str) -> None:
    """Flag the user's broker row as connected after a successful auto-login.

    Sets ``connected`` to TRUE and ``auth_state`` to ``'CONNECTED'`` on the
    ``user_broker`` row for ``user_id``.
    """
    mark_connected = """
UPDATE user_broker
SET connected = TRUE,
auth_state = 'CONNECTED'
WHERE user_id = %s
"""
    params = (user_id,)
    with db_transaction() as cursor:
        cursor.execute(mark_connected, params)
# ---------------------------------------------------------------------------
# Public: execute for one user
# ---------------------------------------------------------------------------
def _send_auto_login_notification(email: str | None, subject: str, body: str) -> None:
    """Queue a notification e-mail; log instead of raising if queuing fails."""
    if not email:
        return
    try:
        send_email_async(email, subject, body)
    except Exception as exc:
        print(f"[AUTO-LOGIN] Could not send notification e-mail: {exc}", flush=True)


def execute_auto_login(user_id: str, email: str | None = None) -> dict:
    """Run auto-login for a single user. Returns result dict.

    Args:
        user_id: User whose stored credentials are used.
        email: Optional address that receives a success or failure notice.

    Returns:
        ``{"success": True}`` once the new session is stored, the broker row
        is marked connected and the outcome is recorded; otherwise
        ``{"success": False, "error": <message>}``.

    Notifications are best-effort: a failure to send one is logged and never
    changes the reported outcome. Errors raised while loading the stored
    credentials are not caught here.
    """
    creds = get_auto_login_credentials(user_id)
    if not creds:
        return {"success": False, "error": "Auto-login not configured"}

    try:
        session_data = _perform_zerodha_login(
            zerodha_login_id=creds["zerodha_login_id"],
            password=creds["password"],
            totp_secret=creds["totp_secret"],
            api_key=creds["api_key"],
            api_secret=creds["api_secret"],
        )
        set_session(user_id, session_data)
        _reconnect_broker_after_auto_login(user_id)
        _update_auto_login_result(user_id, error=None)
    except Exception as exc:
        # Some exceptions carry no message; fall back to the type name so the
        # stored and e-mailed error is never blank.
        err_msg = str(exc) or type(exc).__name__
        try:
            _update_auto_login_result(user_id, error=err_msg)
        except Exception as record_exc:
            # Recording the failure must not mask the failure itself.
            print(
                f"[AUTO-LOGIN] Could not record failure for user {user_id}: {record_exc}",
                flush=True,
            )
        _send_auto_login_notification(
            email,
            "Action required: Zerodha auto-login failed",
            (
                "QuantFortune could not automatically refresh your Zerodha session.\n\n"
                f"Error: {err_msg}\n\n"
                "Please log in to QuantFortune and reconnect your Zerodha account manually.\n\n"
                "If your strategy was running, it has been paused until you reconnect."
            ),
        )
        print(f"[AUTO-LOGIN] Failed for user {user_id}: {err_msg}", flush=True)
        return {"success": False, "error": err_msg}

    # Sent outside the try block: the session is already refreshed, so a
    # notification problem must not be reported as a login failure.
    _send_auto_login_notification(
        email,
        "Zerodha session refreshed automatically",
        (
            "Your Zerodha session has been refreshed automatically by QuantFortune.\n\n"
            "Your strategy will continue running without any interruption.\n\n"
            f"Refreshed at: {datetime.now(IST).strftime('%d %b %Y, %I:%M %p IST')}"
        ),
    )
    print(f"[AUTO-LOGIN] Successfully refreshed session for user {user_id}", flush=True)
    return {"success": True}
# ---------------------------------------------------------------------------
# Refresh all users (called by daily scheduler)
# ---------------------------------------------------------------------------
def refresh_all_auto_login_sessions() -> None:
    """Run auto-login once for every user returned by ``_get_all_auto_login_users``.

    Each user is processed independently: an exception escaping one user's
    run is logged and the loop moves on to the next user.
    """
    users = _get_all_auto_login_users()
    print(f"[AUTO-LOGIN] Starting daily refresh for {len(users)} user(s)", flush=True)
    for user in users:
        user_id = user["user_id"]
        try:
            execute_auto_login(user_id=user_id, email=user["email"])
        except Exception as exc:
            # execute_auto_login reports login failures through its return
            # value; this guards against errors raised outside that handling.
            print(f"[AUTO-LOGIN] Unexpected error for user {user_id}: {exc}", flush=True)
# ---------------------------------------------------------------------------
# Daily scheduler — runs at 6:05 AM IST every day
# ---------------------------------------------------------------------------
def _next_refresh_time() -> datetime:
now = datetime.now(IST)
target = now.replace(hour=6, minute=5, second=0, microsecond=0)
if now >= target:
target += timedelta(days=1)
return target
def _scheduler_loop() -> None:
    """Run ``refresh_all_auto_login_sessions`` at each scheduled time, forever.

    Each cycle computes the next run time once, logs it, waits until that
    moment has passed on the IST wall clock, and then runs the refresh.
    Errors raised by the refresh are logged so the loop keeps going.
    """
    while True:
        next_run = _next_refresh_time()
        # Derived from the same target that is logged; clamped because
        # time.sleep() raises ValueError for a negative duration.
        sleep_seconds = max(0.0, (next_run - datetime.now(IST)).total_seconds())
        print(
            f"[AUTO-LOGIN] Next scheduled refresh at "
            f"{next_run.strftime('%d %b %Y %I:%M %p IST')}, "
            f"sleeping {sleep_seconds:.0f}s",
            flush=True,
        )
        # Re-check after waking: if the wall clock still reads earlier than
        # the target, running now would make the next cycle pick the same
        # target and refresh a second time moments later.
        while sleep_seconds > 0:
            time.sleep(sleep_seconds)
            sleep_seconds = (next_run - datetime.now(IST)).total_seconds()
        try:
            refresh_all_auto_login_sessions()
        except Exception as exc:
            print(f"[AUTO-LOGIN] Scheduler error: {exc}", flush=True)
def start_auto_login_scheduler() -> None:
    """Start the daily auto-login scheduler in a daemon thread.

    The call is idempotent within a process: if a live thread named
    ``auto-login-scheduler`` already exists, no second one is started, so
    repeated start-up calls do not produce duplicate refresh runs.
    """
    thread_name = "auto-login-scheduler"
    # threading.enumerate() lists only threads that are currently alive.
    if any(existing.name == thread_name for existing in threading.enumerate()):
        print("[AUTO-LOGIN] Daily scheduler already running; not starting another", flush=True)
        return
    thread = threading.Thread(target=_scheduler_loop, daemon=True, name=thread_name)
    thread.start()
    print("[AUTO-LOGIN] Daily scheduler started (fires at 6:05 AM IST)", flush=True)