Add OAuth completion step after twofa
After twofa verifies identity, re-visit connect/login as authenticated user. Zerodha then redirects to the registered callback URL with request_token. This mirrors what the browser JS does to complete the OAuth flow. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9d1126b84d
commit
bec31b9f9d
@ -258,9 +258,8 @@ def _perform_zerodha_login(
|
|||||||
flush=True,
|
flush=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 4: Extract request_token.
|
# Step 4: Extract request_token from twofa response (some Zerodha versions
|
||||||
# Modern Zerodha (SPA): returns 200 JSON with data.redirect_url.
|
# return redirect_url directly in the JSON body).
|
||||||
# Older behavior: 302 Location header redirect.
|
|
||||||
request_token = None
|
request_token = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -273,18 +272,38 @@ def _perform_zerodha_login(
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Also check twofa Location header
|
||||||
if not request_token:
|
if not request_token:
|
||||||
location = twofa_resp.headers.get("Location", "")
|
location = twofa_resp.headers.get("Location", "")
|
||||||
for _ in range(10):
|
if "request_token" in location:
|
||||||
|
parsed = urlparse(location)
|
||||||
|
params = parse_qs(parsed.query)
|
||||||
|
request_token = params.get("request_token", [None])[0]
|
||||||
|
|
||||||
|
# Step 5: Complete OAuth flow.
|
||||||
|
# In a browser, after twofa succeeds, Zerodha's JavaScript re-visits the
|
||||||
|
# connect/login URL as an authenticated user. Zerodha detects the valid
|
||||||
|
# session and redirects to the registered callback URL with request_token.
|
||||||
|
# We follow the same redirect chain here to capture the token.
|
||||||
|
if not request_token:
|
||||||
|
next_url = f"https://kite.zerodha.com/connect/login?v=3&api_key={api_key}"
|
||||||
|
for attempt in range(10):
|
||||||
|
step_resp = session.get(next_url, timeout=15, allow_redirects=False)
|
||||||
|
location = step_resp.headers.get("Location", "")
|
||||||
|
print(
|
||||||
|
f"[AUTO-LOGIN-DEBUG] oauth_complete[{attempt}] "
|
||||||
|
f"status={step_resp.status_code} "
|
||||||
|
f"location={location[:300] if location else 'NONE'}",
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
if "request_token" in location:
|
if "request_token" in location:
|
||||||
parsed = urlparse(location)
|
parsed = urlparse(location)
|
||||||
params = parse_qs(parsed.query)
|
params = parse_qs(parsed.query)
|
||||||
request_token = params.get("request_token", [None])[0]
|
request_token = params.get("request_token", [None])[0]
|
||||||
break
|
break
|
||||||
if not location or twofa_resp.status_code not in (301, 302, 303, 307, 308):
|
if not location or step_resp.status_code not in (301, 302, 303, 307, 308):
|
||||||
break
|
break
|
||||||
twofa_resp = session.get(location, allow_redirects=False, timeout=15)
|
next_url = location
|
||||||
location = twofa_resp.headers.get("Location", "")
|
|
||||||
|
|
||||||
if not request_token:
|
if not request_token:
|
||||||
raise AutoLoginError(
|
raise AutoLoginError(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user