"""Live equity snapshots: valuation helpers, persistence and a background daemon for Zerodha and Groww accounts."""
import logging
import math
import os
import threading
import time
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from zoneinfo import ZoneInfo

from app.broker_store import get_user_broker
from app.services.db import db_connection
from app.services.groww_service import (
    GrowwApiError,
    fetch_funds as fetch_groww_funds,
    fetch_holdings as fetch_groww_holdings,
    fetch_positions as fetch_groww_positions,
    normalize_holding as normalize_groww_holding,
    normalize_position as normalize_groww_position,
)
from app.services.groww_storage import get_session as get_groww_session
from app.services.zerodha_service import (
    KiteApiError,
    fetch_funds as fetch_zerodha_funds,
    fetch_holdings as fetch_zerodha_holdings,
    fetch_positions as fetch_zerodha_positions,
    holding_effective_quantity,
    holding_last_price,
    normalize_holding as normalize_zerodha_holding,
    normalize_position as normalize_zerodha_position,
)
from app.services.zerodha_storage import get_session as get_zerodha_session
|
|
|
|
IST = ZoneInfo("Asia/Calcutta")
|
|
AUTO_SNAPSHOT_AFTER_HOUR = int(os.getenv("LIVE_EQUITY_SNAPSHOT_HOUR", "15"))
|
|
AUTO_SNAPSHOT_AFTER_MINUTE = int(os.getenv("LIVE_EQUITY_SNAPSHOT_MINUTE", "35"))
|
|
AUTO_SNAPSHOT_INTERVAL_SEC = int(os.getenv("LIVE_EQUITY_SNAPSHOT_INTERVAL_SEC", "1800"))
|
|
|
|
_SNAPSHOT_THREAD = None
|
|
_SNAPSHOT_LOCK = threading.Lock()
|
|
_LAST_AUTO_SNAPSHOT_DATE: date | None = None
|
|
|
|
|
|
def _now_utc() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc_moment = datetime.now(tz=timezone.utc)
    return utc_moment
|
|
|
|
|
|
def _now_ist() -> datetime:
    """Return the current moment converted to the module's ``IST`` zone."""
    utc_moment = _now_utc()
    return utc_moment.astimezone(IST)
|
|
|
|
|
|
def _snapshot_day(ts: datetime) -> date:
    """Return the calendar date of ``ts`` after converting it to ``IST``."""
    local_ts = ts.astimezone(IST)
    return local_ts.date()
|
|
|
|
|
|
def _first_numeric(*values, default: float = 0.0) -> float:
    """Return the first candidate that converts to a finite float.

    Candidates are tried in order. ``None``, the empty string, values that
    ``float()`` rejects, integers too large for a float, and non-finite
    results (NaN, +/-inf) are all skipped.

    Args:
        *values: Candidate values, in priority order.
        default: Value returned (as a float) when no candidate qualifies.

    Returns:
        The first usable candidate as a float, otherwise ``float(default)``.
    """
    for candidate in values:
        try:
            # The emptiness comparison stays inside the ``try`` so that
            # objects with unusual ``__eq__`` behaviour are skipped, not fatal.
            if candidate is None or candidate == "":
                continue
            number = float(candidate)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: float() of an int beyond the float range.
            continue
        # NaN / infinity would poison every sum built on top of this value.
        if math.isfinite(number):
            return number
    return float(default)
|
|
|
|
|
|
def _extract_cash_value(funds_data: dict | None) -> float:
    """Pick the cash figure out of a broker funds payload.

    Two shapes are accepted:

    * nested -- the equity segment lives under ``funds_data["equity"]``;
    * flattened -- the equity fields sit at the top level, which is the shape
      ``capture_live_equity_snapshot`` builds itself for both brokers.

    Previously only the nested shape was read, so flattened payloads always
    produced ``0.0``.

    Args:
        funds_data: Funds payload, or ``None``.

    Returns:
        The first numeric value among ``balance``, ``net``, ``withdrawable``,
        ``cash`` and then ``available.live_balance``,
        ``available.opening_balance``, ``available.cash``; ``0.0`` when none
        is usable or ``funds_data`` is not a dict.
    """
    if not isinstance(funds_data, dict):
        return 0.0

    equity = funds_data.get("equity")
    if not isinstance(equity, dict):
        # No nested equity segment: treat the payload itself as the segment.
        equity = funds_data

    available = equity.get("available")
    if not isinstance(available, dict):
        available = {}

    return _first_numeric(
        equity.get("balance"),
        equity.get("net"),
        equity.get("withdrawable"),
        equity.get("cash"),
        available.get("live_balance"),
        available.get("opening_balance"),
        available.get("cash"),
        default=0.0,
    )
|
|
|
|
|
|
def _extract_holdings_value(holdings: list[dict] | None) -> float:
    """Sum effective quantity times last price over all holdings.

    Quantity and price come from ``holding_effective_quantity`` and
    ``holding_last_price``. A missing or empty list yields ``0.0``.
    """
    running_total = 0.0
    # Explicit loop on purpose: keeps plain left-to-right float accumulation.
    for holding in (holdings if holdings else []):
        quantity = holding_effective_quantity(holding)
        price = holding_last_price(holding)
        running_total += quantity * price
    return running_total
|
|
|
|
|
|
def _position_signed_quantity(item: dict | None) -> float:
    """Return a position's signed quantity.

    Reads ``net_quantity`` first, then ``signed_quantity``; falls back to
    ``0.0`` when neither is numeric or ``item`` is falsy.
    """
    record = item or {}
    candidates = (record.get("net_quantity"), record.get("signed_quantity"))
    return _first_numeric(*candidates, default=0.0)
|
|
|
|
|
|
def _symbol_key(item: dict | None) -> tuple[str, str]:
|
|
entry = item or {}
|
|
symbol = str(
|
|
entry.get("symbol")
|
|
or entry.get("tradingsymbol")
|
|
or entry.get("trading_symbol")
|
|
or ""
|
|
).strip().upper()
|
|
exchange = str(entry.get("exchange") or "NSE").strip().upper()
|
|
return symbol, exchange
|
|
|
|
|
|
def _extract_positions_adjustment_value(
    holdings: list[dict] | None,
    positions: list[dict] | None,
) -> float:
    """Value open positions as an adjustment on top of the holdings value.

    For every position with a non-zero signed quantity:

    * the long part of the quantity is reduced by the matching holding's
      ``t1_quantity`` (presumably because that quantity is already part of
      the holding's effective quantity -- confirm against the broker data);
    * the short part is added as-is (it is negative);
    * the resulting quantity is floored at minus the holding's effective
      quantity, so the adjustment cannot remove more than is held;
    * the quantity is priced at the position's ``last_price``, then its
      ``close_price``, then the holding's last price.

    Returns:
        The summed adjustment value; ``0.0`` when there are no positions.
    """
    by_symbol: dict[tuple[str, str], dict] = {}
    for holding in holdings or []:
        holding_key = _symbol_key(holding)
        by_symbol[holding_key] = {
            "settled_qty": _first_numeric(
                holding.get("settled_quantity"),
                holding.get("quantity"),
                default=0.0,
            ),
            "t1_qty": _first_numeric(holding.get("t1_quantity"), default=0.0),
            "effective_qty": holding_effective_quantity(holding),
            "last_price": holding_last_price(holding),
        }

    total_adjustment = 0.0
    for position in positions or []:
        net_qty = _position_signed_quantity(position)
        if net_qty == 0:
            continue

        position_key = _symbol_key(position)
        # Stand-in used when the symbol is not held at all.
        unheld = {
            "settled_qty": 0.0,
            "t1_qty": 0.0,
            "effective_qty": 0.0,
            "last_price": holding_last_price(position),
        }
        matched = by_symbol.get(position_key, unheld)

        held_qty = float(matched.get("effective_qty") or 0.0)
        unsettled_qty = float(matched.get("t1_qty") or 0.0)
        price = _first_numeric(
            position.get("last_price"),
            position.get("close_price"),
            matched.get("last_price"),
            default=0.0,
        )

        long_qty = max(net_qty, 0.0)
        short_qty = min(net_qty, 0.0)
        uncovered_long = long_qty - min(long_qty, unsettled_qty)
        delta_qty = max(uncovered_long + short_qty, -held_qty)
        total_adjustment += delta_qty * price

    return total_adjustment
|
|
|
|
|
|
def _normalize_groww_funds(data: dict | None) -> dict:
    """Reduce a Groww funds payload to a small, uniform dict.

    Each output figure is the first numeric value found in an ordered list of
    candidate fields; ``net``, ``withdrawable`` and ``balance`` fall back to
    the resolved ``cash`` figure. A non-dict ``data`` is treated as empty.

    Returns:
        A dict with ``net``, ``cash``, ``withdrawable``, ``balance``, an
        ``available`` sub-dict (``live_balance``, ``cash``,
        ``opening_balance``) and the original payload under ``raw``.
    """
    payload = data if isinstance(data, dict) else {}

    def _section(container: dict, key: str) -> dict:
        # Nested mapping at ``key``, or an empty dict when absent / not a dict.
        nested = container.get(key)
        return nested if isinstance(nested, dict) else {}

    def _pick(source: dict, *keys: str) -> list:
        # Values of ``keys`` in order; missing keys yield ``None``.
        return [source.get(key) for key in keys]

    available = _section(payload, "available")
    equity = _section(payload, "equity")
    equity_available = _section(equity, "available")
    equity_margin = _section(payload, "equity_margin_details")

    cash = _first_numeric(
        *_pick(
            payload,
            "clear_cash",
            "cash",
            "available_cash",
            "available_balance",
            "available_margin",
        ),
        *_pick(available, "cash", "available_cash", "available_margin", "balance"),
        *_pick(equity, "cash", "available_margin"),
        *_pick(equity_available, "cash", "live_balance"),
        *_pick(equity_margin, "cnc_balance_available", "mis_balance_available"),
    )
    net = _first_numeric(
        *_pick(
            payload,
            "net",
            "total",
            "margin_available",
            "available_margin",
            "clear_cash",
        ),
        equity.get("net"),
        cash,
    )
    withdrawable = _first_numeric(
        *_pick(payload, "withdrawable", "available_to_withdraw"),
        available.get("withdrawable"),
        equity_margin.get("cnc_balance_available"),
        payload.get("clear_cash"),
        cash,
    )
    balance = _first_numeric(
        *_pick(payload, "balance", "available_balance"),
        available.get("balance"),
        payload.get("clear_cash"),
        cash,
    )

    normalized = {
        "net": net,
        "cash": cash,
        "withdrawable": withdrawable,
        "balance": balance,
        "available": {
            "live_balance": cash,
            "cash": cash,
            "opening_balance": balance,
        },
        "raw": payload,
    }
    return normalized
|
|
|
|
|
|
def _upsert_snapshot(
    *,
    user_id: str,
    snapshot_date: date,
    captured_at: datetime,
    cash_value: float,
    holdings_value: float,
    positions_adjustment_value: float,
):
    """Insert or overwrite the snapshot row for ``(user_id, snapshot_date)``.

    The total is the sum of the three unrounded components; every amount is
    rounded to two decimals before it is stored (as ``Decimal``) or returned.

    Returns:
        A dict with ``snapshotDate`` and ``capturedAt`` (ISO strings) plus
        ``cashValue``, ``holdingsValue``, ``positionsAdjustmentValue`` and
        ``totalValue``.
    """
    total_value = cash_value + holdings_value + positions_adjustment_value

    cash_2dp = round(cash_value, 2)
    holdings_2dp = round(holdings_value, 2)
    adjustment_2dp = round(positions_adjustment_value, 2)
    total_2dp = round(total_value, 2)

    statement = """
        INSERT INTO live_equity_snapshot (
            user_id,
            snapshot_date,
            captured_at,
            cash_value,
            holdings_value,
            positions_adjustment_value,
            total_value
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (user_id, snapshot_date) DO UPDATE
        SET captured_at = EXCLUDED.captured_at,
            cash_value = EXCLUDED.cash_value,
            holdings_value = EXCLUDED.holdings_value,
            positions_adjustment_value = EXCLUDED.positions_adjustment_value,
            total_value = EXCLUDED.total_value
    """
    parameters = (
        user_id,
        snapshot_date,
        captured_at,
        # Decimal via str() stores the two-decimal figure, not the float's binary tail.
        Decimal(str(cash_2dp)),
        Decimal(str(holdings_2dp)),
        Decimal(str(adjustment_2dp)),
        Decimal(str(total_2dp)),
    )

    # ``with conn`` wraps the statement in the connection's own context manager.
    with db_connection() as conn, conn, conn.cursor() as cur:
        cur.execute(statement, parameters)

    return {
        "snapshotDate": snapshot_date.isoformat(),
        "capturedAt": captured_at.isoformat(),
        "cashValue": cash_2dp,
        "holdingsValue": holdings_2dp,
        "positionsAdjustmentValue": adjustment_2dp,
        "totalValue": total_2dp,
    }
|
|
|
|
|
|
def capture_live_equity_snapshot(
    user_id: str,
    *,
    holdings: list[dict] | None = None,
    positions: list[dict] | None = None,
    funds_data: dict | None = None,
    captured_at: datetime | None = None,
):
    """Value the user's account and store it as the snapshot for that day.

    Anything the caller does not supply is fetched from the user's broker
    (``ZERODHA`` or ``GROWW``):

    * ``holdings`` missing -> holdings are fetched, and positions too unless
      the caller passed ``positions`` (caller-supplied positions are now
      honoured instead of being overwritten);
    * ``holdings`` supplied but ``positions`` missing -> no positions are
      assumed (empty list);
    * ``funds_data`` missing -> funds are fetched.

    The broker session is looked up once, and only when a fetch is needed.

    Args:
        user_id: Owner of the snapshot.
        holdings: Already-normalized holdings, or ``None`` to fetch.
        positions: Already-normalized positions, or ``None``.
        funds_data: Funds payload, or ``None`` to fetch.
        captured_at: Capture timestamp; defaults to the current UTC time.

    Returns:
        The dict produced by ``_upsert_snapshot``, or ``None`` when data has
        to be fetched but the broker is unsupported or has no stored session.

    Raises:
        Whatever the broker fetch helpers or the database layer raise.
    """
    broker_state = get_user_broker(user_id) or {}
    broker_name = (broker_state.get("broker") or "").strip().upper()
    captured_at = captured_at or _now_utc()

    session = None
    if holdings is None or funds_data is None:
        if broker_name == "ZERODHA":
            session = get_zerodha_session(user_id)
        elif broker_name == "GROWW":
            session = get_groww_session(user_id)
        else:
            return None
        if not session:
            return None

    if holdings is None:
        if broker_name == "ZERODHA":
            api_key, access_token = session["api_key"], session["access_token"]
            holdings = [
                normalize_zerodha_holding(item)
                for item in fetch_zerodha_holdings(api_key, access_token)
            ]
            if positions is None:
                positions = [
                    normalize_zerodha_position(item)
                    for item in fetch_zerodha_positions(api_key, access_token)
                ]
        else:  # GROWW -- any other broker returned above
            access_token = session["access_token"]
            holdings = [
                normalize_groww_holding(item)
                for item in fetch_groww_holdings(access_token)
            ]
            if positions is None:
                positions = [
                    normalize_groww_position(item)
                    for item in fetch_groww_positions(access_token)
                ]
    elif positions is None:
        positions = []

    if funds_data is None:
        if broker_name == "ZERODHA":
            raw_funds = fetch_zerodha_funds(session["api_key"], session["access_token"])
            equity = raw_funds.get("equity") if isinstance(raw_funds, dict) else None
            if not isinstance(equity, dict):
                # A null / malformed equity segment must not break the ``**`` merge.
                equity = {}
            funds_data = {**equity, "raw": raw_funds}
        else:  # GROWW
            funds_data = _normalize_groww_funds(fetch_groww_funds(session["access_token"]))

    return _upsert_snapshot(
        user_id=user_id,
        snapshot_date=_snapshot_day(captured_at),
        captured_at=captured_at,
        cash_value=_extract_cash_value(funds_data),
        holdings_value=_extract_holdings_value(holdings),
        positions_adjustment_value=_extract_positions_adjustment_value(holdings, positions),
    )
|
|
|
|
|
|
def get_live_equity_curve(user_id: str, *, start_date: date | None = None):
    """Return the stored daily total values for ``user_id``.

    Args:
        user_id: Owner of the snapshots.
        start_date: First snapshot date to include; defaults to 90 days
            before the current IST date.

    Returns:
        A dict with ``startDate`` (ISO date), ``endDate`` (current UTC
        timestamp in ISO format), ``exactFrom`` (ISO date of the user's
        earliest snapshot, or ``None``) and ``points`` -- a date-ordered list
        of ``{"date", "value"}`` entries.
    """
    if start_date is None:
        start_date = _snapshot_day(_now_utc()) - timedelta(days=90)

    curve_query = """
        SELECT snapshot_date, total_value
        FROM live_equity_snapshot
        WHERE user_id = %s
          AND snapshot_date >= %s
        ORDER BY snapshot_date ASC
    """
    earliest_query = """
        SELECT MIN(snapshot_date)
        FROM live_equity_snapshot
        WHERE user_id = %s
    """

    with db_connection() as conn, conn.cursor() as cur:
        cur.execute(curve_query, (user_id, start_date))
        curve_rows = cur.fetchall()
        cur.execute(earliest_query, (user_id,))
        earliest_row = cur.fetchone()

    points = []
    for row in curve_rows:
        points.append(
            {"date": row[0].isoformat(), "value": round(float(row[1] or 0), 2)}
        )

    exact_from = None
    if earliest_row and earliest_row[0]:
        exact_from = earliest_row[0].isoformat()

    return {
        "startDate": start_date.isoformat(),
        "endDate": _now_utc().isoformat(),
        "exactFrom": exact_from,
        "points": points,
    }
|
|
|
|
|
|
def _list_connected_live_brokers() -> list[tuple[str, str]]:
    """List ``(user_id, BROKER)`` pairs for connected Zerodha/Groww users.

    The broker name is upper-cased by the query itself.
    """
    query = """
        SELECT user_id, UPPER(COALESCE(broker, ''))
        FROM user_broker
        WHERE connected = TRUE
          AND UPPER(COALESCE(broker, '')) IN ('ZERODHA', 'GROWW')
    """
    with db_connection() as conn, conn.cursor() as cur:
        cur.execute(query)
        connected_rows = cur.fetchall()
        return [(user_id, broker) for user_id, broker in connected_rows]
|
|
|
|
|
|
def _should_auto_snapshot(now_local: datetime) -> bool:
    """Tell whether the automatic snapshot may run at ``now_local``.

    True only on Monday-Friday once the clock has reached the configured
    ``AUTO_SNAPSHOT_AFTER_HOUR``:``AUTO_SNAPSHOT_AFTER_MINUTE`` of that day.
    """
    is_weekday = now_local.weekday() < 5
    # The cutoff is only built on weekdays, thanks to short-circuiting.
    return is_weekday and now_local >= now_local.replace(
        hour=AUTO_SNAPSHOT_AFTER_HOUR,
        minute=AUTO_SNAPSHOT_AFTER_MINUTE,
        second=0,
        microsecond=0,
    )
|
|
|
|
|
|
def _run_auto_snapshot_cycle():
    """Capture today's snapshot for every connected user, at most once a day.

    Does nothing when the cycle already completed for today's IST date or
    when ``_should_auto_snapshot`` says it is not yet time. Each user is
    captured independently: a failure is logged and the loop moves on, so one
    broken account cannot block the others.
    """
    global _LAST_AUTO_SNAPSHOT_DATE

    now_local = _now_ist()
    today = now_local.date()
    if _LAST_AUTO_SNAPSHOT_DATE == today or not _should_auto_snapshot(now_local):
        return

    log = logging.getLogger(__name__)
    for user_id, broker_name in _list_connected_live_brokers():
        try:
            capture_live_equity_snapshot(user_id)
        except (KiteApiError, GrowwApiError) as exc:
            # Broker-side errors: record them without a traceback and continue.
            log.warning(
                "Auto equity snapshot skipped for user %s (%s): %s",
                user_id,
                broker_name,
                exc,
            )
        except Exception:
            # Best-effort by design: log the traceback, keep processing users.
            log.exception(
                "Auto equity snapshot failed for user %s (%s)",
                user_id,
                broker_name,
            )

    # NOTE(review): the day is marked done even if some users failed, so they
    # are not retried until the next day -- confirm this is intended.
    _LAST_AUTO_SNAPSHOT_DATE = today
|
|
|
|
|
|
def _snapshot_loop():
    """Run the auto-snapshot cycle forever, sleeping between cycles.

    The pause is ``AUTO_SNAPSHOT_INTERVAL_SEC`` with a floor of 60 seconds.
    An exception in a cycle is logged (it used to be dropped silently) and
    never ends the loop.
    """
    log = logging.getLogger(__name__)
    while True:
        try:
            _run_auto_snapshot_cycle()
        except Exception:
            log.exception("Live equity auto-snapshot cycle failed")
        time.sleep(max(AUTO_SNAPSHOT_INTERVAL_SEC, 60))
|
|
|
|
|
|
def start_live_equity_snapshot_daemon():
    """Start the background snapshot thread unless one is already alive.

    Creation happens under ``_SNAPSHOT_LOCK``; the thread is a daemon thread
    named ``live-equity-snapshot-daemon`` that runs ``_snapshot_loop``.
    """
    global _SNAPSHOT_THREAD

    with _SNAPSHOT_LOCK:
        current = _SNAPSHOT_THREAD
        already_running = current is not None and current.is_alive()
        if not already_running:
            worker = threading.Thread(
                target=_snapshot_loop,
                name="live-equity-snapshot-daemon",
                daemon=True,
            )
            worker.start()
            _SNAPSHOT_THREAD = worker
|