2026-03-24 21:59:17 +05:30

208 lines
6.5 KiB
Python

import hashlib
import json
import os
import urllib.error
import urllib.parse
import urllib.request
# Base URL that every REST endpoint path is appended to; can be overridden
# through the KITE_API_BASE environment variable.
KITE_API_BASE = os.environ.get("KITE_API_BASE", "https://api.kite.trade")
# Login page used by build_login_url(); can be overridden through the
# KITE_LOGIN_URL environment variable.
KITE_LOGIN_URL = os.environ.get("KITE_LOGIN_URL", "https://kite.trade/connect/login")
# Sent as the "v" login query parameter and as the X-Kite-Version header.
KITE_VERSION = "3"
class KiteApiError(Exception):
    """Error raised for an HTTP error response from the Kite API.

    Attributes:
        status_code: HTTP status code of the failed response.
        error_type: Error category string taken from the response.
        message: Human-readable description of the failure.
    """

    def __init__(self, status_code: int, error_type: str, message: str):
        self.status_code = status_code
        self.error_type = error_type
        self.message = message
        summary = f"Kite API error {status_code}: {error_type} - {message}"
        super().__init__(summary)
class KiteTokenError(KiteApiError):
    """Raised when the API reports the error type ``TokenException``."""
class KitePermissionError(KiteApiError):
    """Raised when the API reports the error type ``PermissionException``."""
def build_login_url(api_key: str, redirect_url: str | None = None) -> str:
    """Build the login URL for the given API key.

    Args:
        api_key: Value of the ``api_key`` query parameter.
        redirect_url: Optional redirect target. When empty or ``None`` the
            ``ZERODHA_REDIRECT_URL`` environment variable is used instead; if
            that is also empty, no ``redirect_url`` parameter is added.

    Returns:
        ``KITE_LOGIN_URL`` followed by the URL-encoded query string.
    """
    target = redirect_url or os.getenv("ZERODHA_REDIRECT_URL") or ""
    target = target.strip()
    query_items = [("api_key", api_key), ("v", KITE_VERSION)]
    if target:
        query_items.append(("redirect_url", target))
    return KITE_LOGIN_URL + "?" + urllib.parse.urlencode(query_items)
def _request(method: str, url: str, data: dict | None = None, headers: dict | None = None):
payload = None
if data is not None:
payload = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(url, data=payload, headers=headers or {}, method=method)
try:
with urllib.request.urlopen(req, timeout=20) as resp:
body = resp.read().decode("utf-8")
except urllib.error.HTTPError as err:
error_body = err.read().decode("utf-8") if err.fp else ""
try:
payload = json.loads(error_body) if error_body else {}
except json.JSONDecodeError:
payload = {}
error_type = payload.get("error_type") or payload.get("status") or "unknown_error"
message = payload.get("message") or error_body or err.reason
if error_type == "TokenException":
exc_cls = KiteTokenError
elif error_type == "PermissionException":
exc_cls = KitePermissionError
else:
exc_cls = KiteApiError
raise exc_cls(err.code, error_type, message) from err
return json.loads(body)
def _auth_headers(api_key: str, access_token: str) -> dict:
    """Return the headers attached to authenticated API requests.

    The result carries the ``X-Kite-Version`` header and an ``Authorization``
    header of the form ``token <api_key>:<access_token>``.
    """
    headers = {"X-Kite-Version": KITE_VERSION}
    headers["Authorization"] = f"token {api_key}:{access_token}"
    return headers
def exchange_request_token(api_key: str, api_secret: str, request_token: str) -> dict:
    """POST a request token to ``/session/token`` and return the reply's ``data``.

    The form carries ``api_key``, ``request_token`` and a ``checksum`` that is
    the SHA-256 hex digest of ``api_key + request_token + api_secret``.

    Returns:
        The ``data`` member of the JSON response, or ``{}`` when it is absent.
    """
    digest = hashlib.sha256()
    digest.update(f"{api_key}{request_token}{api_secret}".encode("utf-8"))
    form = {
        "api_key": api_key,
        "request_token": request_token,
        "checksum": digest.hexdigest(),
    }
    reply = _request("POST", f"{KITE_API_BASE}/session/token", data=form)
    return reply.get("data", {})
def fetch_holdings(api_key: str, access_token: str) -> list:
    """GET ``/portfolio/holdings`` and return the reply's ``data`` member.

    Returns an empty list when the reply has no ``data`` key.
    """
    auth = _auth_headers(api_key, access_token)
    reply = _request("GET", f"{KITE_API_BASE}/portfolio/holdings", headers=auth)
    return reply.get("data", [])
def fetch_funds(api_key: str, access_token: str) -> dict:
    """GET ``/user/margins`` and return the reply's ``data`` member.

    Returns an empty dict when the reply has no ``data`` key.
    """
    auth = _auth_headers(api_key, access_token)
    reply = _request("GET", f"{KITE_API_BASE}/user/margins", headers=auth)
    return reply.get("data", {})
def fetch_ltp_quotes(api_key: str, access_token: str, instruments: list[str]) -> dict:
    """GET ``/quote/ltp`` for the given instruments.

    Each entry of ``instruments`` is converted to ``str`` and stripped; entries
    that end up empty are dropped. Every remaining entry is sent as its own
    ``i`` query parameter.

    Returns:
        The ``data`` member of the JSON response (``{}`` when absent), or
        ``{}`` without making a request when no usable instrument remains.
    """
    stripped = (str(entry).strip() for entry in instruments)
    wanted = [name for name in stripped if name]
    if not wanted:
        return {}
    # doseq=True expands the list into repeated i=<symbol> parameters.
    query = urllib.parse.urlencode({"i": wanted}, doseq=True)
    reply = _request(
        "GET",
        f"{KITE_API_BASE}/quote/ltp?{query}",
        headers=_auth_headers(api_key, access_token),
    )
    return reply.get("data", {})
def fetch_ohlc_quotes(api_key: str, access_token: str, instruments: list[str]) -> dict:
    """GET ``/quote/ohlc`` for the given instruments.

    Each entry of ``instruments`` is converted to ``str`` and stripped; entries
    that end up empty are dropped. Every remaining entry is sent as its own
    ``i`` query parameter.

    Returns:
        The ``data`` member of the JSON response (``{}`` when absent), or
        ``{}`` without making a request when no usable instrument remains.
    """
    stripped = (str(entry).strip() for entry in instruments)
    wanted = [name for name in stripped if name]
    if not wanted:
        return {}
    # doseq=True expands the list into repeated i=<symbol> parameters.
    query = urllib.parse.urlencode({"i": wanted}, doseq=True)
    reply = _request(
        "GET",
        f"{KITE_API_BASE}/quote/ohlc?{query}",
        headers=_auth_headers(api_key, access_token),
    )
    return reply.get("data", {})
def fetch_historical_candles(
    api_key: str,
    access_token: str,
    instrument_token: int | str,
    interval: str,
    *,
    from_dt,
    to_dt,
    continuous: bool = False,
    oi: bool = False,
) -> list:
    """GET ``/instruments/historical/<instrument_token>/<interval>``.

    Args:
        api_key: API key used for the ``Authorization`` header.
        access_token: Access token used for the ``Authorization`` header.
        instrument_token: Placed in the URL path as-is.
        interval: Placed in the URL path as-is.
        from_dt: Range start; must provide ``strftime`` (e.g. a ``datetime``).
        to_dt: Range end; must provide ``strftime`` (e.g. a ``datetime``).
        continuous: Sent as ``continuous=1`` when truthy, otherwise ``0``.
        oi: Sent as ``oi=1`` when truthy, otherwise ``0``.

    Returns:
        The ``candles`` list found under the reply's ``data`` member, or an
        empty list when either key is missing.
    """
    stamp = "%Y-%m-%d %H:%M:%S"
    query = urllib.parse.urlencode(
        {
            "from": from_dt.strftime(stamp),
            "to": to_dt.strftime(stamp),
            "continuous": int(bool(continuous)),
            "oi": int(bool(oi)),
        }
    )
    endpoint = f"{KITE_API_BASE}/instruments/historical/{instrument_token}/{interval}?{query}"
    reply = _request("GET", endpoint, headers=_auth_headers(api_key, access_token))
    candle_data = reply.get("data", {})
    return candle_data.get("candles", [])
def place_order(
    api_key: str,
    access_token: str,
    *,
    tradingsymbol: str,
    exchange: str,
    transaction_type: str,
    order_type: str,
    quantity: int,
    product: str,
    price: float | None = None,
    validity: str = "DAY",
    variety: str = "regular",
    market_protection: int | None = None,
    tag: str | None = None,
) -> dict:
    """POST an order form to ``/orders/<variety>`` and return the reply's ``data``.

    The form always carries ``tradingsymbol``, ``exchange``,
    ``transaction_type``, ``order_type``, ``quantity`` (coerced with ``int``),
    ``product`` and ``validity``. ``price`` and ``market_protection`` are added
    only when they are not ``None``; ``tag`` is added only when it is truthy.

    Returns:
        The ``data`` member of the JSON response, or ``{}`` when it is absent.
    """
    form = dict(
        tradingsymbol=tradingsymbol,
        exchange=exchange,
        transaction_type=transaction_type,
        order_type=order_type,
        quantity=int(quantity),
        product=product,
        validity=validity,
    )
    optional_fields = (("price", price), ("market_protection", market_protection))
    for field_name, field_value in optional_fields:
        if field_value is not None:
            form[field_name] = field_value
    if tag:
        form["tag"] = tag
    auth = _auth_headers(api_key, access_token)
    reply = _request("POST", f"{KITE_API_BASE}/orders/{variety}", data=form, headers=auth)
    return reply.get("data", {})
def fetch_orders(api_key: str, access_token: str) -> list:
    """GET ``/orders`` and return the reply's ``data`` member.

    Returns an empty list when the reply has no ``data`` key.
    """
    auth = _auth_headers(api_key, access_token)
    reply = _request("GET", f"{KITE_API_BASE}/orders", headers=auth)
    return reply.get("data", [])
def fetch_order_history(api_key: str, access_token: str, order_id: str) -> list:
    """GET ``/orders/<order_id>`` and return the reply's ``data`` member.

    ``order_id`` is placed in the URL path as-is. Returns an empty list when
    the reply has no ``data`` key.
    """
    auth = _auth_headers(api_key, access_token)
    reply = _request("GET", f"{KITE_API_BASE}/orders/{order_id}", headers=auth)
    return reply.get("data", [])
def cancel_order(
    api_key: str,
    access_token: str,
    *,
    order_id: str,
    variety: str = "regular",
) -> dict:
    """Send DELETE to ``/orders/<variety>/<order_id>`` and return the reply's ``data``.

    Both ``variety`` and ``order_id`` are placed in the URL path as-is.
    Returns an empty dict when the reply has no ``data`` key.
    """
    auth = _auth_headers(api_key, access_token)
    endpoint = f"{KITE_API_BASE}/orders/{variety}/{order_id}"
    reply = _request("DELETE", endpoint, headers=auth)
    return reply.get("data", {})