# engine/execution.py
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from indian_paper_trading_strategy.engine.state import load_state, save_state
|
|
|
|
from indian_paper_trading_strategy.engine.broker import Broker, BrokerAuthExpired
|
|
from indian_paper_trading_strategy.engine.ledger import claim_execution_window, log_event, event_exists
|
|
from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry, get_context
|
|
from indian_paper_trading_strategy.engine.market import market_now
|
|
from indian_paper_trading_strategy.engine.time_utils import compute_logical_time
|
|
|
|
# Local order states written to the `status` column of broker_order_state.
LOCAL_ORDER_PENDING = "PENDING"
LOCAL_ORDER_PARTIAL = "PARTIAL"
LOCAL_ORDER_FILLED = "FILLED"
LOCAL_ORDER_REJECTED = "REJECTED"
LOCAL_ORDER_CANCELLED = "CANCELLED"
LOCAL_ORDER_UNKNOWN = "UNKNOWN"

# States that are not final: a cycle holding an order in any of these is
# reported as unresolved by _aggregate_cycle_orders.
_RECONCILEABLE_ORDER_STATES = {LOCAL_ORDER_PENDING, LOCAL_ORDER_PARTIAL, LOCAL_ORDER_UNKNOWN}
|
|
# Minimum time since an order's last broker check before it is polled again.
# Overridable via ORDER_RECONCILIATION_INTERVAL_SEC (whole seconds, default 15).
RECONCILIATION_INTERVAL = timedelta(seconds=int(os.getenv("ORDER_RECONCILIATION_INTERVAL_SEC", "15")))

# Age after which a still-unresolved order makes reconciliation report a timeout.
# Overridable via ORDER_RECONCILIATION_TIMEOUT_SEC (whole seconds, default 300).
RECONCILIATION_TIMEOUT = timedelta(seconds=int(os.getenv("ORDER_RECONCILIATION_TIMEOUT_SEC", "300")))
|
|
|
|
def _as_float(value):
|
|
if hasattr(value, "item"):
|
|
try:
|
|
return float(value.item())
|
|
except Exception:
|
|
pass
|
|
if hasattr(value, "iloc"):
|
|
try:
|
|
return float(value.iloc[-1])
|
|
except Exception:
|
|
pass
|
|
return float(value)
|
|
|
|
def _local_tz():
    """Return the ``tzinfo`` carried by the current ``market_now()`` reading."""
    current = market_now()
    return current.tzinfo
|
|
|
|
|
|
def _normalize_now(now):
|
|
if now.tzinfo is None:
|
|
return now.replace(tzinfo=_local_tz())
|
|
return now
|
|
|
|
|
|
def _resolve_timing(state, now_ts, sip_interval):
|
|
force_execute = state.get("last_sip_ts") is None
|
|
last = state.get("last_sip_ts") or state.get("last_run")
|
|
if last and not force_execute:
|
|
try:
|
|
last_dt = datetime.fromisoformat(last)
|
|
except ValueError:
|
|
last_dt = None
|
|
if last_dt:
|
|
if last_dt.tzinfo is None:
|
|
last_dt = last_dt.replace(tzinfo=_local_tz())
|
|
if now_ts.tzinfo and last_dt.tzinfo and last_dt.tzinfo != now_ts.tzinfo:
|
|
last_dt = last_dt.astimezone(now_ts.tzinfo)
|
|
if last_dt and (now_ts - last_dt).total_seconds() < sip_interval:
|
|
return False, last, None
|
|
logical_time = compute_logical_time(now_ts, last, sip_interval)
|
|
return True, last, logical_time
|
|
|
|
|
|
def _order_fill(order):
|
|
if not isinstance(order, dict):
|
|
return 0.0, 0.0
|
|
filled_qty = float(order.get("filled_qty") or 0.0)
|
|
average_price = float(order.get("average_price") or order.get("price") or 0.0)
|
|
return filled_qty, average_price
|
|
|
|
|
|
def _apply_filled_orders_to_state(state, orders):
    """Add the fills reported in *orders* to *state* (mutated in place).

    Symbols starting with ``NIFTYBEES`` / ``GOLDBEES`` add to the matching unit
    counter; the spend of every filled order is added to ``total_invested``.
    Returns the applied totals as ``{"nifty_units", "gold_units", "amount"}``.
    """
    totals = {"nifty_units": 0.0, "gold_units": 0.0, "amount": 0.0}

    for order in orders:
        qty, price = _order_fill(order)
        if qty <= 0:
            continue
        ticker = (order.get("symbol") or "").upper()
        if ticker.startswith("NIFTYBEES"):
            totals["nifty_units"] += qty
        elif ticker.startswith("GOLDBEES"):
            totals["gold_units"] += qty
        # NOTE(review): source indentation was lost; spend is assumed to be
        # accumulated for every filled order, not only the GOLDBEES branch.
        totals["amount"] += qty * price

    # Only touch state keys that actually received a non-zero total.
    if totals["nifty_units"]:
        state["nifty_units"] += totals["nifty_units"]
    if totals["gold_units"]:
        state["gold_units"] += totals["gold_units"]
    if totals["amount"]:
        state["total_invested"] += totals["amount"]

    return totals
|
|
|
|
|
|
def _utc_now():
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def _current_scope():
    """Return the result of ``get_context()``.

    Callers in this module unpack it as ``(user_id, run_id)``.
    """
    scope = get_context()
    return scope
|
|
|
|
|
|
def _order_status_details(order: dict) -> dict:
    """Normalise a raw broker order dict into the local order-state record.

    The local status is derived from quantities first: fully filled orders are
    ``FILLED``, any other positive fill is ``PARTIAL``. Only when nothing is
    filled is the broker's own status string used; unrecognised strings map to
    ``UNKNOWN``. ``needs_reconciliation`` is set when the order should be
    polled again.
    """
    payload = dict(order or {})
    requested_qty = float(payload.get("requested_qty") or payload.get("qty") or 0.0)
    filled_qty = max(float(payload.get("filled_qty") or 0.0), 0.0)
    requested_price = float(payload.get("requested_price") or payload.get("price") or 0.0)
    average_price = float(payload.get("average_price") or requested_price or 0.0)
    raw_status = str(payload.get("status") or "").strip().upper()

    if requested_qty > 0 and filled_qty >= requested_qty:
        local_status, needs_reconciliation = LOCAL_ORDER_FILLED, False
    elif filled_qty > 0:
        # A partial fill stops being polled once the broker reports a final status.
        broker_final = {LOCAL_ORDER_FILLED, LOCAL_ORDER_REJECTED, LOCAL_ORDER_CANCELLED}
        local_status, needs_reconciliation = LOCAL_ORDER_PARTIAL, raw_status not in broker_final
    elif raw_status == LOCAL_ORDER_REJECTED:
        local_status, needs_reconciliation = LOCAL_ORDER_REJECTED, False
    elif raw_status == LOCAL_ORDER_CANCELLED:
        local_status, needs_reconciliation = LOCAL_ORDER_CANCELLED, False
    elif raw_status == LOCAL_ORDER_PENDING:
        local_status, needs_reconciliation = LOCAL_ORDER_PENDING, True
    else:
        local_status, needs_reconciliation = LOCAL_ORDER_UNKNOWN, True

    own_id = payload.get("id")
    broker_id = payload.get("broker_order_id")

    return {
        # Each id falls back to the other so a record always has a key when either exists.
        "local_order_id": str(own_id or broker_id or ""),
        "broker_order_id": str(broker_id or own_id or "") or None,
        "symbol": str(payload.get("symbol") or ""),
        "side": str(payload.get("side") or "").upper(),
        "requested_qty": requested_qty,
        "filled_qty": filled_qty,
        "requested_price": requested_price,
        "average_price": average_price,
        "status": local_status,
        "broker_status": raw_status or None,
        "status_message": payload.get("status_message"),
        "needs_reconciliation": needs_reconciliation,
        "logical_time": payload.get("logical_time"),
    }
|
|
|
|
|
|
def _upsert_broker_order_state(cur, *, broker_name: str, logical_time, order: dict, checked_at: datetime):
    """Insert or refresh the ``broker_order_state`` row for *order*.

    The existing row (if any) is locked with ``FOR UPDATE`` and read first so
    the already-accounted fill quantity is known. The upsert never overwrites
    ``accounted_fill_qty`` on conflict; that column is advanced separately by
    ``_mark_accounted_fill``.

    Returns the normalised order details extended with ``accounted_fill_qty``,
    ``delta_fill_qty`` (fill not yet applied to state, never negative) and the
    ``previous_*`` values of the row before this call.

    Raises:
        ValueError: if the order carries neither ``id`` nor ``broker_order_id``.
    """
    user_id, run_id = _current_scope()
    details = _order_status_details(order)
    order_key = details["local_order_id"]
    if not order_key:
        raise ValueError("Broker order state requires a stable order id")

    cur.execute(
        """
        SELECT status, filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at
        FROM broker_order_state
        WHERE local_order_id = %s
        FOR UPDATE
        """,
        (order_key,),
    )
    existing = cur.fetchone()
    if existing:
        previous = {
            "status": existing[0],
            "filled_qty": float(existing[1] or 0.0),
            "accounted_fill_qty": float(existing[2] or 0.0),
            "needs_reconciliation": bool(existing[3]),
            "last_checked_at": existing[4],
        }
    else:
        previous = {
            "status": None,
            "filled_qty": 0.0,
            "accounted_fill_qty": 0.0,
            "needs_reconciliation": False,
            "last_checked_at": None,
        }

    # Fall back to the check time when the caller has no logical cycle time.
    logical_ts = logical_time or checked_at
    row_values = (
        order_key,
        user_id,
        run_id,
        logical_ts,
        broker_name,
        details["symbol"],
        details["side"],
        details["broker_order_id"],
        details["requested_qty"],
        details["filled_qty"],
        previous["accounted_fill_qty"],
        details["requested_price"],
        details["average_price"],
        details["status"],
        details["broker_status"],
        details["status_message"],
        details["needs_reconciliation"],
        checked_at,  # last_checked_at
        checked_at,  # created_at
        checked_at,  # updated_at
    )
    cur.execute(
        """
        INSERT INTO broker_order_state (
            local_order_id,
            user_id,
            run_id,
            logical_time,
            broker,
            symbol,
            side,
            broker_order_id,
            requested_qty,
            filled_qty,
            accounted_fill_qty,
            requested_price,
            average_price,
            status,
            broker_status,
            status_message,
            needs_reconciliation,
            last_checked_at,
            created_at,
            updated_at
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (local_order_id) DO UPDATE
        SET broker_order_id = EXCLUDED.broker_order_id,
            requested_qty = EXCLUDED.requested_qty,
            filled_qty = EXCLUDED.filled_qty,
            requested_price = EXCLUDED.requested_price,
            average_price = EXCLUDED.average_price,
            status = EXCLUDED.status,
            broker_status = EXCLUDED.broker_status,
            status_message = EXCLUDED.status_message,
            needs_reconciliation = EXCLUDED.needs_reconciliation,
            last_checked_at = EXCLUDED.last_checked_at,
            updated_at = EXCLUDED.updated_at
        """,
        row_values,
    )

    details.update(
        accounted_fill_qty=previous["accounted_fill_qty"],
        delta_fill_qty=max(details["filled_qty"] - previous["accounted_fill_qty"], 0.0),
        previous_status=previous["status"],
        previous_filled_qty=previous["filled_qty"],
        previous_needs_reconciliation=previous["needs_reconciliation"],
    )
    return details
|
|
|
|
|
|
def _mark_accounted_fill(cur, local_order_id: str, accounted_fill_qty: float, checked_at: datetime):
|
|
cur.execute(
|
|
"""
|
|
UPDATE broker_order_state
|
|
SET accounted_fill_qty = %s,
|
|
updated_at = %s
|
|
WHERE local_order_id = %s
|
|
""",
|
|
(accounted_fill_qty, checked_at, local_order_id),
|
|
)
|
|
|
|
|
|
def _apply_fill_delta_to_state(state: dict, order_details: dict):
|
|
delta_qty = max(float(order_details.get("delta_fill_qty") or 0.0), 0.0)
|
|
average_price = float(order_details.get("average_price") or order_details.get("requested_price") or 0.0)
|
|
symbol = str(order_details.get("symbol") or "").upper()
|
|
delta_amount = delta_qty * average_price
|
|
|
|
if delta_qty <= 0:
|
|
return {"nifty_units": 0.0, "gold_units": 0.0, "amount": 0.0}
|
|
|
|
nifty_units = 0.0
|
|
gold_units = 0.0
|
|
if symbol.startswith("NIFTYBEES"):
|
|
state["nifty_units"] += delta_qty
|
|
nifty_units = delta_qty
|
|
elif symbol.startswith("GOLDBEES"):
|
|
state["gold_units"] += delta_qty
|
|
gold_units = delta_qty
|
|
state["total_invested"] += delta_amount
|
|
return {"nifty_units": nifty_units, "gold_units": gold_units, "amount": delta_amount}
|
|
|
|
|
|
def _load_cycle_order_rows(cur, logical_time):
    """Fetch every ``broker_order_state`` row of one cycle for the current scope.

    Rows are filtered by the current ``(user_id, run_id)`` and *logical_time*,
    ordered by ``local_order_id``, and returned as dicts with numeric columns
    coerced to ``float`` (NULL becomes ``0.0``).
    """
    user_id, run_id = _current_scope()
    cur.execute(
        """
        SELECT local_order_id, symbol, side, requested_qty, filled_qty, accounted_fill_qty,
               requested_price, average_price, status, broker_status, status_message,
               needs_reconciliation, broker_order_id
        FROM broker_order_state
        WHERE user_id = %s AND run_id = %s AND logical_time = %s
        ORDER BY local_order_id
        """,
        (user_id, run_id, logical_time),
    )

    def _num(cell):
        return float(cell or 0.0)

    return [
        {
            "local_order_id": rec[0],
            "symbol": rec[1],
            "side": rec[2],
            "requested_qty": _num(rec[3]),
            "filled_qty": _num(rec[4]),
            "accounted_fill_qty": _num(rec[5]),
            "requested_price": _num(rec[6]),
            "average_price": _num(rec[7]),
            "status": rec[8],
            "broker_status": rec[9],
            "status_message": rec[10],
            "needs_reconciliation": bool(rec[11]),
            "broker_order_id": rec[12],
        }
        for rec in cur.fetchall()
    ]
|
|
|
|
|
|
def _aggregate_cycle_orders(orders: list[dict]):
    """Summarise all orders of one cycle.

    Returns unit and amount totals, the last seen non-zero average price per
    instrument, and three flags: ``has_any_fill``, ``all_filled`` (false for an
    empty list) and ``has_unresolved``.
    """
    summary = {
        "nifty_units": 0.0,
        "gold_units": 0.0,
        "nifty_price": 0.0,
        "gold_price": 0.0,
        "amount": 0.0,
        "has_any_fill": False,
        "all_filled": bool(orders),
        "has_unresolved": False,
    }
    instruments = (
        ("NIFTYBEES", "nifty_units", "nifty_price"),
        ("GOLDBEES", "gold_units", "gold_price"),
    )

    for entry in orders:
        ticker = str(entry.get("symbol") or "").upper()
        filled = float(entry.get("filled_qty") or 0.0)
        requested = float(entry.get("requested_qty") or 0.0)
        price = float(entry.get("average_price") or entry.get("requested_price") or 0.0)
        state_name = str(entry.get("status") or "")
        flagged = bool(entry.get("needs_reconciliation"))

        if state_name != LOCAL_ORDER_FILLED:
            summary["all_filled"] = False
        if flagged or state_name in _RECONCILEABLE_ORDER_STATES:
            summary["has_unresolved"] = True
        if filled <= 0:
            continue

        summary["has_any_fill"] = True
        summary["amount"] += filled * price
        for prefix, units_key, price_key in instruments:
            if ticker.startswith(prefix):
                summary[units_key] += filled
                # Keep the previous price when this order reports none.
                summary[price_key] = price or summary[price_key]
                break
        if requested > 0 and filled < requested:
            summary["all_filled"] = False

    return summary
|
|
|
|
|
|
def _record_reconciliation_event(cur, event: str, *, logical_time, payload: dict, ts: datetime):
    """Record *event* for a cycle at most once.

    Nothing is written when ``event_exists`` already reports the event for
    *logical_time*. SIP outcome events and the pending marker go through
    ``log_event`` (with the logical time); any other event is written with
    ``insert_engine_event``.
    """
    if event_exists(event, logical_time, cur=cur):
        return

    ledger_events = {"SIP_EXECUTED", "SIP_PARTIAL", "SIP_NO_FILL", "ORDER_RECONCILIATION_PENDING"}
    if event not in ledger_events:
        insert_engine_event(cur, event, data=payload, ts=ts)
        return
    log_event(event, payload, cur=cur, ts=ts, logical_time=logical_time)
|
|
|
|
|
|
def _advance_cycle_state(cur, *, state: dict, logical_time, cycle_orders: list[dict], event_ts: datetime, failure_reason: str | None = None):
    """Record the outcome of one SIP cycle and stamp *state* accordingly.

    ``last_run`` is stamped whenever there are orders or a failure reason.
    Exactly one outcome event is recorded (deduplicated per cycle):

    * ``SIP_EXECUTED`` – every order fully filled; also stamps ``last_sip_ts``.
    * ``SIP_PARTIAL`` – some fill and nothing left to reconcile.
    * ``ORDER_RECONCILIATION_PENDING`` – at least one unresolved order.
    * ``SIP_NO_FILL`` – none of the above.

    Returns ``True`` only for the ``SIP_EXECUTED`` outcome.
    """
    summary = _aggregate_cycle_orders(cycle_orders)

    def _emit(event_name, body):
        _record_reconciliation_event(
            cur,
            event_name,
            logical_time=logical_time,
            payload=body,
            ts=event_ts,
        )

    def _fill_payload():
        return {
            "nifty_units": summary["nifty_units"],
            "gold_units": summary["gold_units"],
            "nifty_price": summary["nifty_price"],
            "gold_price": summary["gold_price"],
            "amount": summary["amount"],
        }

    if cycle_orders or failure_reason:
        state["last_run"] = event_ts.isoformat()

    if summary["all_filled"]:
        state["last_sip_ts"] = event_ts.isoformat()
        _emit("SIP_EXECUTED", _fill_payload())
        return True

    if summary["has_any_fill"] and not summary["has_unresolved"]:
        _emit("SIP_PARTIAL", {**_fill_payload(), "reason": "partial_fill"})
    elif summary["has_unresolved"]:
        _emit(
            "ORDER_RECONCILIATION_PENDING",
            {"logical_time": logical_time.isoformat(), "orders": cycle_orders},
        )
    else:
        _emit("SIP_NO_FILL", {"reason": failure_reason or "no_fill", "orders": cycle_orders})
    return False
|
|
|
|
|
|
def _list_orders_to_reconcile(cur, *, checked_before: datetime):
    """List orders of the current scope that are due for a broker status check.

    Selects rows flagged ``needs_reconciliation`` that were never checked or
    were last checked at or before *checked_before*, ordered by
    ``logical_time`` then ``created_at``. Each returned dict also carries
    ``id`` (a copy of ``local_order_id``).
    """
    user_id, run_id = _current_scope()
    cur.execute(
        """
        SELECT local_order_id, logical_time, broker, symbol, side, broker_order_id, requested_qty,
               requested_price, average_price, status, broker_status, status_message,
               filled_qty, accounted_fill_qty, needs_reconciliation, last_checked_at, created_at
        FROM broker_order_state
        WHERE user_id = %s
          AND run_id = %s
          AND needs_reconciliation = TRUE
          AND (last_checked_at IS NULL OR last_checked_at <= %s)
        ORDER BY logical_time ASC, created_at ASC
        """,
        (user_id, run_id, checked_before),
    )

    def _num(cell):
        return float(cell or 0.0)

    return [
        {
            "local_order_id": rec[0],
            "logical_time": rec[1],
            "broker": rec[2],
            "symbol": rec[3],
            "side": rec[4],
            "broker_order_id": rec[5],
            "requested_qty": _num(rec[6]),
            "requested_price": _num(rec[7]),
            "average_price": _num(rec[8]),
            "status": rec[9],
            "broker_status": rec[10],
            "status_message": rec[11],
            "filled_qty": _num(rec[12]),
            "accounted_fill_qty": _num(rec[13]),
            "needs_reconciliation": bool(rec[14]),
            "last_checked_at": rec[15],
            "created_at": rec[16],
            # Duplicate of local_order_id under the key broker order dicts use.
            "id": rec[0],
        }
        for rec in cur.fetchall()
    ]
|
|
|
|
|
|
def reconcile_live_orders(*, broker: Broker, mode: str | None = "LIVE", now_ts: datetime | None = None):
    """Refresh unresolved live orders against the broker and apply new fills.

    Orders flagged for reconciliation and not checked within
    ``RECONCILIATION_INTERVAL`` are refreshed through
    ``broker.refresh_order_status``. Their rows are upserted, newly reported
    fill quantity is applied to the persisted state, and the owning cycle's
    outcome is re-evaluated.

    Args:
        broker: Broker used to refresh each order's status.
        mode: State mode passed to ``load_state`` / ``save_state``.
        now_ts: Check time; defaults to the current UTC time. Naive values are
            stamped with the market timezone.

    Returns:
        ``{"blocked": bool, "orders": [...], "summaries": [...]}``. ``orders``
        holds the rows that were due; ``summaries`` has one entry per refreshed
        order; ``blocked`` is true when an unresolved order has reached
        ``RECONCILIATION_TIMEOUT``.
    """
    checked_at = _normalize_now(now_ts or _utc_now())
    blocked = False
    summaries: list[dict] = []

    pending_orders = run_with_retry(
        lambda cur, _conn: _list_orders_to_reconcile(
            cur,
            checked_before=checked_at - RECONCILIATION_INTERVAL,
        )
    )
    if not pending_orders:
        return {"blocked": False, "orders": [], "summaries": []}

    def _op(cur, _conn):
        nonlocal blocked
        # run_with_retry presumably re-invokes this callback after a failed
        # attempt; start every attempt clean so an earlier attempt cannot leak
        # duplicated summaries or a stale blocked flag into the result.
        blocked = False
        summaries.clear()

        state = load_state(mode=mode, cur=cur, for_update=True)
        insert_engine_event(
            cur,
            "ORDER_RECONCILIATION_STARTED",
            data={"count": len(pending_orders)},
            ts=checked_at,
        )

        for pending in pending_orders:
            refreshed = broker.refresh_order_status(pending)
            refreshed["logical_time"] = pending["logical_time"]
            details = _upsert_broker_order_state(
                cur,
                broker_name=str(pending.get("broker") or ""),
                logical_time=pending["logical_time"],
                order=refreshed,
                checked_at=checked_at,
            )

            if details["status"] != pending.get("status"):
                insert_engine_event(
                    cur,
                    "ORDER_RECONCILIATION_STATUS_CHANGED",
                    data={
                        "order_id": details["local_order_id"],
                        "from": pending.get("status"),
                        "to": details["status"],
                    },
                    ts=checked_at,
                )

            if details["status"] == LOCAL_ORDER_PARTIAL:
                insert_engine_event(
                    cur,
                    "ORDER_PARTIAL_FILL_DETECTED",
                    data={
                        "order_id": details["local_order_id"],
                        "filled_qty": details["filled_qty"],
                        "requested_qty": details["requested_qty"],
                    },
                    ts=checked_at,
                )

            # Only the fill not yet accounted for is added to the state; the
            # accounted quantity is then advanced so it is never applied twice.
            applied = _apply_fill_delta_to_state(state, details)
            if details["delta_fill_qty"] > 0:
                _mark_accounted_fill(
                    cur,
                    details["local_order_id"],
                    details["accounted_fill_qty"] + details["delta_fill_qty"],
                    checked_at,
                )

            cycle_orders = _load_cycle_order_rows(cur, pending["logical_time"])
            executed = _advance_cycle_state(
                cur,
                state=state,
                logical_time=pending["logical_time"],
                cycle_orders=cycle_orders,
                event_ts=checked_at,
            )

            unresolved = any(order["needs_reconciliation"] for order in cycle_orders)
            if unresolved:
                # Age is measured from this order's creation; clamped so a
                # creation time in the future yields zero age.
                oldest_ts = min(pending.get("created_at") or checked_at, checked_at)
                if checked_at - oldest_ts >= RECONCILIATION_TIMEOUT:
                    insert_engine_event(
                        cur,
                        "ORDER_RECONCILIATION_TIMEOUT",
                        data={
                            "logical_time": pending["logical_time"].isoformat(),
                            "orders": cycle_orders,
                        },
                        ts=checked_at,
                    )
                    # NOTE(review): source indentation was lost; the flag is
                    # assumed to be raised only on timeout, not for every
                    # unresolved order - confirm against the original file.
                    blocked = True

            summaries.append(
                {
                    "logical_time": pending["logical_time"],
                    "executed": executed,
                    "applied_amount": applied["amount"],
                    "unresolved": unresolved,
                }
            )

        save_state(
            state,
            mode=mode,
            cur=cur,
            emit_event=True,
            event_meta={"source": "live_reconciliation"},
        )
        insert_engine_event(
            cur,
            "ORDER_RECONCILIATION_COMPLETE",
            data={"blocked": blocked, "count": len(pending_orders)},
            ts=checked_at,
        )
        return {"blocked": blocked}

    run_with_retry(_op)
    return {"blocked": blocked, "orders": pending_orders, "summaries": summaries}
|
|
|
|
|
|
def _record_live_order_events(cur, orders, event_ts):
    """Write the engine events describing each freshly placed live order.

    Every order gets ``ORDER_PLACED``. An order with any fill also gets
    ``TRADE_EXECUTED`` and ``ORDER_FILLED``; otherwise a ``REJECTED``,
    ``CANCELLED`` or ``PENDING`` status produces the matching ``ORDER_*``
    event. Other statuses produce no further event.
    """
    status_events = {
        "REJECTED": "ORDER_REJECTED",
        "CANCELLED": "ORDER_CANCELLED",
        "PENDING": "ORDER_PENDING",
    }

    for order in orders:
        insert_engine_event(cur, "ORDER_PLACED", data=order, ts=event_ts)
        filled_qty, _unused_price = _order_fill(order)
        status = (order.get("status") or "").upper()

        if filled_qty > 0:
            trade = {
                "order_id": order.get("id"),
                "symbol": order.get("symbol"),
                "side": order.get("side"),
                "qty": filled_qty,
                "price": order.get("average_price") or order.get("price"),
            }
            insert_engine_event(cur, "TRADE_EXECUTED", data=trade, ts=event_ts)
            insert_engine_event(cur, "ORDER_FILLED", data={"order_id": order.get("id")}, ts=event_ts)
            continue

        follow_up = status_events.get(status)
        if follow_up is not None:
            insert_engine_event(cur, follow_up, data=order, ts=event_ts)
|
|
|
|
|
|
def _try_execute_sip_paper(
    now,
    market_open,
    sip_interval,
    sip_amount,
    sp_price,
    gd_price,
    eq_w,
    gd_w,
    broker: Broker | None,
    mode: str | None,
):
    """Run one SIP cycle through a broker that fills inside the DB transaction.

    All work happens in a single ``run_with_retry`` callback: the state is
    loaded for update, the timing and deduplication checks run, both BUY
    orders are placed through *broker* using the transaction cursor, and the
    reported fills are applied and persisted.

    Returns ``(state, executed)``; *executed* is true only when a positive
    amount was filled, in which case ``SIP_EXECUTED`` is logged.
    """

    def _op(cur, _conn):
        now_ts = _normalize_now(now)
        log_event("DEBUG_ENTER_TRY_EXECUTE", {"now": now_ts.isoformat()}, cur=cur, ts=now_ts)

        state = load_state(mode=mode, cur=cur, for_update=True)
        skipped = (state, False)

        if not market_open:
            return skipped

        should_run, _last, logical_time = _resolve_timing(state, now_ts, sip_interval)
        if not should_run or event_exists("SIP_EXECUTED", logical_time, cur=cur):
            return skipped

        sp_price_val = _as_float(sp_price)
        gd_price_val = _as_float(gd_price)
        eq_w_val = _as_float(eq_w)
        gd_w_val = _as_float(gd_w)
        sip_amount_val = _as_float(sip_amount)

        # Split the SIP amount by weight and convert each share into units.
        nifty_qty = (sip_amount_val * eq_w_val) / sp_price_val
        gold_qty = (sip_amount_val * gd_w_val) / gd_price_val

        if broker is None:
            return skipped

        cash = broker.get_funds(cur=cur).get("cash")
        if cash is not None and float(cash) < sip_amount_val:
            return skipped

        log_event(
            "DEBUG_EXECUTION_DECISION",
            {"last_sip_ts": state.get("last_sip_ts"), "now": now_ts.isoformat()},
            cur=cur,
            ts=now_ts,
        )

        legs = (
            ("NIFTYBEES.NS", nifty_qty, sp_price_val),
            ("GOLDBEES.NS", gold_qty, gd_price_val),
        )
        orders = [
            broker.place_order(symbol, "BUY", qty, price, cur=cur, logical_time=logical_time)
            for symbol, qty, price in legs
        ]

        applied = _apply_filled_orders_to_state(state, orders)
        if not applied["amount"] > 0:
            return state, False

        cash_after = broker.get_funds(cur=cur).get("cash")
        if cash_after is not None:
            state["cash"] = float(cash_after)

        stamp = now_ts.isoformat()
        state["last_sip_ts"] = stamp
        state["last_run"] = stamp

        save_state(
            state,
            mode=mode,
            cur=cur,
            emit_event=True,
            event_meta={"source": "sip"},
        )

        log_event(
            "SIP_EXECUTED",
            {
                "nifty_units": applied["nifty_units"],
                "gold_units": applied["gold_units"],
                "nifty_price": sp_price_val,
                "gold_price": gd_price_val,
                "amount": applied["amount"],
            },
            cur=cur,
            ts=now_ts,
            logical_time=logical_time,
        )
        return state, True

    return run_with_retry(_op)
|
|
|
|
|
|
def _prepare_live_execution(now_ts, sip_interval, sip_amount_val, sp_price_val, gd_price_val, nifty_qty, gold_qty, mode):
    """Check, inside one transaction, whether a live SIP cycle may start.

    The cycle is ready only when the timing check passes, no ``SIP_EXECUTED``
    event exists for the logical time, and ``claim_execution_window``
    succeeds. When ready, ``SIP_ORDER_ATTEMPTED`` is logged with the planned
    quantities.

    Returns ``{"ready": False, "state": state}`` or
    ``{"ready": True, "state": state, "logical_time": logical_time}``.
    """

    def _op(cur, _conn):
        state = load_state(mode=mode, cur=cur, for_update=True)
        should_run, _last, logical_time = _resolve_timing(state, now_ts, sip_interval)

        # Short-circuit order matters: the window is claimed only when the
        # two cheaper checks have passed.
        if (
            not should_run
            or event_exists("SIP_EXECUTED", logical_time, cur=cur)
            or not claim_execution_window(logical_time, mode=mode, cur=cur)
        ):
            return {"ready": False, "state": state}

        planned = {
            "nifty_units": nifty_qty,
            "gold_units": gold_qty,
            "nifty_price": sp_price_val,
            "gold_price": gd_price_val,
            "amount": sip_amount_val,
        }
        log_event("SIP_ORDER_ATTEMPTED", planned, cur=cur, ts=now_ts, logical_time=logical_time)
        return {"ready": True, "state": state, "logical_time": logical_time}

    return run_with_retry(_op)
|
|
|
|
|
|
def _finalize_live_execution(
    *,
    now_ts,
    mode,
    logical_time,
    orders,
    funds_after,
    sp_price_val,
    gd_price_val,
    auth_failed: bool = False,
    failure_reason: str | None = None,
):
    """Persist the result of a live order attempt in one transaction.

    For each order: writes the order events, upserts its
    ``broker_order_state`` row and applies any not-yet-accounted fill to the
    state. Then refreshes ``cash`` from *funds_after* (when available),
    records the cycle outcome and saves the state.

    ``sp_price_val`` and ``gd_price_val`` are accepted but not read here.

    Returns ``(state, executed)`` where *executed* comes from
    ``_advance_cycle_state``.
    """

    def _op(cur, _conn):
        state = load_state(mode=mode, cur=cur, for_update=True)

        # The broker name is taken from the first order that carries one.
        broker_name = ""
        for order in orders:
            if not broker_name:
                broker_name = str(order.get("broker") or "").strip().upper()
            order.setdefault("logical_time", logical_time)
            _record_live_order_events(cur, [order], now_ts)

            details = _upsert_broker_order_state(
                cur,
                broker_name=broker_name,
                logical_time=logical_time,
                order=order,
                checked_at=now_ts,
            )
            if details["status"] == LOCAL_ORDER_PARTIAL:
                partial_info = {
                    "order_id": details["local_order_id"],
                    "filled_qty": details["filled_qty"],
                    "requested_qty": details["requested_qty"],
                }
                insert_engine_event(cur, "ORDER_PARTIAL_FILL_DETECTED", data=partial_info, ts=now_ts)

            new_fill = details["delta_fill_qty"]
            if new_fill > 0:
                _apply_fill_delta_to_state(state, details)
                _mark_accounted_fill(
                    cur,
                    details["local_order_id"],
                    details["accounted_fill_qty"] + new_fill,
                    now_ts,
                )

        if funds_after is not None:
            cash_after = funds_after.get("cash")
            if cash_after is not None:
                state["cash"] = float(cash_after)

        reason = failure_reason or ("broker_auth_expired" if auth_failed else None)
        cycle_orders = _load_cycle_order_rows(cur, logical_time)
        executed = _advance_cycle_state(
            cur,
            state=state,
            logical_time=logical_time,
            cycle_orders=cycle_orders,
            event_ts=now_ts,
            failure_reason=reason,
        )

        save_state(
            state,
            mode=mode,
            cur=cur,
            emit_event=True,
            event_meta={"source": "sip_live"},
        )
        return state, executed

    return run_with_retry(_op)
|
|
|
|
|
|
def _try_execute_sip_live(
    now,
    market_open,
    sip_interval,
    sip_amount,
    sp_price,
    gd_price,
    eq_w,
    gd_w,
    broker: Broker | None,
    mode: str | None,
):
    """Run one SIP cycle against a broker that executes orders externally.

    Steps: reconcile outstanding orders (and stop if reconciliation reports
    ``blocked``), claim the execution window in its own transaction, place the
    orders outside any DB transaction, then persist the outcome with
    ``_finalize_live_execution``.

    Returns:
        ``(state, executed)``.

    Raises:
        BrokerAuthExpired: after the outcome has been persisted, when the
            broker session expired while placing orders. The original
            exception is attached as ``__cause__``.
        Exception: any other broker error is re-raised after the outcome has
            been persisted.
    """
    now_ts = _normalize_now(now)
    if not market_open or broker is None:
        return load_state(mode=mode), False

    reconciliation = reconcile_live_orders(broker=broker, mode=mode, now_ts=now_ts)
    if reconciliation.get("blocked"):
        return load_state(mode=mode), False

    sp_price_val = _as_float(sp_price)
    gd_price_val = _as_float(gd_price)
    eq_w_val = _as_float(eq_w)
    gd_w_val = _as_float(gd_w)
    sip_amount_val = _as_float(sip_amount)

    # Split the SIP amount by weight and convert each share into units.
    nifty_qty = (sip_amount_val * eq_w_val) / sp_price_val
    gold_qty = (sip_amount_val * gd_w_val) / gd_price_val

    prepared = _prepare_live_execution(
        now_ts,
        sip_interval,
        sip_amount_val,
        sp_price_val,
        gd_price_val,
        nifty_qty,
        gold_qty,
        mode,
    )
    if not prepared.get("ready"):
        return prepared.get("state") or load_state(mode=mode), False

    logical_time = prepared["logical_time"]
    orders = []
    funds_after = None
    failure_reason = None
    auth_failed = False
    auth_error = None
    # Derive a short broker label from the class name, e.g. "LiveFooBroker" -> "FOO".
    broker_name = getattr(broker, "__class__", type(broker)).__name__.replace("Live", "").replace("Broker", "").upper()

    def _funds_best_effort():
        # Funds are informational after a failure; never let this lookup mask
        # the error that is already being handled.
        try:
            return broker.get_funds()
        except Exception:
            return None

    def _finalize(*, auth_flag):
        return _finalize_live_execution(
            now_ts=now_ts,
            mode=mode,
            logical_time=logical_time,
            orders=orders,
            funds_after=funds_after,
            sp_price_val=sp_price_val,
            gd_price_val=gd_price_val,
            auth_failed=auth_flag,
            failure_reason=failure_reason,
        )

    legs = (
        ("NIFTYBEES.NS", nifty_qty, sp_price_val),
        ("GOLDBEES.NS", gold_qty, gd_price_val),
    )

    try:
        funds_before = broker.get_funds()
        cash = funds_before.get("cash")
        if cash is not None and float(cash) < sip_amount_val:
            failure_reason = "insufficient_funds"
        else:
            for symbol, qty, price in legs:
                if qty > 0:
                    orders.append(
                        broker.place_order(symbol, "BUY", qty, price, logical_time=logical_time)
                    )
                    orders[-1]["broker"] = broker_name
                    orders[-1]["logical_time"] = logical_time
            # NOTE(review): source indentation was lost; the post-order funds
            # lookup is assumed to run after both legs - confirm.
            funds_after = broker.get_funds()
    except BrokerAuthExpired as exc:
        auth_failed = True
        auth_error = exc
        funds_after = _funds_best_effort()
    except Exception as exc:
        # An exception with an empty message must still count as a failure
        # reason downstream, so fall back to the exception type name.
        failure_reason = str(exc) or type(exc).__name__
        funds_after = _funds_best_effort()
        # Persist whatever orders were placed before propagating the error.
        _finalize(auth_flag=False)
        raise

    state, executed = _finalize(auth_flag=auth_failed)
    if auth_failed:
        raise BrokerAuthExpired("Broker session expired during live order execution") from auth_error
    return state, executed
|
|
|
|
def try_execute_sip(
    now,
    market_open,
    sip_interval,
    sip_amount,
    sp_price,
    gd_price,
    eq_w,
    gd_w,
    broker: Broker | None = None,
    mode: str | None = "LIVE",
):
    """Entry point for one SIP cycle.

    Without a broker nothing is executed and the stored state is returned.
    Brokers whose ``external_orders`` attribute is truthy take the live path;
    all others take the paper path.

    Returns ``(state, executed)``.
    """
    if broker is None:
        return load_state(mode=mode), False

    uses_external_orders = getattr(broker, "external_orders", False)
    runner = _try_execute_sip_live if uses_external_orders else _try_execute_sip_paper
    return runner(
        now,
        market_open,
        sip_interval,
        sip_amount,
        sp_price,
        gd_price,
        eq_w,
        gd_w,
        broker,
        mode,
    )
|
|
|