# engine/execution.py import os from datetime import datetime, timedelta, timezone from indian_paper_trading_strategy.engine.state import load_state, save_state from indian_paper_trading_strategy.engine.broker import Broker, BrokerAuthExpired from indian_paper_trading_strategy.engine.ledger import claim_execution_window, log_event, event_exists from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry, get_context from indian_paper_trading_strategy.engine.market import market_now from indian_paper_trading_strategy.engine.time_utils import compute_logical_time LOCAL_ORDER_PENDING = "PENDING" LOCAL_ORDER_PARTIAL = "PARTIAL" LOCAL_ORDER_FILLED = "FILLED" LOCAL_ORDER_REJECTED = "REJECTED" LOCAL_ORDER_CANCELLED = "CANCELLED" LOCAL_ORDER_UNKNOWN = "UNKNOWN" _RECONCILEABLE_ORDER_STATES = { LOCAL_ORDER_PENDING, LOCAL_ORDER_PARTIAL, LOCAL_ORDER_UNKNOWN, } RECONCILIATION_INTERVAL = timedelta( seconds=int(os.getenv("ORDER_RECONCILIATION_INTERVAL_SEC", "15")) ) RECONCILIATION_TIMEOUT = timedelta( seconds=int(os.getenv("ORDER_RECONCILIATION_TIMEOUT_SEC", "300")) ) def _as_float(value): if hasattr(value, "item"): try: return float(value.item()) except Exception: pass if hasattr(value, "iloc"): try: return float(value.iloc[-1]) except Exception: pass return float(value) def _local_tz(): return market_now().tzinfo def _normalize_now(now): if now.tzinfo is None: return now.replace(tzinfo=_local_tz()) return now def _resolve_timing(state, now_ts, sip_interval): force_execute = state.get("last_sip_ts") is None last = state.get("last_sip_ts") or state.get("last_run") if last and not force_execute: try: last_dt = datetime.fromisoformat(last) except ValueError: last_dt = None if last_dt: if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=_local_tz()) if now_ts.tzinfo and last_dt.tzinfo and last_dt.tzinfo != now_ts.tzinfo: last_dt = last_dt.astimezone(now_ts.tzinfo) if last_dt and (now_ts - last_dt).total_seconds() < sip_interval: return False, last, None logical_time = compute_logical_time(now_ts, last, sip_interval) return True, last, logical_time def _order_fill(order): if not isinstance(order, dict): return 0.0, 0.0 filled_qty = float(order.get("filled_qty") or 0.0) average_price = float(order.get("average_price") or order.get("price") or 0.0) return filled_qty, average_price def _apply_filled_orders_to_state(state, orders): nifty_filled = 0.0 gold_filled = 0.0 total_spent = 0.0 for order in orders: filled_qty, average_price = _order_fill(order) if filled_qty <= 0: continue symbol = (order.get("symbol") or "").upper() if symbol.startswith("NIFTYBEES"): nifty_filled += filled_qty elif symbol.startswith("GOLDBEES"): gold_filled += filled_qty total_spent += filled_qty * average_price if nifty_filled: state["nifty_units"] += nifty_filled if gold_filled: state["gold_units"] += gold_filled if total_spent: state["total_invested"] += total_spent return { "nifty_units": nifty_filled, "gold_units": gold_filled, "amount": total_spent, } def _utc_now(): return datetime.now(timezone.utc) def _current_scope(): return get_context() def _order_status_details(order: dict) -> dict: payload = dict(order or {}) requested_qty = float(payload.get("requested_qty") or payload.get("qty") or 0.0) filled_qty = max(float(payload.get("filled_qty") or 0.0), 0.0) requested_price = float(payload.get("requested_price") or payload.get("price") or 0.0) average_price = float(payload.get("average_price") or requested_price or 0.0) raw_status = str(payload.get("status") or "").strip().upper() if requested_qty > 0 and filled_qty >= requested_qty: local_status = LOCAL_ORDER_FILLED needs_reconciliation = False elif filled_qty > 0: local_status = LOCAL_ORDER_PARTIAL needs_reconciliation = raw_status not in {LOCAL_ORDER_FILLED, LOCAL_ORDER_REJECTED, LOCAL_ORDER_CANCELLED} elif raw_status == LOCAL_ORDER_REJECTED: local_status = LOCAL_ORDER_REJECTED needs_reconciliation = False elif raw_status == LOCAL_ORDER_CANCELLED: local_status = LOCAL_ORDER_CANCELLED needs_reconciliation = False elif raw_status == LOCAL_ORDER_PENDING: local_status = LOCAL_ORDER_PENDING needs_reconciliation = True else: local_status = LOCAL_ORDER_UNKNOWN needs_reconciliation = True return { "local_order_id": str(payload.get("id") or payload.get("broker_order_id") or ""), "broker_order_id": str(payload.get("broker_order_id") or payload.get("id") or "") or None, "symbol": str(payload.get("symbol") or ""), "side": str(payload.get("side") or "").upper(), "requested_qty": requested_qty, "filled_qty": filled_qty, "requested_price": requested_price, "average_price": average_price, "status": local_status, "broker_status": raw_status or None, "status_message": payload.get("status_message"), "needs_reconciliation": needs_reconciliation, "logical_time": payload.get("logical_time"), } def _upsert_broker_order_state(cur, *, broker_name: str, logical_time, order: dict, checked_at: datetime): user_id, run_id = _current_scope() details = _order_status_details(order) if not details["local_order_id"]: raise ValueError("Broker order state requires a stable order id") cur.execute( """ SELECT status, filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at FROM broker_order_state WHERE local_order_id = %s FOR UPDATE """, (details["local_order_id"],), ) row = cur.fetchone() previous = { "status": row[0], "filled_qty": float(row[1] or 0.0), "accounted_fill_qty": float(row[2] or 0.0), "needs_reconciliation": bool(row[3]), "last_checked_at": row[4], } if row else { "status": None, "filled_qty": 0.0, "accounted_fill_qty": 0.0, "needs_reconciliation": False, "last_checked_at": None, } logical_ts = logical_time or checked_at cur.execute( """ INSERT INTO broker_order_state ( local_order_id, user_id, run_id, logical_time, broker, symbol, side, broker_order_id, requested_qty, filled_qty, accounted_fill_qty, requested_price, average_price, status, broker_status, status_message, needs_reconciliation, last_checked_at, created_at, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (local_order_id) DO UPDATE SET broker_order_id = EXCLUDED.broker_order_id, requested_qty = EXCLUDED.requested_qty, filled_qty = EXCLUDED.filled_qty, requested_price = EXCLUDED.requested_price, average_price = EXCLUDED.average_price, status = EXCLUDED.status, broker_status = EXCLUDED.broker_status, status_message = EXCLUDED.status_message, needs_reconciliation = EXCLUDED.needs_reconciliation, last_checked_at = EXCLUDED.last_checked_at, updated_at = EXCLUDED.updated_at """, ( details["local_order_id"], user_id, run_id, logical_ts, broker_name, details["symbol"], details["side"], details["broker_order_id"], details["requested_qty"], details["filled_qty"], previous["accounted_fill_qty"], details["requested_price"], details["average_price"], details["status"], details["broker_status"], details["status_message"], details["needs_reconciliation"], checked_at, checked_at, checked_at, ), ) details["accounted_fill_qty"] = previous["accounted_fill_qty"] details["delta_fill_qty"] = max(details["filled_qty"] - previous["accounted_fill_qty"], 0.0) details["previous_status"] = previous["status"] details["previous_filled_qty"] = previous["filled_qty"] details["previous_needs_reconciliation"] = previous["needs_reconciliation"] return details def _mark_accounted_fill(cur, local_order_id: str, accounted_fill_qty: float, checked_at: datetime): cur.execute( """ UPDATE broker_order_state SET accounted_fill_qty = %s, updated_at = %s WHERE local_order_id = %s """, (accounted_fill_qty, checked_at, local_order_id), ) def _apply_fill_delta_to_state(state: dict, order_details: dict): delta_qty = max(float(order_details.get("delta_fill_qty") or 0.0), 0.0) average_price = float(order_details.get("average_price") or order_details.get("requested_price") or 0.0) symbol = str(order_details.get("symbol") or "").upper() delta_amount = delta_qty * average_price if delta_qty <= 0: return {"nifty_units": 0.0, "gold_units": 0.0, "amount": 0.0} nifty_units = 0.0 gold_units = 0.0 if symbol.startswith("NIFTYBEES"): state["nifty_units"] += delta_qty nifty_units = delta_qty elif symbol.startswith("GOLDBEES"): state["gold_units"] += delta_qty gold_units = delta_qty state["total_invested"] += delta_amount return {"nifty_units": nifty_units, "gold_units": gold_units, "amount": delta_amount} def _load_cycle_order_rows(cur, logical_time): user_id, run_id = _current_scope() cur.execute( """ SELECT local_order_id, symbol, side, requested_qty, filled_qty, accounted_fill_qty, requested_price, average_price, status, broker_status, status_message, needs_reconciliation, broker_order_id FROM broker_order_state WHERE user_id = %s AND run_id = %s AND logical_time = %s ORDER BY local_order_id """, (user_id, run_id, logical_time), ) rows = [] for row in cur.fetchall(): rows.append( { "local_order_id": row[0], "symbol": row[1], "side": row[2], "requested_qty": float(row[3] or 0.0), "filled_qty": float(row[4] or 0.0), "accounted_fill_qty": float(row[5] or 0.0), "requested_price": float(row[6] or 0.0), "average_price": float(row[7] or 0.0), "status": row[8], "broker_status": row[9], "status_message": row[10], "needs_reconciliation": bool(row[11]), "broker_order_id": row[12], } ) return rows def _aggregate_cycle_orders(orders: list[dict]): summary = { "nifty_units": 0.0, "gold_units": 0.0, "nifty_price": 0.0, "gold_price": 0.0, "amount": 0.0, "has_any_fill": False, "all_filled": bool(orders), "has_unresolved": False, } for order in orders: symbol = str(order.get("symbol") or "").upper() filled_qty = float(order.get("filled_qty") or 0.0) requested_qty = float(order.get("requested_qty") or 0.0) avg_price = float(order.get("average_price") or order.get("requested_price") or 0.0) status = str(order.get("status") or "") needs_reconciliation = bool(order.get("needs_reconciliation")) if status != LOCAL_ORDER_FILLED: summary["all_filled"] = False if needs_reconciliation or status in _RECONCILEABLE_ORDER_STATES: summary["has_unresolved"] = True if filled_qty <= 0: continue summary["has_any_fill"] = True summary["amount"] += filled_qty * avg_price if symbol.startswith("NIFTYBEES"): summary["nifty_units"] += filled_qty summary["nifty_price"] = avg_price or summary["nifty_price"] elif symbol.startswith("GOLDBEES"): summary["gold_units"] += filled_qty summary["gold_price"] = avg_price or summary["gold_price"] if requested_qty > 0 and filled_qty < requested_qty: summary["all_filled"] = False return summary def _record_reconciliation_event(cur, event: str, *, logical_time, payload: dict, ts: datetime): if event_exists(event, logical_time, cur=cur): return if event in {"SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING"}: log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time) return insert_engine_event(cur, event, data=payload, ts=ts) def _advance_cycle_state(cur, *, state: dict, logical_time, cycle_orders: list[dict], event_ts: datetime, failure_reason: str | None = None): cycle_summary = _aggregate_cycle_orders(cycle_orders) if cycle_orders or failure_reason: state["last_run"] = event_ts.isoformat() if cycle_summary["all_filled"]: state["last_sip_ts"] = event_ts.isoformat() _record_reconciliation_event( cur, "SIP_EXECUTED", logical_time=logical_time, payload={ "nifty_units": cycle_summary["nifty_units"], "gold_units": cycle_summary["gold_units"], "nifty_price": cycle_summary["nifty_price"], "gold_price": cycle_summary["gold_price"], "amount": cycle_summary["amount"], }, ts=event_ts, ) return True if cycle_summary["has_any_fill"] and not cycle_summary["has_unresolved"]: _record_reconciliation_event( cur, "SIP_PARTIAL", logical_time=logical_time, payload={ "nifty_units": cycle_summary["nifty_units"], "gold_units": cycle_summary["gold_units"], "nifty_price": cycle_summary["nifty_price"], "gold_price": cycle_summary["gold_price"], "amount": cycle_summary["amount"], "reason": "partial_fill", }, ts=event_ts, ) return False if cycle_summary["has_unresolved"]: _record_reconciliation_event( cur, "ORDER_RECONCILIATION_PENDING", logical_time=logical_time, payload={ "logical_time": logical_time.isoformat(), "orders": cycle_orders, }, ts=event_ts, ) return False _record_reconciliation_event( cur, "SIP_NO_FILL", logical_time=logical_time, payload={ "reason": failure_reason or "no_fill", "orders": cycle_orders, }, ts=event_ts, ) return False def _list_orders_to_reconcile(cur, *, checked_before: datetime): user_id, run_id = _current_scope() cur.execute( """ SELECT local_order_id, logical_time, broker, symbol, side, broker_order_id, requested_qty, requested_price, average_price, status, broker_status, status_message, filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at, created_at FROM broker_order_state WHERE user_id = %s AND run_id = %s AND needs_reconciliation = TRUE AND (last_checked_at IS NULL OR last_checked_at <= %s) ORDER BY logical_time ASC, created_at ASC """, (user_id, run_id, checked_before), ) rows = [] for row in cur.fetchall(): rows.append( { "local_order_id": row[0], "logical_time": row[1], "broker": row[2], "symbol": row[3], "side": row[4], "broker_order_id": row[5], "requested_qty": float(row[6] or 0.0), "requested_price": float(row[7] or 0.0), "average_price": float(row[8] or 0.0), "status": row[9], "broker_status": row[10], "status_message": row[11], "filled_qty": float(row[12] or 0.0), "accounted_fill_qty": float(row[13] or 0.0), "needs_reconciliation": bool(row[14]), "last_checked_at": row[15], "created_at": row[16], "id": row[0], } ) return rows def reconcile_live_orders(*, broker: Broker, mode: str | None = "LIVE", now_ts: datetime | None = None): checked_at = _normalize_now(now_ts or _utc_now()) blocked = False summaries: list[dict] = [] pending_orders = run_with_retry( lambda cur, _conn: _list_orders_to_reconcile( cur, checked_before=checked_at - RECONCILIATION_INTERVAL, ) ) if not pending_orders: return {"blocked": False, "orders": [], "summaries": []} def _op(cur, _conn): state = load_state(mode=mode, cur=cur, for_update=True) insert_engine_event( cur, "ORDER_RECONCILIATION_STARTED", data={"count": len(pending_orders)}, ts=checked_at, ) nonlocal blocked for pending in pending_orders: refreshed = broker.refresh_order_status(pending) refreshed["logical_time"] = pending["logical_time"] details = _upsert_broker_order_state( cur, broker_name=str(pending.get("broker") or ""), logical_time=pending["logical_time"], order=refreshed, checked_at=checked_at, ) if details["status"] != pending.get("status"): insert_engine_event( cur, "ORDER_RECONCILIATION_STATUS_CHANGED", data={ "order_id": details["local_order_id"], "from": pending.get("status"), "to": details["status"], }, ts=checked_at, ) if details["status"] == LOCAL_ORDER_PARTIAL: insert_engine_event( cur, "ORDER_PARTIAL_FILL_DETECTED", data={ "order_id": details["local_order_id"], "filled_qty": details["filled_qty"], "requested_qty": details["requested_qty"], }, ts=checked_at, ) applied = _apply_fill_delta_to_state(state, details) if details["delta_fill_qty"] > 0: _mark_accounted_fill( cur, details["local_order_id"], details["accounted_fill_qty"] + details["delta_fill_qty"], checked_at, ) cycle_orders = _load_cycle_order_rows(cur, pending["logical_time"]) executed = _advance_cycle_state( cur, state=state, logical_time=pending["logical_time"], cycle_orders=cycle_orders, event_ts=checked_at, ) unresolved = any(order["needs_reconciliation"] for order in cycle_orders) if unresolved: oldest_ts = min( [ pending.get("created_at") or checked_at ] + [checked_at] ) if checked_at - oldest_ts >= RECONCILIATION_TIMEOUT: insert_engine_event( cur, "ORDER_RECONCILIATION_TIMEOUT", data={ "logical_time": pending["logical_time"].isoformat(), "orders": cycle_orders, }, ts=checked_at, ) blocked = True summaries.append( { "logical_time": pending["logical_time"], "executed": executed, "applied_amount": applied["amount"], "unresolved": unresolved, } ) save_state( state, mode=mode, cur=cur, emit_event=True, event_meta={"source": "live_reconciliation"}, ) insert_engine_event( cur, "ORDER_RECONCILIATION_COMPLETE", data={"blocked": blocked, "count": len(pending_orders)}, ts=checked_at, ) return {"blocked": blocked} run_with_retry(_op) return {"blocked": blocked, "orders": pending_orders, "summaries": summaries} def _record_live_order_events(cur, orders, event_ts): for order in orders: insert_engine_event(cur, "ORDER_PLACED", data=order, ts=event_ts) filled_qty, _average_price = _order_fill(order) status = (order.get("status") or "").upper() if filled_qty > 0: insert_engine_event( cur, "TRADE_EXECUTED", data={ "order_id": order.get("id"), "symbol": order.get("symbol"), "side": order.get("side"), "qty": filled_qty, "price": order.get("average_price") or order.get("price"), }, ts=event_ts, ) insert_engine_event(cur, "ORDER_FILLED", data={"order_id": order.get("id")}, ts=event_ts) elif status == "REJECTED": insert_engine_event(cur, "ORDER_REJECTED", data=order, ts=event_ts) elif status == "CANCELLED": insert_engine_event(cur, "ORDER_CANCELLED", data=order, ts=event_ts) elif status == "PENDING": insert_engine_event(cur, "ORDER_PENDING", data=order, ts=event_ts) def _try_execute_sip_paper( now, market_open, sip_interval, sip_amount, sp_price, gd_price, eq_w, gd_w, broker: Broker | None, mode: str | None, ): def _op(cur, _conn): now_ts = _normalize_now(now) event_ts = now_ts log_event("DEBUG_ENTER_TRY_EXECUTE", {"now": now_ts.isoformat()}, cur=cur, ts=event_ts) state = load_state(mode=mode, cur=cur, for_update=True) if not market_open: return state, False should_run, _last, logical_time = _resolve_timing(state, now_ts, sip_interval) if not should_run: return state, False if event_exists("SIP_EXECUTED", logical_time, cur=cur): return state, False sp_price_val = _as_float(sp_price) gd_price_val = _as_float(gd_price) eq_w_val = _as_float(eq_w) gd_w_val = _as_float(gd_w) sip_amount_val = _as_float(sip_amount) nifty_qty = (sip_amount_val * eq_w_val) / sp_price_val gold_qty = (sip_amount_val * gd_w_val) / gd_price_val if broker is None: return state, False funds = broker.get_funds(cur=cur) cash = funds.get("cash") if cash is not None and float(cash) < sip_amount_val: return state, False log_event( "DEBUG_EXECUTION_DECISION", { "last_sip_ts": state.get("last_sip_ts"), "now": now_ts.isoformat(), }, cur=cur, ts=event_ts, ) orders = [ broker.place_order( "NIFTYBEES.NS", "BUY", nifty_qty, sp_price_val, cur=cur, logical_time=logical_time, ), broker.place_order( "GOLDBEES.NS", "BUY", gold_qty, gd_price_val, cur=cur, logical_time=logical_time, ), ] applied = _apply_filled_orders_to_state(state, orders) executed = applied["amount"] > 0 if not executed: return state, False funds_after = broker.get_funds(cur=cur) cash_after = funds_after.get("cash") if cash_after is not None: state["cash"] = float(cash_after) state["last_sip_ts"] = now_ts.isoformat() state["last_run"] = now_ts.isoformat() save_state( state, mode=mode, cur=cur, emit_event=True, event_meta={"source": "sip"}, ) log_event( "SIP_EXECUTED", { "nifty_units": applied["nifty_units"], "gold_units": applied["gold_units"], "nifty_price": sp_price_val, "gold_price": gd_price_val, "amount": applied["amount"], }, cur=cur, ts=event_ts, logical_time=logical_time, ) return state, True return run_with_retry(_op) def _prepare_live_execution(now_ts, sip_interval, sip_amount_val, sp_price_val, gd_price_val, nifty_qty, gold_qty, mode): def _op(cur, _conn): state = load_state(mode=mode, cur=cur, for_update=True) should_run, _last, logical_time = _resolve_timing(state, now_ts, sip_interval) if not should_run: return {"ready": False, "state": state} if event_exists("SIP_EXECUTED", logical_time, cur=cur): return {"ready": False, "state": state} if not claim_execution_window(logical_time, mode=mode, cur=cur): return {"ready": False, "state": state} log_event( "SIP_ORDER_ATTEMPTED", { "nifty_units": nifty_qty, "gold_units": gold_qty, "nifty_price": sp_price_val, "gold_price": gd_price_val, "amount": sip_amount_val, }, cur=cur, ts=now_ts, logical_time=logical_time, ) return {"ready": True, "state": state, "logical_time": logical_time} return run_with_retry(_op) def _finalize_live_execution( *, now_ts, mode, logical_time, orders, funds_after, sp_price_val, gd_price_val, auth_failed: bool = False, failure_reason: str | None = None, ): def _op(cur, _conn): state = load_state(mode=mode, cur=cur, for_update=True) broker_name = "" for order in orders: if not broker_name: broker_name = str(order.get("broker") or "").strip().upper() order.setdefault("logical_time", logical_time) _record_live_order_events(cur, [order], now_ts) details = _upsert_broker_order_state( cur, broker_name=broker_name, logical_time=logical_time, order=order, checked_at=now_ts, ) if details["status"] == LOCAL_ORDER_PARTIAL: insert_engine_event( cur, "ORDER_PARTIAL_FILL_DETECTED", data={ "order_id": details["local_order_id"], "filled_qty": details["filled_qty"], "requested_qty": details["requested_qty"], }, ts=now_ts, ) if details["delta_fill_qty"] > 0: _apply_fill_delta_to_state(state, details) _mark_accounted_fill( cur, details["local_order_id"], details["accounted_fill_qty"] + details["delta_fill_qty"], now_ts, ) if funds_after is not None: cash_after = funds_after.get("cash") if cash_after is not None: state["cash"] = float(cash_after) cycle_orders = _load_cycle_order_rows(cur, logical_time) executed = _advance_cycle_state( cur, state=state, logical_time=logical_time, cycle_orders=cycle_orders, event_ts=now_ts, failure_reason=failure_reason or ("broker_auth_expired" if auth_failed else None), ) save_state( state, mode=mode, cur=cur, emit_event=True, event_meta={"source": "sip_live"}, ) return state, executed return run_with_retry(_op) def _try_execute_sip_live( now, market_open, sip_interval, sip_amount, sp_price, gd_price, eq_w, gd_w, broker: Broker | None, mode: str | None, ): now_ts = _normalize_now(now) if not market_open or broker is None: return load_state(mode=mode), False reconciliation = reconcile_live_orders(broker=broker, mode=mode, now_ts=now_ts) if reconciliation.get("blocked"): return load_state(mode=mode), False sp_price_val = _as_float(sp_price) gd_price_val = _as_float(gd_price) eq_w_val = _as_float(eq_w) gd_w_val = _as_float(gd_w) sip_amount_val = _as_float(sip_amount) nifty_qty = (sip_amount_val * eq_w_val) / sp_price_val gold_qty = (sip_amount_val * gd_w_val) / gd_price_val prepared = _prepare_live_execution( now_ts, sip_interval, sip_amount_val, sp_price_val, gd_price_val, nifty_qty, gold_qty, mode, ) if not prepared.get("ready"): return prepared.get("state") or load_state(mode=mode), False logical_time = prepared["logical_time"] orders = [] funds_after = None failure_reason = None auth_failed = False broker_name = getattr(broker, "__class__", type(broker)).__name__.replace("Live", "").replace("Broker", "").upper() try: funds_before = broker.get_funds() cash = funds_before.get("cash") if cash is not None and float(cash) < sip_amount_val: failure_reason = "insufficient_funds" else: if nifty_qty > 0: orders.append( broker.place_order( "NIFTYBEES.NS", "BUY", nifty_qty, sp_price_val, logical_time=logical_time, ) ) orders[-1]["broker"] = broker_name orders[-1]["logical_time"] = logical_time if gold_qty > 0: orders.append( broker.place_order( "GOLDBEES.NS", "BUY", gold_qty, gd_price_val, logical_time=logical_time, ) ) orders[-1]["broker"] = broker_name orders[-1]["logical_time"] = logical_time funds_after = broker.get_funds() except BrokerAuthExpired: auth_failed = True try: funds_after = broker.get_funds() except Exception: funds_after = None except Exception as exc: failure_reason = str(exc) try: funds_after = broker.get_funds() except Exception: funds_after = None state, _executed = _finalize_live_execution( now_ts=now_ts, mode=mode, logical_time=logical_time, orders=orders, funds_after=funds_after, sp_price_val=sp_price_val, gd_price_val=gd_price_val, auth_failed=False, failure_reason=failure_reason, ) raise state, executed = _finalize_live_execution( now_ts=now_ts, mode=mode, logical_time=logical_time, orders=orders, funds_after=funds_after, sp_price_val=sp_price_val, gd_price_val=gd_price_val, auth_failed=auth_failed, failure_reason=failure_reason, ) if auth_failed: raise BrokerAuthExpired("Broker session expired during live order execution") return state, executed def try_execute_sip( now, market_open, sip_interval, sip_amount, sp_price, gd_price, eq_w, gd_w, broker: Broker | None = None, mode: str | None = "LIVE", ): if broker is None: return load_state(mode=mode), False if getattr(broker, "external_orders", False): return _try_execute_sip_live( now, market_open, sip_interval, sip_amount, sp_price, gd_price, eq_w, gd_w, broker, mode, ) return _try_execute_sip_paper( now, market_open, sip_interval, sip_amount, sp_price, gd_price, eq_w, gd_w, broker, mode, )