thigal_test
This commit is contained in:
parent
f4b7575f00
commit
7677895b05
@ -11,7 +11,7 @@ from app.services.auth_service import (
|
||||
get_last_session_meta,
|
||||
verify_user,
|
||||
)
|
||||
from app.services.email_service import send_email
|
||||
from app.services.email_service import send_email_async
|
||||
|
||||
router = APIRouter(prefix="/api")
|
||||
SESSION_COOKIE_NAME = "session_id"
|
||||
@ -56,7 +56,7 @@ def signup(payload: AuthPayload, response: Response):
|
||||
"You can now log in and start using the platform.\n\n"
|
||||
"Quantfortune Support"
|
||||
)
|
||||
send_email(user["username"], "Welcome to Quantfortune", body)
|
||||
send_email_async(user["username"], "Welcome to Quantfortune", body)
|
||||
except Exception:
|
||||
pass
|
||||
return {"id": user["id"], "username": user["username"], "role": user.get("role")}
|
||||
@ -85,7 +85,7 @@ def login(payload: AuthPayload, response: Response, request: Request):
|
||||
f"Device: {user_agent or 'unknown'}\n\n"
|
||||
"If this wasn't you, please reset your password immediately."
|
||||
)
|
||||
send_email(user["username"], "New login detected", body)
|
||||
send_email_async(user["username"], "New login detected", body)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@ from app.broker_store import (
|
||||
)
|
||||
from app.services.auth_service import get_user_for_session
|
||||
from app.services.zerodha_service import build_login_url, exchange_request_token
|
||||
from app.services.email_service import send_email
|
||||
from app.services.email_service import send_email_async
|
||||
from app.services.zerodha_storage import set_session
|
||||
|
||||
router = APIRouter(prefix="/api/broker")
|
||||
@ -53,7 +53,7 @@ async def connect_broker(payload: dict, request: Request):
|
||||
f"Broker: {broker}\n"
|
||||
f"Broker User ID: {broker_user_id or 'N/A'}\n"
|
||||
)
|
||||
send_email(user["username"], "Broker connected", body)
|
||||
send_email_async(user["username"], "Broker connected", body)
|
||||
except Exception:
|
||||
pass
|
||||
return {"connected": True}
|
||||
@ -82,7 +82,7 @@ async def disconnect_broker(request: Request):
|
||||
set_broker_auth_state(user["id"], "DISCONNECTED")
|
||||
try:
|
||||
body = "Your broker connection has been disconnected from Quantfortune."
|
||||
send_email(user["username"], "Broker disconnected", body)
|
||||
send_email_async(user["username"], "Broker disconnected", body)
|
||||
except Exception:
|
||||
pass
|
||||
return {"connected": False}
|
||||
|
||||
@ -1,9 +1,13 @@
|
||||
import os
|
||||
import smtplib
|
||||
import ssl
|
||||
import threading
|
||||
from email.message import EmailMessage
|
||||
|
||||
|
||||
SMTP_TIMEOUT_SECONDS = float((os.getenv("SMTP_TIMEOUT_SECONDS") or "5").strip())
|
||||
|
||||
|
||||
def send_email(to_email: str, subject: str, body_text: str) -> bool:
|
||||
smtp_user = (os.getenv("SMTP_USER") or "").strip()
|
||||
smtp_pass = (os.getenv("SMTP_PASS") or "").replace(" ", "").strip()
|
||||
@ -21,8 +25,20 @@ def send_email(to_email: str, subject: str, body_text: str) -> bool:
|
||||
msg.set_content(body_text)
|
||||
|
||||
context = ssl.create_default_context()
|
||||
with smtplib.SMTP(smtp_host, smtp_port) as server:
|
||||
with smtplib.SMTP(smtp_host, smtp_port, timeout=SMTP_TIMEOUT_SECONDS) as server:
|
||||
server.starttls(context=context)
|
||||
server.login(smtp_user, smtp_pass)
|
||||
server.send_message(msg)
|
||||
return True
|
||||
|
||||
|
||||
def send_email_async(to_email: str, subject: str, body_text: str) -> bool:
|
||||
def _worker():
|
||||
try:
|
||||
send_email(to_email, subject, body_text)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
thread = threading.Thread(target=_worker, daemon=True)
|
||||
thread.start()
|
||||
return True
|
||||
|
||||
@ -24,7 +24,7 @@ from app.services.run_service import (
|
||||
update_run_status,
|
||||
)
|
||||
from app.services.auth_service import get_user_by_id
|
||||
from app.services.email_service import send_email
|
||||
from app.services.email_service import send_email_async
|
||||
from psycopg2.extras import Json
|
||||
from psycopg2 import errors
|
||||
|
||||
@ -402,7 +402,7 @@ def start_strategy(req, user_id: str):
|
||||
f"Mode: {mode}\n"
|
||||
f"Run ID: {run_id}\n"
|
||||
)
|
||||
send_email(user["username"], "Strategy started", body)
|
||||
send_email_async(user["username"], "Strategy started", body)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@ -491,7 +491,7 @@ def stop_strategy(user_id: str):
|
||||
user = get_user_by_id(user_id)
|
||||
if user:
|
||||
body = "Your strategy has been stopped."
|
||||
send_email(user["username"], "Strategy stopped", body)
|
||||
send_email_async(user["username"], "Strategy stopped", body)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@ -1,208 +0,0 @@
|
||||
import streamlit as st
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import pandas as pd
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(PROJECT_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
from indian_paper_trading_strategy.engine.history import load_monthly_close
|
||||
from indian_paper_trading_strategy.engine.market import india_market_status
|
||||
from indian_paper_trading_strategy.engine.data import fetch_live_price
|
||||
from indian_paper_trading_strategy.engine.strategy import allocation
|
||||
from indian_paper_trading_strategy.engine.execution import try_execute_sip
|
||||
from indian_paper_trading_strategy.engine.state import load_state, save_state
|
||||
from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_default_user_id, get_active_run_id, set_context
|
||||
from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm
|
||||
|
||||
_STREAMLIT_USER_ID = get_default_user_id()
|
||||
_STREAMLIT_RUN_ID = get_active_run_id(_STREAMLIT_USER_ID) if _STREAMLIT_USER_ID else None
|
||||
if _STREAMLIT_USER_ID and _STREAMLIT_RUN_ID:
|
||||
set_context(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID)
|
||||
|
||||
def reset_runtime_state():
|
||||
def _op(cur, _conn):
|
||||
cur.execute(
|
||||
"DELETE FROM mtm_ledger WHERE user_id = %s AND run_id = %s",
|
||||
(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID),
|
||||
)
|
||||
cur.execute(
|
||||
"DELETE FROM event_ledger WHERE user_id = %s AND run_id = %s",
|
||||
(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID),
|
||||
)
|
||||
cur.execute(
|
||||
"DELETE FROM engine_state WHERE user_id = %s AND run_id = %s",
|
||||
(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID),
|
||||
)
|
||||
insert_engine_event(cur, "LIVE_RESET", data={})
|
||||
|
||||
run_with_retry(_op)
|
||||
|
||||
def load_mtm_df():
|
||||
with db_connection() as conn:
|
||||
return pd.read_sql_query(
|
||||
"SELECT timestamp, pnl FROM mtm_ledger WHERE user_id = %s AND run_id = %s ORDER BY timestamp",
|
||||
conn,
|
||||
params=(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID),
|
||||
)
|
||||
|
||||
def is_engine_running():
|
||||
state = load_state(mode="LIVE")
|
||||
return state.get("total_invested", 0) > 0 or \
|
||||
state.get("nifty_units", 0) > 0 or \
|
||||
state.get("gold_units", 0) > 0
|
||||
|
||||
if "engine_active" not in st.session_state:
|
||||
st.session_state.engine_active = is_engine_running()
|
||||
|
||||
NIFTY = "NIFTYBEES.NS"
|
||||
GOLD = "GOLDBEES.NS"
|
||||
SMA_MONTHS = 36
|
||||
|
||||
def get_prices():
|
||||
try:
|
||||
nifty = fetch_live_price(NIFTY)
|
||||
gold = fetch_live_price(GOLD)
|
||||
return nifty, gold
|
||||
except Exception as e:
|
||||
st.error(e)
|
||||
return None, None
|
||||
|
||||
SIP_AMOUNT = st.number_input("SIP Amount (\u20B9)", 500, 100000, 5000)
|
||||
SIP_INTERVAL_SEC = st.number_input("SIP Interval (sec) [TEST]", 30, 3600, 120)
|
||||
REFRESH_SEC = st.slider("Refresh interval (sec)", 5, 60, 10)
|
||||
|
||||
st.title("SIPXAR INDIA - Phase-1 Safe Engine")
|
||||
|
||||
market_open, market_time = india_market_status()
|
||||
st.info(f"NSE Market {'OPEN' if market_open else 'CLOSED'} | IST {market_time}")
|
||||
if not market_open:
|
||||
st.info("Market is closed. Portfolio values are frozen at last available prices.")
|
||||
|
||||
col1, col2 = st.columns(2)
|
||||
|
||||
with col1:
|
||||
if st.button("START ENGINE"):
|
||||
if is_engine_running():
|
||||
st.info("Engine already running. Resuming.")
|
||||
st.session_state.engine_active = True
|
||||
else:
|
||||
st.session_state.engine_active = True
|
||||
|
||||
# HARD RESET ONLY ON FIRST START
|
||||
reset_runtime_state()
|
||||
|
||||
save_state({
|
||||
"total_invested": 0.0,
|
||||
"nifty_units": 0.0,
|
||||
"gold_units": 0.0,
|
||||
"last_sip_ts": None,
|
||||
}, mode="LIVE", emit_event=True, event_meta={"source": "streamlit_start"})
|
||||
|
||||
st.success("Engine started")
|
||||
|
||||
with col2:
|
||||
if st.button("KILL ENGINE"):
|
||||
st.session_state.engine_active = False
|
||||
|
||||
reset_runtime_state()
|
||||
|
||||
st.warning("Engine killed and state wiped")
|
||||
st.stop()
|
||||
|
||||
if not st.session_state.engine_active:
|
||||
st.stop()
|
||||
|
||||
state = load_state(mode="LIVE")
|
||||
nifty_price, gold_price = get_prices()
|
||||
|
||||
if nifty_price is None:
|
||||
st.stop()
|
||||
|
||||
st.subheader("Latest Market Prices (LTP)")
|
||||
|
||||
c1, c2 = st.columns(2)
|
||||
|
||||
with c1:
|
||||
st.metric(
|
||||
label="NIFTYBEES",
|
||||
value=f"\u20B9{nifty_price:,.2f}",
|
||||
help="Last traded price (delayed)"
|
||||
)
|
||||
|
||||
with c2:
|
||||
st.metric(
|
||||
label="GOLDBEES",
|
||||
value=f"\u20B9{gold_price:,.2f}",
|
||||
help="Last traded price (delayed)"
|
||||
)
|
||||
|
||||
st.caption(f"Price timestamp: {datetime.now().strftime('%H:%M:%S')}")
|
||||
|
||||
nifty_hist = load_monthly_close(NIFTY)
|
||||
gold_hist = load_monthly_close(GOLD)
|
||||
|
||||
nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1]
|
||||
gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1]
|
||||
|
||||
eq_w, gd_w = allocation(
|
||||
sp_price=nifty_price,
|
||||
gd_price=gold_price,
|
||||
sp_sma=nifty_sma,
|
||||
gd_sma=gold_sma
|
||||
)
|
||||
|
||||
state, executed = try_execute_sip(
|
||||
now=datetime.now(),
|
||||
market_open=market_open,
|
||||
sip_interval=SIP_INTERVAL_SEC,
|
||||
sip_amount=SIP_AMOUNT,
|
||||
sp_price=nifty_price,
|
||||
gd_price=gold_price,
|
||||
eq_w=eq_w,
|
||||
gd_w=gd_w,
|
||||
mode="LIVE",
|
||||
)
|
||||
|
||||
now = datetime.now()
|
||||
|
||||
if market_open and should_log_mtm(None, now):
|
||||
portfolio_value, pnl = log_mtm(
|
||||
nifty_units=state["nifty_units"],
|
||||
gold_units=state["gold_units"],
|
||||
nifty_price=nifty_price,
|
||||
gold_price=gold_price,
|
||||
total_invested=state["total_invested"],
|
||||
)
|
||||
else:
|
||||
# Market closed -> freeze valuation (do NOT log)
|
||||
portfolio_value = (
|
||||
state["nifty_units"] * nifty_price +
|
||||
state["gold_units"] * gold_price
|
||||
)
|
||||
pnl = portfolio_value - state["total_invested"]
|
||||
|
||||
st.subheader("Equity Curve (Unrealized PnL)")
|
||||
|
||||
mtm_df = load_mtm_df()
|
||||
|
||||
if "timestamp" in mtm_df.columns and "pnl" in mtm_df.columns and len(mtm_df) > 1:
|
||||
mtm_df["timestamp"] = pd.to_datetime(mtm_df["timestamp"])
|
||||
mtm_df = mtm_df.sort_values("timestamp").set_index("timestamp")
|
||||
|
||||
st.line_chart(mtm_df["pnl"], height=350)
|
||||
else:
|
||||
st.warning("Not enough MTM data or missing columns. Expected: timestamp, pnl.")
|
||||
|
||||
st.metric("Total Invested", f"\u20B9{state['total_invested']:,.0f}")
|
||||
st.metric("NIFTY Units", round(state["nifty_units"], 4))
|
||||
st.metric("Gold Units", round(state["gold_units"], 4))
|
||||
st.metric("Portfolio Value", f"\u20B9{portfolio_value:,.0f}")
|
||||
st.metric("PnL", f"\u20B9{pnl:,.0f}")
|
||||
|
||||
time.sleep(REFRESH_SEC)
|
||||
st.rerun()
|
||||
|
||||
@ -1 +0,0 @@
|
||||
"""Engine package for the India paper trading strategy."""
|
||||
@ -1,697 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
import hashlib
|
||||
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
from indian_paper_trading_strategy.engine.data import fetch_live_price
|
||||
from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_context
|
||||
|
||||
|
||||
class Broker(ABC):
|
||||
@abstractmethod
|
||||
def place_order(
|
||||
self,
|
||||
symbol: str,
|
||||
side: str,
|
||||
quantity: float,
|
||||
price: float | None = None,
|
||||
logical_time: datetime | None = None,
|
||||
):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_positions(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_orders(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_funds(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def _local_tz():
|
||||
return datetime.now().astimezone().tzinfo
|
||||
|
||||
|
||||
def _format_utc_ts(value: datetime | None):
|
||||
if value is None:
|
||||
return None
|
||||
if value.tzinfo is None:
|
||||
value = value.replace(tzinfo=_local_tz())
|
||||
return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def _format_local_ts(value: datetime | None):
|
||||
if value is None:
|
||||
return None
|
||||
if value.tzinfo is None:
|
||||
value = value.replace(tzinfo=_local_tz())
|
||||
return value.astimezone(_local_tz()).replace(tzinfo=None).isoformat()
|
||||
|
||||
|
||||
def _parse_ts(value, assume_local: bool = True):
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
if value.tzinfo is None:
|
||||
return value.replace(tzinfo=_local_tz() if assume_local else timezone.utc)
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
text = value.strip()
|
||||
if not text:
|
||||
return None
|
||||
if text.endswith("Z"):
|
||||
try:
|
||||
return datetime.fromisoformat(text.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
try:
|
||||
parsed = datetime.fromisoformat(text)
|
||||
except ValueError:
|
||||
return None
|
||||
if parsed.tzinfo is None:
|
||||
parsed = parsed.replace(tzinfo=_local_tz() if assume_local else timezone.utc)
|
||||
return parsed
|
||||
return None
|
||||
|
||||
|
||||
def _stable_num(value: float) -> str:
|
||||
return f"{float(value):.12f}"
|
||||
|
||||
|
||||
def _normalize_ts_for_id(ts: datetime) -> str:
|
||||
if ts.tzinfo is None:
|
||||
ts = ts.replace(tzinfo=timezone.utc)
|
||||
return ts.astimezone(timezone.utc).replace(microsecond=0).isoformat()
|
||||
|
||||
|
||||
def _deterministic_id(prefix: str, parts: list[str]) -> str:
|
||||
payload = "|".join(parts)
|
||||
digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:16]
|
||||
return f"{prefix}_{digest}"
|
||||
|
||||
|
||||
def _resolve_scope(user_id: str | None, run_id: str | None):
|
||||
return get_context(user_id, run_id)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PaperBroker(Broker):
|
||||
initial_cash: float
|
||||
store_path: str | None = None
|
||||
|
||||
def _default_store(self):
|
||||
return {
|
||||
"cash": float(self.initial_cash),
|
||||
"positions": {},
|
||||
"orders": [],
|
||||
"trades": [],
|
||||
"equity_curve": [],
|
||||
}
|
||||
|
||||
def _load_store(self, cur=None, for_update: bool = False, user_id: str | None = None, run_id: str | None = None):
|
||||
scope_user, scope_run = _resolve_scope(user_id, run_id)
|
||||
if cur is None:
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return self._load_store(
|
||||
cur=cur,
|
||||
for_update=for_update,
|
||||
user_id=scope_user,
|
||||
run_id=scope_run,
|
||||
)
|
||||
|
||||
store = self._default_store()
|
||||
lock_clause = " FOR UPDATE" if for_update else ""
|
||||
cur.execute(
|
||||
f"SELECT cash FROM paper_broker_account WHERE user_id = %s AND run_id = %s{lock_clause} LIMIT 1",
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if row and row[0] is not None:
|
||||
store["cash"] = float(row[0])
|
||||
|
||||
cur.execute(
|
||||
f"""
|
||||
SELECT symbol, qty, avg_price, last_price
|
||||
FROM paper_position
|
||||
WHERE user_id = %s AND run_id = %s{lock_clause}
|
||||
"""
|
||||
,
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
positions = {}
|
||||
for symbol, qty, avg_price, last_price in cur.fetchall():
|
||||
positions[symbol] = {
|
||||
"qty": float(qty) if qty is not None else 0.0,
|
||||
"avg_price": float(avg_price) if avg_price is not None else 0.0,
|
||||
"last_price": float(last_price) if last_price is not None else 0.0,
|
||||
}
|
||||
store["positions"] = positions
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT id, symbol, side, qty, price, status, timestamp, logical_time
|
||||
FROM paper_order
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
ORDER BY timestamp, id
|
||||
"""
|
||||
,
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
orders = []
|
||||
for order_id, symbol, side, qty, price, status, ts, logical_ts in cur.fetchall():
|
||||
orders.append(
|
||||
{
|
||||
"id": order_id,
|
||||
"symbol": symbol,
|
||||
"side": side,
|
||||
"qty": float(qty) if qty is not None else 0.0,
|
||||
"price": float(price) if price is not None else 0.0,
|
||||
"status": status,
|
||||
"timestamp": _format_utc_ts(ts),
|
||||
"_logical_time": _format_utc_ts(logical_ts),
|
||||
}
|
||||
)
|
||||
store["orders"] = orders
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT id, order_id, symbol, side, qty, price, timestamp, logical_time
|
||||
FROM paper_trade
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
ORDER BY timestamp, id
|
||||
"""
|
||||
,
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
trades = []
|
||||
for trade_id, order_id, symbol, side, qty, price, ts, logical_ts in cur.fetchall():
|
||||
trades.append(
|
||||
{
|
||||
"id": trade_id,
|
||||
"order_id": order_id,
|
||||
"symbol": symbol,
|
||||
"side": side,
|
||||
"qty": float(qty) if qty is not None else 0.0,
|
||||
"price": float(price) if price is not None else 0.0,
|
||||
"timestamp": _format_utc_ts(ts),
|
||||
"_logical_time": _format_utc_ts(logical_ts),
|
||||
}
|
||||
)
|
||||
store["trades"] = trades
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT timestamp, logical_time, equity, pnl
|
||||
FROM paper_equity_curve
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
ORDER BY timestamp
|
||||
"""
|
||||
,
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
equity_curve = []
|
||||
for ts, logical_ts, equity, pnl in cur.fetchall():
|
||||
equity_curve.append(
|
||||
{
|
||||
"timestamp": _format_local_ts(ts),
|
||||
"_logical_time": _format_local_ts(logical_ts),
|
||||
"equity": float(equity) if equity is not None else 0.0,
|
||||
"pnl": float(pnl) if pnl is not None else 0.0,
|
||||
}
|
||||
)
|
||||
store["equity_curve"] = equity_curve
|
||||
return store
|
||||
|
||||
def _save_store(self, store, cur=None, user_id: str | None = None, run_id: str | None = None):
|
||||
scope_user, scope_run = _resolve_scope(user_id, run_id)
|
||||
if cur is None:
|
||||
def _persist(cur, _conn):
|
||||
self._save_store(store, cur=cur, user_id=scope_user, run_id=scope_run)
|
||||
return run_with_retry(_persist)
|
||||
|
||||
cash = store.get("cash")
|
||||
if cash is not None:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO paper_broker_account (user_id, run_id, cash)
|
||||
VALUES (%s, %s, %s)
|
||||
ON CONFLICT (user_id, run_id) DO UPDATE
|
||||
SET cash = EXCLUDED.cash
|
||||
""",
|
||||
(scope_user, scope_run, float(cash)),
|
||||
)
|
||||
|
||||
positions = store.get("positions")
|
||||
if isinstance(positions, dict):
|
||||
symbols = [s for s in positions.keys() if s]
|
||||
if symbols:
|
||||
cur.execute(
|
||||
"DELETE FROM paper_position WHERE user_id = %s AND run_id = %s AND symbol NOT IN %s",
|
||||
(scope_user, scope_run, tuple(symbols)),
|
||||
)
|
||||
else:
|
||||
cur.execute(
|
||||
"DELETE FROM paper_position WHERE user_id = %s AND run_id = %s",
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
|
||||
if symbols:
|
||||
rows = []
|
||||
updated_at = datetime.now(timezone.utc)
|
||||
for symbol, data in positions.items():
|
||||
if not symbol or not isinstance(data, dict):
|
||||
continue
|
||||
rows.append(
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
symbol,
|
||||
float(data.get("qty", 0.0)),
|
||||
float(data.get("avg_price", 0.0)),
|
||||
float(data.get("last_price", 0.0)),
|
||||
updated_at,
|
||||
)
|
||||
)
|
||||
if rows:
|
||||
execute_values(
|
||||
cur,
|
||||
"""
|
||||
INSERT INTO paper_position (
|
||||
user_id, run_id, symbol, qty, avg_price, last_price, updated_at
|
||||
)
|
||||
VALUES %s
|
||||
ON CONFLICT (user_id, run_id, symbol) DO UPDATE
|
||||
SET qty = EXCLUDED.qty,
|
||||
avg_price = EXCLUDED.avg_price,
|
||||
last_price = EXCLUDED.last_price,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
""",
|
||||
rows,
|
||||
)
|
||||
|
||||
orders = store.get("orders")
|
||||
if isinstance(orders, list) and orders:
|
||||
rows = []
|
||||
for order in orders:
|
||||
if not isinstance(order, dict):
|
||||
continue
|
||||
order_id = order.get("id")
|
||||
if not order_id:
|
||||
continue
|
||||
ts = _parse_ts(order.get("timestamp"), assume_local=False)
|
||||
logical_ts = _parse_ts(order.get("_logical_time"), assume_local=False) or ts
|
||||
rows.append(
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
order_id,
|
||||
order.get("symbol"),
|
||||
order.get("side"),
|
||||
float(order.get("qty", 0.0)),
|
||||
float(order.get("price", 0.0)),
|
||||
order.get("status"),
|
||||
ts,
|
||||
logical_ts,
|
||||
)
|
||||
)
|
||||
if rows:
|
||||
execute_values(
|
||||
cur,
|
||||
"""
|
||||
INSERT INTO paper_order (
|
||||
user_id, run_id, id, symbol, side, qty, price, status, timestamp, logical_time
|
||||
)
|
||||
VALUES %s
|
||||
ON CONFLICT DO NOTHING
|
||||
""",
|
||||
rows,
|
||||
)
|
||||
|
||||
trades = store.get("trades")
|
||||
if isinstance(trades, list) and trades:
|
||||
rows = []
|
||||
for trade in trades:
|
||||
if not isinstance(trade, dict):
|
||||
continue
|
||||
trade_id = trade.get("id")
|
||||
if not trade_id:
|
||||
continue
|
||||
ts = _parse_ts(trade.get("timestamp"), assume_local=False)
|
||||
logical_ts = _parse_ts(trade.get("_logical_time"), assume_local=False) or ts
|
||||
rows.append(
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
trade_id,
|
||||
trade.get("order_id"),
|
||||
trade.get("symbol"),
|
||||
trade.get("side"),
|
||||
float(trade.get("qty", 0.0)),
|
||||
float(trade.get("price", 0.0)),
|
||||
ts,
|
||||
logical_ts,
|
||||
)
|
||||
)
|
||||
if rows:
|
||||
execute_values(
|
||||
cur,
|
||||
"""
|
||||
INSERT INTO paper_trade (
|
||||
user_id, run_id, id, order_id, symbol, side, qty, price, timestamp, logical_time
|
||||
)
|
||||
VALUES %s
|
||||
ON CONFLICT DO NOTHING
|
||||
""",
|
||||
rows,
|
||||
)
|
||||
|
||||
equity_curve = store.get("equity_curve")
|
||||
if isinstance(equity_curve, list) and equity_curve:
|
||||
rows = []
|
||||
for point in equity_curve:
|
||||
if not isinstance(point, dict):
|
||||
continue
|
||||
ts = _parse_ts(point.get("timestamp"), assume_local=True)
|
||||
logical_ts = _parse_ts(point.get("_logical_time"), assume_local=True) or ts
|
||||
if ts is None:
|
||||
continue
|
||||
rows.append(
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
ts,
|
||||
logical_ts,
|
||||
float(point.get("equity", 0.0)),
|
||||
float(point.get("pnl", 0.0)),
|
||||
)
|
||||
)
|
||||
if rows:
|
||||
execute_values(
|
||||
cur,
|
||||
"""
|
||||
INSERT INTO paper_equity_curve (user_id, run_id, timestamp, logical_time, equity, pnl)
|
||||
VALUES %s
|
||||
ON CONFLICT DO NOTHING
|
||||
""",
|
||||
rows,
|
||||
)
|
||||
|
||||
def get_funds(self, cur=None):
|
||||
store = self._load_store(cur=cur)
|
||||
cash = float(store.get("cash", 0))
|
||||
positions = store.get("positions", {})
|
||||
positions_value = 0.0
|
||||
for position in positions.values():
|
||||
qty = float(position.get("qty", 0))
|
||||
last_price = float(position.get("last_price", position.get("avg_price", 0)))
|
||||
positions_value += qty * last_price
|
||||
total_equity = cash + positions_value
|
||||
return {
|
||||
"cash_available": cash,
|
||||
"invested_value": positions_value,
|
||||
"cash": cash,
|
||||
"used_margin": 0.0,
|
||||
"available": cash,
|
||||
"net": total_equity,
|
||||
"total_equity": total_equity,
|
||||
}
|
||||
|
||||
def get_positions(self, cur=None):
|
||||
store = self._load_store(cur=cur)
|
||||
positions = store.get("positions", {})
|
||||
return [
|
||||
{
|
||||
"symbol": symbol,
|
||||
"qty": float(data.get("qty", 0)),
|
||||
"avg_price": float(data.get("avg_price", 0)),
|
||||
"last_price": float(data.get("last_price", data.get("avg_price", 0))),
|
||||
}
|
||||
for symbol, data in positions.items()
|
||||
]
|
||||
|
||||
def get_orders(self, cur=None):
|
||||
store = self._load_store(cur=cur)
|
||||
orders = []
|
||||
for order in store.get("orders", []):
|
||||
if isinstance(order, dict):
|
||||
order = {k: v for k, v in order.items() if k != "_logical_time"}
|
||||
orders.append(order)
|
||||
return orders
|
||||
|
||||
def get_trades(self, cur=None):
|
||||
store = self._load_store(cur=cur)
|
||||
trades = []
|
||||
for trade in store.get("trades", []):
|
||||
if isinstance(trade, dict):
|
||||
trade = {k: v for k, v in trade.items() if k != "_logical_time"}
|
||||
trades.append(trade)
|
||||
return trades
|
||||
|
||||
def get_equity_curve(self, cur=None):
|
||||
store = self._load_store(cur=cur)
|
||||
points = []
|
||||
for point in store.get("equity_curve", []):
|
||||
if isinstance(point, dict):
|
||||
point = {k: v for k, v in point.items() if k != "_logical_time"}
|
||||
points.append(point)
|
||||
return points
|
||||
|
||||
def _update_equity_in_tx(
|
||||
self,
|
||||
cur,
|
||||
prices: dict[str, float],
|
||||
now: datetime,
|
||||
logical_time: datetime | None = None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
store = self._load_store(cur=cur, for_update=True, user_id=user_id, run_id=run_id)
|
||||
positions = store.get("positions", {})
|
||||
for symbol, price in prices.items():
|
||||
if symbol in positions:
|
||||
positions[symbol]["last_price"] = float(price)
|
||||
|
||||
cash = float(store.get("cash", 0))
|
||||
positions_value = 0.0
|
||||
for symbol, position in positions.items():
|
||||
qty = float(position.get("qty", 0))
|
||||
price = float(position.get("last_price", position.get("avg_price", 0)))
|
||||
positions_value += qty * price
|
||||
|
||||
equity = cash + positions_value
|
||||
pnl = equity - float(self.initial_cash)
|
||||
ts_for_equity = logical_time or now
|
||||
store.setdefault("equity_curve", []).append(
|
||||
{
|
||||
"timestamp": _format_local_ts(ts_for_equity),
|
||||
"_logical_time": _format_local_ts(ts_for_equity),
|
||||
"equity": equity,
|
||||
"pnl": pnl,
|
||||
}
|
||||
)
|
||||
store["positions"] = positions
|
||||
self._save_store(store, cur=cur, user_id=user_id, run_id=run_id)
|
||||
insert_engine_event(
|
||||
cur,
|
||||
"EQUITY_UPDATED",
|
||||
data={
|
||||
"timestamp": _format_utc_ts(ts_for_equity),
|
||||
"equity": equity,
|
||||
"pnl": pnl,
|
||||
},
|
||||
)
|
||||
return equity
|
||||
|
||||
def update_equity(
|
||||
self,
|
||||
prices: dict[str, float],
|
||||
now: datetime,
|
||||
cur=None,
|
||||
logical_time: datetime | None = None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
if cur is not None:
|
||||
return self._update_equity_in_tx(
|
||||
cur,
|
||||
prices,
|
||||
now,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
def _op(cur, _conn):
|
||||
return self._update_equity_in_tx(
|
||||
cur,
|
||||
prices,
|
||||
now,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
def _place_order_in_tx(
|
||||
self,
|
||||
cur,
|
||||
symbol: str,
|
||||
side: str,
|
||||
quantity: float,
|
||||
price: float | None,
|
||||
logical_time: datetime | None = None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
scope_user, scope_run = _resolve_scope(user_id, run_id)
|
||||
store = self._load_store(cur=cur, for_update=True, user_id=scope_user, run_id=scope_run)
|
||||
side = side.upper().strip()
|
||||
qty = float(quantity)
|
||||
if price is None:
|
||||
price = fetch_live_price(symbol)
|
||||
price = float(price)
|
||||
|
||||
logical_ts = logical_time or datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
timestamp = logical_ts
|
||||
timestamp_str = _format_utc_ts(timestamp)
|
||||
logical_ts_str = _format_utc_ts(logical_ts)
|
||||
order_id = _deterministic_id(
|
||||
"ord",
|
||||
[
|
||||
scope_user,
|
||||
scope_run,
|
||||
_normalize_ts_for_id(logical_ts),
|
||||
symbol,
|
||||
side,
|
||||
_stable_num(qty),
|
||||
_stable_num(price),
|
||||
],
|
||||
)
|
||||
|
||||
order = {
|
||||
"id": order_id,
|
||||
"symbol": symbol,
|
||||
"side": side,
|
||||
"qty": qty,
|
||||
"price": price,
|
||||
"status": "REJECTED",
|
||||
"timestamp": timestamp_str,
|
||||
"_logical_time": logical_ts_str,
|
||||
}
|
||||
|
||||
if qty <= 0 or price <= 0:
|
||||
store.setdefault("orders", []).append(order)
|
||||
self._save_store(store, cur=cur, user_id=user_id, run_id=run_id)
|
||||
insert_engine_event(cur, "ORDER_PLACED", data=order)
|
||||
return order
|
||||
|
||||
positions = store.get("positions", {})
|
||||
cash = float(store.get("cash", 0))
|
||||
trade = None
|
||||
|
||||
if side == "BUY":
|
||||
cost = qty * price
|
||||
if cash >= cost:
|
||||
cash -= cost
|
||||
existing = positions.get(symbol, {"qty": 0.0, "avg_price": 0.0, "last_price": price})
|
||||
new_qty = float(existing.get("qty", 0)) + qty
|
||||
prev_cost = float(existing.get("qty", 0)) * float(existing.get("avg_price", 0))
|
||||
avg_price = (prev_cost + cost) / new_qty if new_qty else price
|
||||
positions[symbol] = {
|
||||
"qty": new_qty,
|
||||
"avg_price": avg_price,
|
||||
"last_price": price,
|
||||
}
|
||||
order["status"] = "FILLED"
|
||||
trade = {
|
||||
"id": _deterministic_id("trd", [order_id]),
|
||||
"order_id": order_id,
|
||||
"symbol": symbol,
|
||||
"side": side,
|
||||
"qty": qty,
|
||||
"price": price,
|
||||
"timestamp": timestamp_str,
|
||||
"_logical_time": logical_ts_str,
|
||||
}
|
||||
store.setdefault("trades", []).append(trade)
|
||||
elif side == "SELL":
|
||||
existing = positions.get(symbol)
|
||||
if existing and float(existing.get("qty", 0)) >= qty:
|
||||
cash += qty * price
|
||||
remaining = float(existing.get("qty", 0)) - qty
|
||||
if remaining > 0:
|
||||
existing["qty"] = remaining
|
||||
existing["last_price"] = price
|
||||
positions[symbol] = existing
|
||||
else:
|
||||
positions.pop(symbol, None)
|
||||
order["status"] = "FILLED"
|
||||
trade = {
|
||||
"id": _deterministic_id("trd", [order_id]),
|
||||
"order_id": order_id,
|
||||
"symbol": symbol,
|
||||
"side": side,
|
||||
"qty": qty,
|
||||
"price": price,
|
||||
"timestamp": timestamp_str,
|
||||
"_logical_time": logical_ts_str,
|
||||
}
|
||||
store.setdefault("trades", []).append(trade)
|
||||
|
||||
store["cash"] = cash
|
||||
store["positions"] = positions
|
||||
store.setdefault("orders", []).append(order)
|
||||
self._save_store(store, cur=cur, user_id=user_id, run_id=run_id)
|
||||
insert_engine_event(cur, "ORDER_PLACED", data=order)
|
||||
if trade is not None:
|
||||
insert_engine_event(cur, "TRADE_EXECUTED", data=trade)
|
||||
insert_engine_event(cur, "ORDER_FILLED", data={"order_id": order_id})
|
||||
return order
|
||||
|
||||
def place_order(
|
||||
self,
|
||||
symbol: str,
|
||||
side: str,
|
||||
quantity: float,
|
||||
price: float | None = None,
|
||||
cur=None,
|
||||
logical_time: datetime | None = None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
if cur is not None:
|
||||
return self._place_order_in_tx(
|
||||
cur,
|
||||
symbol,
|
||||
side,
|
||||
quantity,
|
||||
price,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
def _op(cur, _conn):
|
||||
return self._place_order_in_tx(
|
||||
cur,
|
||||
symbol,
|
||||
side,
|
||||
quantity,
|
||||
price,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
@ -1,150 +0,0 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from indian_paper_trading_strategy.engine.db import db_connection, get_context
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"active": False,
|
||||
"sip_amount": 0,
|
||||
"sip_frequency": {"value": 30, "unit": "days"},
|
||||
"next_run": None
|
||||
}
|
||||
|
||||
def _maybe_parse_json(value):
|
||||
if value is None:
|
||||
return None
|
||||
if not isinstance(value, str):
|
||||
return value
|
||||
text = value.strip()
|
||||
if not text:
|
||||
return None
|
||||
try:
|
||||
return json.loads(text)
|
||||
except Exception:
|
||||
return value
|
||||
|
||||
|
||||
def _format_ts(value: datetime | None):
|
||||
if value is None:
|
||||
return None
|
||||
return value.isoformat()
|
||||
|
||||
|
||||
def load_strategy_config(user_id: str | None = None, run_id: str | None = None):
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit,
|
||||
mode, broker, active, frequency, frequency_days, unit, next_run
|
||||
FROM strategy_config
|
||||
WHERE user_id = %s AND run_id = %s
|
||||
LIMIT 1
|
||||
""",
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return DEFAULT_CONFIG.copy()
|
||||
|
||||
cfg = DEFAULT_CONFIG.copy()
|
||||
cfg["strategy"] = row[0]
|
||||
cfg["strategy_name"] = row[0]
|
||||
cfg["sip_amount"] = float(row[1]) if row[1] is not None else cfg.get("sip_amount")
|
||||
cfg["mode"] = row[4]
|
||||
cfg["broker"] = row[5]
|
||||
cfg["active"] = row[6] if row[6] is not None else cfg.get("active")
|
||||
cfg["frequency"] = _maybe_parse_json(row[7])
|
||||
cfg["frequency_days"] = row[8]
|
||||
cfg["unit"] = row[9]
|
||||
cfg["next_run"] = _format_ts(row[10])
|
||||
if row[2] is not None or row[3] is not None:
|
||||
cfg["sip_frequency"] = {"value": row[2], "unit": row[3]}
|
||||
else:
|
||||
value = cfg.get("frequency")
|
||||
unit = cfg.get("unit")
|
||||
if isinstance(value, dict):
|
||||
unit = value.get("unit", unit)
|
||||
value = value.get("value")
|
||||
if value is None and cfg.get("frequency_days") is not None:
|
||||
value = cfg.get("frequency_days")
|
||||
unit = unit or "days"
|
||||
if value is not None and unit:
|
||||
cfg["sip_frequency"] = {"value": value, "unit": unit}
|
||||
return cfg
|
||||
|
||||
def save_strategy_config(cfg, user_id: str | None = None, run_id: str | None = None):
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
sip_frequency = cfg.get("sip_frequency")
|
||||
sip_value = None
|
||||
sip_unit = None
|
||||
if isinstance(sip_frequency, dict):
|
||||
sip_value = sip_frequency.get("value")
|
||||
sip_unit = sip_frequency.get("unit")
|
||||
|
||||
frequency = cfg.get("frequency")
|
||||
if not isinstance(frequency, str) and frequency is not None:
|
||||
frequency = json.dumps(frequency)
|
||||
|
||||
next_run = cfg.get("next_run")
|
||||
next_run_dt = None
|
||||
if isinstance(next_run, str):
|
||||
try:
|
||||
next_run_dt = datetime.fromisoformat(next_run)
|
||||
except ValueError:
|
||||
next_run_dt = None
|
||||
|
||||
strategy = cfg.get("strategy") or cfg.get("strategy_name")
|
||||
|
||||
with db_connection() as conn:
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO strategy_config (
|
||||
user_id,
|
||||
run_id,
|
||||
strategy,
|
||||
sip_amount,
|
||||
sip_frequency_value,
|
||||
sip_frequency_unit,
|
||||
mode,
|
||||
broker,
|
||||
active,
|
||||
frequency,
|
||||
frequency_days,
|
||||
unit,
|
||||
next_run
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (user_id, run_id) DO UPDATE
|
||||
SET strategy = EXCLUDED.strategy,
|
||||
sip_amount = EXCLUDED.sip_amount,
|
||||
sip_frequency_value = EXCLUDED.sip_frequency_value,
|
||||
sip_frequency_unit = EXCLUDED.sip_frequency_unit,
|
||||
mode = EXCLUDED.mode,
|
||||
broker = EXCLUDED.broker,
|
||||
active = EXCLUDED.active,
|
||||
frequency = EXCLUDED.frequency,
|
||||
frequency_days = EXCLUDED.frequency_days,
|
||||
unit = EXCLUDED.unit,
|
||||
next_run = EXCLUDED.next_run
|
||||
""",
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
strategy,
|
||||
cfg.get("sip_amount"),
|
||||
sip_value,
|
||||
sip_unit,
|
||||
cfg.get("mode"),
|
||||
cfg.get("broker"),
|
||||
cfg.get("active"),
|
||||
frequency,
|
||||
cfg.get("frequency_days"),
|
||||
cfg.get("unit"),
|
||||
next_run_dt,
|
||||
),
|
||||
)
|
||||
|
||||
@ -1,81 +0,0 @@
|
||||
# engine/data.py
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
import os
|
||||
import threading
|
||||
|
||||
import pandas as pd
|
||||
import yfinance as yf
|
||||
|
||||
ENGINE_ROOT = Path(__file__).resolve().parents[1]
|
||||
HISTORY_DIR = ENGINE_ROOT / "storage" / "history"
|
||||
ALLOW_PRICE_CACHE = os.getenv("ALLOW_PRICE_CACHE", "0").strip().lower() in {"1", "true", "yes"}
|
||||
|
||||
_LAST_PRICE: dict[str, dict[str, object]] = {}
|
||||
_LAST_PRICE_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def _set_last_price(ticker: str, price: float, source: str):
|
||||
now = datetime.now(timezone.utc)
|
||||
with _LAST_PRICE_LOCK:
|
||||
_LAST_PRICE[ticker] = {"price": float(price), "source": source, "ts": now}
|
||||
|
||||
|
||||
def get_price_snapshot(ticker: str) -> dict[str, object] | None:
|
||||
with _LAST_PRICE_LOCK:
|
||||
data = _LAST_PRICE.get(ticker)
|
||||
if not data:
|
||||
return None
|
||||
return dict(data)
|
||||
|
||||
|
||||
def _get_last_live_price(ticker: str) -> float | None:
|
||||
with _LAST_PRICE_LOCK:
|
||||
data = _LAST_PRICE.get(ticker)
|
||||
if not data:
|
||||
return None
|
||||
if data.get("source") == "live":
|
||||
return float(data.get("price", 0))
|
||||
return None
|
||||
|
||||
|
||||
def _cached_last_close(ticker: str) -> float | None:
|
||||
file = HISTORY_DIR / f"{ticker}.csv"
|
||||
if not file.exists():
|
||||
return None
|
||||
df = pd.read_csv(file)
|
||||
if df.empty or "Close" not in df.columns:
|
||||
return None
|
||||
return float(df["Close"].iloc[-1])
|
||||
|
||||
|
||||
def fetch_live_price(ticker, allow_cache: bool | None = None):
|
||||
if allow_cache is None:
|
||||
allow_cache = ALLOW_PRICE_CACHE
|
||||
try:
|
||||
df = yf.download(
|
||||
ticker,
|
||||
period="1d",
|
||||
interval="1m",
|
||||
auto_adjust=True,
|
||||
progress=False,
|
||||
timeout=5,
|
||||
)
|
||||
if df is not None and not df.empty:
|
||||
price = float(df["Close"].iloc[-1])
|
||||
_set_last_price(ticker, price, "live")
|
||||
return price
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if allow_cache:
|
||||
last_live = _get_last_live_price(ticker)
|
||||
if last_live is not None:
|
||||
return last_live
|
||||
|
||||
cached = _cached_last_close(ticker)
|
||||
if cached is not None:
|
||||
_set_last_price(ticker, cached, "cache")
|
||||
return cached
|
||||
|
||||
raise RuntimeError(f"No live data for {ticker}")
|
||||
@ -1,198 +0,0 @@
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from indian_paper_trading_strategy.engine.db import (
|
||||
run_with_retry,
|
||||
insert_engine_event,
|
||||
get_default_user_id,
|
||||
get_active_run_id,
|
||||
get_running_runs,
|
||||
engine_context,
|
||||
)
|
||||
|
||||
def log_event(event: str, data: dict | None = None):
|
||||
now = datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
payload = data or {}
|
||||
|
||||
def _op(cur, _conn):
|
||||
insert_engine_event(cur, event, data=payload, ts=now)
|
||||
|
||||
run_with_retry(_op)
|
||||
|
||||
def _update_engine_status(user_id: str, run_id: str, status: str):
|
||||
now = datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
|
||||
def _op(cur, _conn):
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO engine_status (user_id, run_id, status, last_updated)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
ON CONFLICT (user_id, run_id) DO UPDATE
|
||||
SET status = EXCLUDED.status,
|
||||
last_updated = EXCLUDED.last_updated
|
||||
""",
|
||||
(user_id, run_id, status, now),
|
||||
)
|
||||
|
||||
run_with_retry(_op)
|
||||
|
||||
from indian_paper_trading_strategy.engine.config import load_strategy_config, save_strategy_config
|
||||
from indian_paper_trading_strategy.engine.market import india_market_status
|
||||
from indian_paper_trading_strategy.engine.execution import try_execute_sip
|
||||
from indian_paper_trading_strategy.engine.state import load_state
|
||||
from indian_paper_trading_strategy.engine.broker import PaperBroker
|
||||
from indian_paper_trading_strategy.engine.data import fetch_live_price
|
||||
from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm
|
||||
from indian_paper_trading_strategy.engine.history import load_monthly_close
|
||||
from indian_paper_trading_strategy.engine.strategy import allocation
|
||||
from indian_paper_trading_strategy.engine.time_utils import frequency_to_timedelta, normalize_logical_time
|
||||
|
||||
NIFTY = "NIFTYBEES.NS"
|
||||
GOLD = "GOLDBEES.NS"
|
||||
SMA_MONTHS = 36
|
||||
|
||||
def run_engine(user_id: str | None = None, run_id: str | None = None):
|
||||
print("Strategy engine started")
|
||||
active_runs: dict[tuple[str, str], bool] = {}
|
||||
|
||||
if run_id and not user_id:
|
||||
raise ValueError("user_id is required when run_id is provided")
|
||||
|
||||
while True:
|
||||
try:
|
||||
if user_id and run_id:
|
||||
runs = [(user_id, run_id)]
|
||||
elif user_id:
|
||||
runs = get_running_runs(user_id)
|
||||
else:
|
||||
runs = get_running_runs()
|
||||
if not runs:
|
||||
default_user = get_default_user_id()
|
||||
if default_user:
|
||||
runs = get_running_runs(default_user)
|
||||
|
||||
seen = set()
|
||||
for scope_user, scope_run in runs:
|
||||
if not scope_user or not scope_run:
|
||||
continue
|
||||
seen.add((scope_user, scope_run))
|
||||
with engine_context(scope_user, scope_run):
|
||||
cfg = load_strategy_config(user_id=scope_user, run_id=scope_run)
|
||||
if not cfg.get("active"):
|
||||
continue
|
||||
|
||||
strategy_name = cfg.get("strategy_name", "golden_nifty")
|
||||
sip_amount = cfg.get("sip_amount", 0)
|
||||
configured_frequency = cfg.get("sip_frequency") or {}
|
||||
if not isinstance(configured_frequency, dict):
|
||||
configured_frequency = {}
|
||||
frequency_value = int(configured_frequency.get("value", cfg.get("frequency", 0)))
|
||||
frequency_unit = configured_frequency.get("unit", cfg.get("unit", "days"))
|
||||
frequency_info = {"value": frequency_value, "unit": frequency_unit}
|
||||
frequency_label = f"{frequency_value} {frequency_unit}"
|
||||
|
||||
if not active_runs.get((scope_user, scope_run)):
|
||||
log_event(
|
||||
"ENGINE_START",
|
||||
{
|
||||
"strategy": strategy_name,
|
||||
"sip_amount": sip_amount,
|
||||
"frequency": frequency_label,
|
||||
},
|
||||
)
|
||||
active_runs[(scope_user, scope_run)] = True
|
||||
|
||||
_update_engine_status(scope_user, scope_run, "RUNNING")
|
||||
|
||||
market_open, _ = india_market_status()
|
||||
if not market_open:
|
||||
log_event("MARKET_CLOSED", {"reason": "Outside market hours"})
|
||||
continue
|
||||
|
||||
now = datetime.now()
|
||||
mode = (cfg.get("mode") or "PAPER").strip().upper()
|
||||
if mode not in {"PAPER", "LIVE"}:
|
||||
mode = "PAPER"
|
||||
state = load_state(mode=mode)
|
||||
initial_cash = float(state.get("initial_cash") or 0.0)
|
||||
broker = PaperBroker(initial_cash=initial_cash) if mode == "PAPER" else None
|
||||
|
||||
nifty_price = fetch_live_price(NIFTY)
|
||||
gold_price = fetch_live_price(GOLD)
|
||||
|
||||
next_run = cfg.get("next_run")
|
||||
if next_run is None or now >= datetime.fromisoformat(next_run):
|
||||
nifty_hist = load_monthly_close(NIFTY)
|
||||
gold_hist = load_monthly_close(GOLD)
|
||||
|
||||
nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1]
|
||||
gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1]
|
||||
|
||||
eq_w, gd_w = allocation(
|
||||
sp_price=nifty_price,
|
||||
gd_price=gold_price,
|
||||
sp_sma=nifty_sma,
|
||||
gd_sma=gold_sma,
|
||||
)
|
||||
|
||||
weights = {"equity": eq_w, "gold": gd_w}
|
||||
state, executed = try_execute_sip(
|
||||
now=now,
|
||||
market_open=True,
|
||||
sip_interval=frequency_to_timedelta(frequency_info).total_seconds(),
|
||||
sip_amount=sip_amount,
|
||||
sp_price=nifty_price,
|
||||
gd_price=gold_price,
|
||||
eq_w=eq_w,
|
||||
gd_w=gd_w,
|
||||
broker=broker,
|
||||
mode=mode,
|
||||
)
|
||||
|
||||
if executed:
|
||||
log_event(
|
||||
"SIP_TRIGGERED",
|
||||
{
|
||||
"date": now.date().isoformat(),
|
||||
"allocation": weights,
|
||||
"cash_used": sip_amount,
|
||||
},
|
||||
)
|
||||
portfolio_value = (
|
||||
state["nifty_units"] * nifty_price
|
||||
+ state["gold_units"] * gold_price
|
||||
)
|
||||
log_event(
|
||||
"PORTFOLIO_UPDATED",
|
||||
{
|
||||
"nifty_units": state["nifty_units"],
|
||||
"gold_units": state["gold_units"],
|
||||
"portfolio_value": portfolio_value,
|
||||
},
|
||||
)
|
||||
cfg["next_run"] = (now + frequency_to_timedelta(frequency_info)).isoformat()
|
||||
save_strategy_config(cfg, user_id=scope_user, run_id=scope_run)
|
||||
|
||||
if should_log_mtm(None, now):
|
||||
state = load_state(mode=mode)
|
||||
log_mtm(
|
||||
nifty_units=state["nifty_units"],
|
||||
gold_units=state["gold_units"],
|
||||
nifty_price=nifty_price,
|
||||
gold_price=gold_price,
|
||||
total_invested=state["total_invested"],
|
||||
logical_time=normalize_logical_time(now),
|
||||
)
|
||||
|
||||
for key in list(active_runs.keys()):
|
||||
if key not in seen:
|
||||
active_runs.pop(key, None)
|
||||
|
||||
time.sleep(30)
|
||||
except Exception as e:
|
||||
log_event("ENGINE_ERROR", {"error": str(e)})
|
||||
raise
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_engine()
|
||||
|
||||
@ -1,157 +0,0 @@
|
||||
# engine/execution.py
|
||||
from datetime import datetime, timezone
|
||||
from indian_paper_trading_strategy.engine.state import load_state, save_state
|
||||
|
||||
from indian_paper_trading_strategy.engine.broker import Broker
|
||||
from indian_paper_trading_strategy.engine.ledger import log_event, event_exists
|
||||
from indian_paper_trading_strategy.engine.db import run_with_retry
|
||||
from indian_paper_trading_strategy.engine.time_utils import compute_logical_time
|
||||
|
||||
def _as_float(value):
|
||||
if hasattr(value, "item"):
|
||||
try:
|
||||
return float(value.item())
|
||||
except Exception:
|
||||
pass
|
||||
if hasattr(value, "iloc"):
|
||||
try:
|
||||
return float(value.iloc[-1])
|
||||
except Exception:
|
||||
pass
|
||||
return float(value)
|
||||
|
||||
def _local_tz():
|
||||
return datetime.now().astimezone().tzinfo
|
||||
|
||||
def try_execute_sip(
|
||||
now,
|
||||
market_open,
|
||||
sip_interval,
|
||||
sip_amount,
|
||||
sp_price,
|
||||
gd_price,
|
||||
eq_w,
|
||||
gd_w,
|
||||
broker: Broker | None = None,
|
||||
mode: str | None = "LIVE",
|
||||
):
|
||||
def _op(cur, _conn):
|
||||
if now.tzinfo is None:
|
||||
now_ts = now.replace(tzinfo=_local_tz())
|
||||
else:
|
||||
now_ts = now
|
||||
event_ts = now_ts
|
||||
log_event("DEBUG_ENTER_TRY_EXECUTE", {
|
||||
"now": now_ts.isoformat(),
|
||||
}, cur=cur, ts=event_ts)
|
||||
|
||||
state = load_state(mode=mode, cur=cur, for_update=True)
|
||||
|
||||
force_execute = state.get("last_sip_ts") is None
|
||||
|
||||
if not market_open:
|
||||
return state, False
|
||||
|
||||
last = state.get("last_sip_ts") or state.get("last_run")
|
||||
if last and not force_execute:
|
||||
try:
|
||||
last_dt = datetime.fromisoformat(last)
|
||||
except ValueError:
|
||||
last_dt = None
|
||||
if last_dt:
|
||||
if last_dt.tzinfo is None:
|
||||
last_dt = last_dt.replace(tzinfo=_local_tz())
|
||||
if now_ts.tzinfo and last_dt.tzinfo and last_dt.tzinfo != now_ts.tzinfo:
|
||||
last_dt = last_dt.astimezone(now_ts.tzinfo)
|
||||
if last_dt and (now_ts - last_dt).total_seconds() < sip_interval:
|
||||
return state, False
|
||||
|
||||
logical_time = compute_logical_time(now_ts, last, sip_interval)
|
||||
if event_exists("SIP_EXECUTED", logical_time, cur=cur):
|
||||
return state, False
|
||||
|
||||
sp_price_val = _as_float(sp_price)
|
||||
gd_price_val = _as_float(gd_price)
|
||||
eq_w_val = _as_float(eq_w)
|
||||
gd_w_val = _as_float(gd_w)
|
||||
sip_amount_val = _as_float(sip_amount)
|
||||
|
||||
nifty_qty = (sip_amount_val * eq_w_val) / sp_price_val
|
||||
gold_qty = (sip_amount_val * gd_w_val) / gd_price_val
|
||||
|
||||
if broker is None:
|
||||
return state, False
|
||||
|
||||
funds = broker.get_funds(cur=cur)
|
||||
cash = funds.get("cash")
|
||||
if cash is not None and float(cash) < sip_amount_val:
|
||||
return state, False
|
||||
|
||||
log_event("DEBUG_EXECUTION_DECISION", {
|
||||
"force_execute": force_execute,
|
||||
"last_sip_ts": state.get("last_sip_ts"),
|
||||
"now": now_ts.isoformat(),
|
||||
}, cur=cur, ts=event_ts)
|
||||
|
||||
nifty_order = broker.place_order(
|
||||
"NIFTYBEES.NS",
|
||||
"BUY",
|
||||
nifty_qty,
|
||||
sp_price_val,
|
||||
cur=cur,
|
||||
logical_time=logical_time,
|
||||
)
|
||||
gold_order = broker.place_order(
|
||||
"GOLDBEES.NS",
|
||||
"BUY",
|
||||
gold_qty,
|
||||
gd_price_val,
|
||||
cur=cur,
|
||||
logical_time=logical_time,
|
||||
)
|
||||
orders = [nifty_order, gold_order]
|
||||
executed = all(
|
||||
isinstance(order, dict) and order.get("status") == "FILLED"
|
||||
for order in orders
|
||||
)
|
||||
if not executed:
|
||||
return state, False
|
||||
assert len(orders) > 0, "executed=True but no broker orders placed"
|
||||
|
||||
funds_after = broker.get_funds(cur=cur)
|
||||
cash_after = funds_after.get("cash")
|
||||
if cash_after is not None:
|
||||
state["cash"] = float(cash_after)
|
||||
|
||||
state["nifty_units"] += nifty_qty
|
||||
state["gold_units"] += gold_qty
|
||||
state["total_invested"] += sip_amount_val
|
||||
state["last_sip_ts"] = now_ts.isoformat()
|
||||
state["last_run"] = now_ts.isoformat()
|
||||
|
||||
save_state(
|
||||
state,
|
||||
mode=mode,
|
||||
cur=cur,
|
||||
emit_event=True,
|
||||
event_meta={"source": "sip"},
|
||||
)
|
||||
|
||||
log_event(
|
||||
"SIP_EXECUTED",
|
||||
{
|
||||
"nifty_units": nifty_qty,
|
||||
"gold_units": gold_qty,
|
||||
"nifty_price": sp_price_val,
|
||||
"gold_price": gd_price_val,
|
||||
"amount": sip_amount_val,
|
||||
},
|
||||
cur=cur,
|
||||
ts=event_ts,
|
||||
logical_time=logical_time,
|
||||
)
|
||||
|
||||
return state, True
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
@ -1,34 +0,0 @@
|
||||
# engine/history.py
|
||||
import yfinance as yf
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
|
||||
ENGINE_ROOT = Path(__file__).resolve().parents[1]
|
||||
STORAGE_DIR = ENGINE_ROOT / "storage"
|
||||
STORAGE_DIR.mkdir(exist_ok=True)
|
||||
|
||||
CACHE_DIR = STORAGE_DIR / "history"
|
||||
CACHE_DIR.mkdir(exist_ok=True)
|
||||
|
||||
def load_monthly_close(ticker, years=10):
|
||||
file = CACHE_DIR / f"{ticker}.csv"
|
||||
|
||||
if file.exists():
|
||||
df = pd.read_csv(file, parse_dates=["Date"], index_col="Date")
|
||||
return df["Close"]
|
||||
|
||||
df = yf.download(
|
||||
ticker,
|
||||
period=f"{years}y",
|
||||
auto_adjust=True,
|
||||
progress=False,
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
if df.empty:
|
||||
raise RuntimeError(f"No history for {ticker}")
|
||||
|
||||
series = df["Close"].resample("M").last()
|
||||
series.to_csv(file, header=["Close"])
|
||||
|
||||
return series
|
||||
@ -1,113 +0,0 @@
|
||||
# engine/ledger.py
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry, get_context
|
||||
from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time
|
||||
|
||||
|
||||
def _event_exists_in_tx(cur, event, logical_time, user_id: str | None = None, run_id: str | None = None):
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
logical_ts = normalize_logical_time(logical_time)
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM event_ledger
|
||||
WHERE user_id = %s AND run_id = %s AND event = %s AND logical_time = %s
|
||||
LIMIT 1
|
||||
""",
|
||||
(scope_user, scope_run, event, logical_ts),
|
||||
)
|
||||
return cur.fetchone() is not None
|
||||
|
||||
|
||||
def event_exists(event, logical_time, *, cur=None, user_id: str | None = None, run_id: str | None = None):
|
||||
if cur is not None:
|
||||
return _event_exists_in_tx(cur, event, logical_time, user_id=user_id, run_id=run_id)
|
||||
|
||||
def _op(cur, _conn):
|
||||
return _event_exists_in_tx(cur, event, logical_time, user_id=user_id, run_id=run_id)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
|
||||
def _log_event_in_tx(
|
||||
cur,
|
||||
event,
|
||||
payload,
|
||||
ts,
|
||||
logical_time=None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
logical_ts = normalize_logical_time(logical_time or ts)
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO event_ledger (
|
||||
user_id,
|
||||
run_id,
|
||||
timestamp,
|
||||
logical_time,
|
||||
event,
|
||||
nifty_units,
|
||||
gold_units,
|
||||
nifty_price,
|
||||
gold_price,
|
||||
amount
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT DO NOTHING
|
||||
""",
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
ts,
|
||||
logical_ts,
|
||||
event,
|
||||
payload.get("nifty_units"),
|
||||
payload.get("gold_units"),
|
||||
payload.get("nifty_price"),
|
||||
payload.get("gold_price"),
|
||||
payload.get("amount"),
|
||||
),
|
||||
)
|
||||
if cur.rowcount:
|
||||
insert_engine_event(cur, event, data=payload, ts=ts)
|
||||
|
||||
|
||||
def log_event(
|
||||
event,
|
||||
payload,
|
||||
*,
|
||||
cur=None,
|
||||
ts=None,
|
||||
logical_time=None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
now = ts or logical_time or datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
if cur is not None:
|
||||
_log_event_in_tx(
|
||||
cur,
|
||||
event,
|
||||
payload,
|
||||
now,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
return
|
||||
|
||||
def _op(cur, _conn):
|
||||
_log_event_in_tx(
|
||||
cur,
|
||||
event,
|
||||
payload,
|
||||
now,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
@ -1,42 +0,0 @@
|
||||
# engine/market.py
|
||||
from datetime import datetime, time as dtime, timedelta
|
||||
import pytz
|
||||
|
||||
_MARKET_TZ = pytz.timezone("Asia/Kolkata")
|
||||
_OPEN_T = dtime(9, 15)
|
||||
_CLOSE_T = dtime(15, 30)
|
||||
|
||||
def _as_market_tz(value: datetime) -> datetime:
|
||||
if value.tzinfo is None:
|
||||
return _MARKET_TZ.localize(value)
|
||||
return value.astimezone(_MARKET_TZ)
|
||||
|
||||
def is_market_open(now: datetime) -> bool:
|
||||
now = _as_market_tz(now)
|
||||
return now.weekday() < 5 and _OPEN_T <= now.time() <= _CLOSE_T
|
||||
|
||||
def india_market_status():
|
||||
now = datetime.now(_MARKET_TZ)
|
||||
|
||||
return is_market_open(now), now
|
||||
|
||||
def next_market_open_after(value: datetime) -> datetime:
|
||||
current = _as_market_tz(value)
|
||||
while current.weekday() >= 5:
|
||||
current = current + timedelta(days=1)
|
||||
current = current.replace(hour=_OPEN_T.hour, minute=_OPEN_T.minute, second=0, microsecond=0)
|
||||
if current.time() < _OPEN_T:
|
||||
return current.replace(hour=_OPEN_T.hour, minute=_OPEN_T.minute, second=0, microsecond=0)
|
||||
if current.time() > _CLOSE_T:
|
||||
current = current + timedelta(days=1)
|
||||
while current.weekday() >= 5:
|
||||
current = current + timedelta(days=1)
|
||||
return current.replace(hour=_OPEN_T.hour, minute=_OPEN_T.minute, second=0, microsecond=0)
|
||||
return current
|
||||
|
||||
def align_to_market_open(value: datetime) -> datetime:
|
||||
current = _as_market_tz(value)
|
||||
aligned = current if is_market_open(current) else next_market_open_after(current)
|
||||
if value.tzinfo is None:
|
||||
return aligned.replace(tzinfo=None)
|
||||
return aligned
|
||||
@ -1,154 +0,0 @@
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_context
|
||||
from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time
|
||||
|
||||
ENGINE_ROOT = Path(__file__).resolve().parents[1]
|
||||
STORAGE_DIR = ENGINE_ROOT / "storage"
|
||||
MTM_FILE = STORAGE_DIR / "mtm_ledger.csv"
|
||||
|
||||
MTM_INTERVAL_SECONDS = 60
|
||||
|
||||
def _log_mtm_in_tx(
|
||||
cur,
|
||||
nifty_units,
|
||||
gold_units,
|
||||
nifty_price,
|
||||
gold_price,
|
||||
total_invested,
|
||||
ts,
|
||||
logical_time=None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
logical_ts = normalize_logical_time(logical_time or ts)
|
||||
nifty_value = nifty_units * nifty_price
|
||||
gold_value = gold_units * gold_price
|
||||
portfolio_value = nifty_value + gold_value
|
||||
pnl = portfolio_value - total_invested
|
||||
|
||||
row = {
|
||||
"timestamp": ts.isoformat(),
|
||||
"logical_time": logical_ts.isoformat(),
|
||||
"nifty_units": nifty_units,
|
||||
"gold_units": gold_units,
|
||||
"nifty_price": nifty_price,
|
||||
"gold_price": gold_price,
|
||||
"nifty_value": nifty_value,
|
||||
"gold_value": gold_value,
|
||||
"portfolio_value": portfolio_value,
|
||||
"total_invested": total_invested,
|
||||
"pnl": pnl,
|
||||
}
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO mtm_ledger (
|
||||
user_id,
|
||||
run_id,
|
||||
timestamp,
|
||||
logical_time,
|
||||
nifty_units,
|
||||
gold_units,
|
||||
nifty_price,
|
||||
gold_price,
|
||||
nifty_value,
|
||||
gold_value,
|
||||
portfolio_value,
|
||||
total_invested,
|
||||
pnl
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT DO NOTHING
|
||||
""",
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
ts,
|
||||
logical_ts,
|
||||
row["nifty_units"],
|
||||
row["gold_units"],
|
||||
row["nifty_price"],
|
||||
row["gold_price"],
|
||||
row["nifty_value"],
|
||||
row["gold_value"],
|
||||
row["portfolio_value"],
|
||||
row["total_invested"],
|
||||
row["pnl"],
|
||||
),
|
||||
)
|
||||
if cur.rowcount:
|
||||
insert_engine_event(cur, "MTM_UPDATED", data=row, ts=ts)
|
||||
return portfolio_value, pnl
|
||||
|
||||
def log_mtm(
|
||||
nifty_units,
|
||||
gold_units,
|
||||
nifty_price,
|
||||
gold_price,
|
||||
total_invested,
|
||||
*,
|
||||
cur=None,
|
||||
logical_time=None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
ts = logical_time or datetime.now(timezone.utc)
|
||||
if cur is not None:
|
||||
return _log_mtm_in_tx(
|
||||
cur,
|
||||
nifty_units,
|
||||
gold_units,
|
||||
nifty_price,
|
||||
gold_price,
|
||||
total_invested,
|
||||
ts,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
def _op(cur, _conn):
|
||||
return _log_mtm_in_tx(
|
||||
cur,
|
||||
nifty_units,
|
||||
gold_units,
|
||||
nifty_price,
|
||||
gold_price,
|
||||
total_invested,
|
||||
ts,
|
||||
logical_time=logical_time,
|
||||
user_id=user_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
def _get_last_mtm_ts(user_id: str | None = None, run_id: str | None = None):
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT MAX(timestamp) FROM mtm_ledger WHERE user_id = %s AND run_id = %s",
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row or row[0] is None:
|
||||
return None
|
||||
return row[0].astimezone().replace(tzinfo=None)
|
||||
|
||||
def should_log_mtm(df, now, user_id: str | None = None, run_id: str | None = None):
|
||||
if df is None:
|
||||
last_ts = _get_last_mtm_ts(user_id=user_id, run_id=run_id)
|
||||
if last_ts is None:
|
||||
return True
|
||||
return (now - last_ts).total_seconds() >= MTM_INTERVAL_SECONDS
|
||||
if getattr(df, "empty", False):
|
||||
return True
|
||||
try:
|
||||
last_ts = datetime.fromisoformat(str(df.iloc[-1]["timestamp"]))
|
||||
except Exception:
|
||||
return True
|
||||
return (now - last_ts).total_seconds() >= MTM_INTERVAL_SECONDS
|
||||
|
||||
@ -1,518 +0,0 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from indian_paper_trading_strategy.engine.market import is_market_open, align_to_market_open
|
||||
from indian_paper_trading_strategy.engine.execution import try_execute_sip
|
||||
from indian_paper_trading_strategy.engine.broker import PaperBroker
|
||||
from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm
|
||||
from indian_paper_trading_strategy.engine.state import load_state
|
||||
from indian_paper_trading_strategy.engine.data import fetch_live_price
|
||||
from indian_paper_trading_strategy.engine.history import load_monthly_close
|
||||
from indian_paper_trading_strategy.engine.strategy import allocation
|
||||
from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time
|
||||
|
||||
from indian_paper_trading_strategy.engine.db import db_transaction, insert_engine_event, run_with_retry, get_context, set_context
|
||||
|
||||
|
||||
def _update_engine_status(user_id: str, run_id: str, status: str):
|
||||
now = datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
|
||||
def _op(cur, _conn):
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO engine_status (user_id, run_id, status, last_updated)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
ON CONFLICT (user_id, run_id) DO UPDATE
|
||||
SET status = EXCLUDED.status,
|
||||
last_updated = EXCLUDED.last_updated
|
||||
""",
|
||||
(user_id, run_id, status, now),
|
||||
)
|
||||
|
||||
run_with_retry(_op)
|
||||
|
||||
NIFTY = "NIFTYBEES.NS"
|
||||
GOLD = "GOLDBEES.NS"
|
||||
SMA_MONTHS = 36
|
||||
|
||||
_DEFAULT_ENGINE_STATE = {
|
||||
"state": "STOPPED",
|
||||
"run_id": None,
|
||||
"user_id": None,
|
||||
"last_heartbeat_ts": None,
|
||||
}
|
||||
|
||||
_ENGINE_STATES = {}
|
||||
_ENGINE_STATES_LOCK = threading.Lock()
|
||||
|
||||
_RUNNERS = {}
|
||||
_RUNNERS_LOCK = threading.Lock()
|
||||
|
||||
engine_state = _ENGINE_STATES
|
||||
|
||||
|
||||
def _state_key(user_id: str, run_id: str):
|
||||
return (user_id, run_id)
|
||||
|
||||
|
||||
def _get_state(user_id: str, run_id: str):
|
||||
key = _state_key(user_id, run_id)
|
||||
with _ENGINE_STATES_LOCK:
|
||||
state = _ENGINE_STATES.get(key)
|
||||
if state is None:
|
||||
state = dict(_DEFAULT_ENGINE_STATE)
|
||||
state["user_id"] = user_id
|
||||
state["run_id"] = run_id
|
||||
_ENGINE_STATES[key] = state
|
||||
return state
|
||||
|
||||
|
||||
def _set_state(user_id: str, run_id: str, **updates):
|
||||
key = _state_key(user_id, run_id)
|
||||
with _ENGINE_STATES_LOCK:
|
||||
state = _ENGINE_STATES.get(key)
|
||||
if state is None:
|
||||
state = dict(_DEFAULT_ENGINE_STATE)
|
||||
state["user_id"] = user_id
|
||||
state["run_id"] = run_id
|
||||
_ENGINE_STATES[key] = state
|
||||
state.update(updates)
|
||||
|
||||
|
||||
def get_engine_state(user_id: str, run_id: str):
|
||||
state = _get_state(user_id, run_id)
|
||||
return dict(state)
|
||||
|
||||
def log_event(
|
||||
event: str,
|
||||
data: dict | None = None,
|
||||
message: str | None = None,
|
||||
meta: dict | None = None,
|
||||
):
|
||||
entry = {
|
||||
"ts": datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
|
||||
"event": event,
|
||||
}
|
||||
if message is not None or meta is not None:
|
||||
entry["message"] = message or ""
|
||||
entry["meta"] = meta or {}
|
||||
else:
|
||||
entry["data"] = data or {}
|
||||
event_ts = datetime.fromisoformat(entry["ts"].replace("Z", "+00:00"))
|
||||
data = entry.get("data") if "data" in entry else None
|
||||
meta = entry.get("meta") if "meta" in entry else None
|
||||
|
||||
def _op(cur, _conn):
|
||||
insert_engine_event(
|
||||
cur,
|
||||
entry.get("event"),
|
||||
data=data,
|
||||
message=entry.get("message"),
|
||||
meta=meta,
|
||||
ts=event_ts,
|
||||
)
|
||||
|
||||
run_with_retry(_op)
|
||||
|
||||
def sleep_with_heartbeat(
|
||||
total_seconds: int,
|
||||
stop_event: threading.Event,
|
||||
user_id: str,
|
||||
run_id: str,
|
||||
step_seconds: int = 5,
|
||||
):
|
||||
remaining = total_seconds
|
||||
while remaining > 0 and not stop_event.is_set():
|
||||
time.sleep(min(step_seconds, remaining))
|
||||
_set_state(user_id, run_id, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
|
||||
remaining -= step_seconds
|
||||
|
||||
def _clear_runner(user_id: str, run_id: str):
|
||||
key = _state_key(user_id, run_id)
|
||||
with _RUNNERS_LOCK:
|
||||
_RUNNERS.pop(key, None)
|
||||
|
||||
def can_execute(now: datetime) -> tuple[bool, str]:
|
||||
if not is_market_open(now):
|
||||
return False, "MARKET_CLOSED"
|
||||
return True, "OK"
|
||||
|
||||
def _engine_loop(config, stop_event: threading.Event):
|
||||
print("Strategy engine started with config:", config)
|
||||
|
||||
user_id = config.get("user_id")
|
||||
run_id = config.get("run_id")
|
||||
scope_user, scope_run = get_context(user_id, run_id)
|
||||
set_context(scope_user, scope_run)
|
||||
|
||||
strategy_name = config.get("strategy_name") or config.get("strategy") or "golden_nifty"
|
||||
sip_amount = config["sip_amount"]
|
||||
configured_frequency = config.get("sip_frequency") or {}
|
||||
if not isinstance(configured_frequency, dict):
|
||||
configured_frequency = {}
|
||||
frequency_value = int(configured_frequency.get("value", config.get("frequency", 0)))
|
||||
frequency_unit = configured_frequency.get("unit", config.get("unit", "days"))
|
||||
frequency_label = f"{frequency_value} {frequency_unit}"
|
||||
emit_event_cb = config.get("emit_event")
|
||||
if not callable(emit_event_cb):
|
||||
emit_event_cb = None
|
||||
debug_enabled = os.getenv("ENGINE_DEBUG", "1").strip().lower() not in {"0", "false", "no"}
|
||||
|
||||
def debug_event(event: str, message: str, meta: dict | None = None):
|
||||
if not debug_enabled:
|
||||
return
|
||||
try:
|
||||
log_event(event=event, message=message, meta=meta or {})
|
||||
except Exception:
|
||||
pass
|
||||
if emit_event_cb:
|
||||
emit_event_cb(event=event, message=message, meta=meta or {})
|
||||
print(f"[ENGINE] {event} {message} {meta or {}}", flush=True)
|
||||
mode = (config.get("mode") or "LIVE").strip().upper()
|
||||
if mode not in {"PAPER", "LIVE"}:
|
||||
mode = "LIVE"
|
||||
broker_type = config.get("broker") or "paper"
|
||||
if broker_type != "paper":
|
||||
broker_type = "paper"
|
||||
if broker_type == "paper":
|
||||
mode = "PAPER"
|
||||
initial_cash = float(config.get("initial_cash", 0))
|
||||
broker = PaperBroker(initial_cash=initial_cash)
|
||||
log_event(
|
||||
event="DEBUG_PAPER_STORE_PATH",
|
||||
message="Paper broker store path",
|
||||
meta={
|
||||
"cwd": os.getcwd(),
|
||||
"paper_store_path": str(broker.store_path) if hasattr(broker, "store_path") else "NO_STORE_PATH",
|
||||
"abs_store_path": os.path.abspath(str(broker.store_path)) if hasattr(broker, "store_path") else "N/A",
|
||||
},
|
||||
)
|
||||
if emit_event_cb:
|
||||
emit_event_cb(
|
||||
event="DEBUG_PAPER_STORE_PATH",
|
||||
message="Paper broker store path",
|
||||
meta={
|
||||
"cwd": os.getcwd(),
|
||||
"paper_store_path": str(broker.store_path) if hasattr(broker, "store_path") else "NO_STORE_PATH",
|
||||
"abs_store_path": os.path.abspath(str(broker.store_path)) if hasattr(broker, "store_path") else "N/A",
|
||||
},
|
||||
)
|
||||
|
||||
log_event("ENGINE_START", {
|
||||
"strategy": strategy_name,
|
||||
"sip_amount": sip_amount,
|
||||
"frequency": frequency_label,
|
||||
})
|
||||
debug_event("ENGINE_START_DEBUG", "engine loop started", {"run_id": scope_run, "user_id": scope_user})
|
||||
|
||||
_set_state(
|
||||
scope_user,
|
||||
scope_run,
|
||||
state="RUNNING",
|
||||
last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
|
||||
)
|
||||
_update_engine_status(scope_user, scope_run, "RUNNING")
|
||||
|
||||
try:
|
||||
while not stop_event.is_set():
|
||||
_set_state(scope_user, scope_run, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
|
||||
_update_engine_status(scope_user, scope_run, "RUNNING")
|
||||
|
||||
state = load_state(mode=mode)
|
||||
debug_event(
|
||||
"STATE_LOADED",
|
||||
"loaded engine state",
|
||||
{
|
||||
"last_sip_ts": state.get("last_sip_ts"),
|
||||
"last_run": state.get("last_run"),
|
||||
"cash": state.get("cash"),
|
||||
"total_invested": state.get("total_invested"),
|
||||
},
|
||||
)
|
||||
state_frequency = state.get("sip_frequency")
|
||||
if not isinstance(state_frequency, dict):
|
||||
state_frequency = {"value": frequency_value, "unit": frequency_unit}
|
||||
freq = int(state_frequency.get("value", frequency_value))
|
||||
unit = state_frequency.get("unit", frequency_unit)
|
||||
frequency_label = f"{freq} {unit}"
|
||||
if unit == "minutes":
|
||||
delta = timedelta(minutes=freq)
|
||||
else:
|
||||
delta = timedelta(days=freq)
|
||||
|
||||
# Gate 2: time to SIP
|
||||
last_run = state.get("last_run") or state.get("last_sip_ts")
|
||||
is_first_run = last_run is None
|
||||
now = datetime.now()
|
||||
debug_event(
|
||||
"ENGINE_LOOP_TICK",
|
||||
"engine loop tick",
|
||||
{"now": now.isoformat(), "frequency": frequency_label},
|
||||
)
|
||||
|
||||
if last_run and not is_first_run:
|
||||
next_run = datetime.fromisoformat(last_run) + delta
|
||||
next_run = align_to_market_open(next_run)
|
||||
if now < next_run:
|
||||
log_event(
|
||||
event="SIP_WAITING",
|
||||
message="Waiting for next SIP window",
|
||||
meta={
|
||||
"last_run": last_run,
|
||||
"next_eligible": next_run.isoformat(),
|
||||
"now": now.isoformat(),
|
||||
"frequency": frequency_label,
|
||||
},
|
||||
)
|
||||
if emit_event_cb:
|
||||
emit_event_cb(
|
||||
event="SIP_WAITING",
|
||||
message="Waiting for next SIP window",
|
||||
meta={
|
||||
"last_run": last_run,
|
||||
"next_eligible": next_run.isoformat(),
|
||||
"now": now.isoformat(),
|
||||
"frequency": frequency_label,
|
||||
},
|
||||
)
|
||||
sleep_with_heartbeat(60, stop_event, scope_user, scope_run)
|
||||
continue
|
||||
|
||||
try:
|
||||
debug_event("PRICE_FETCH_START", "fetching live prices", {"tickers": [NIFTY, GOLD]})
|
||||
nifty_price = fetch_live_price(NIFTY)
|
||||
gold_price = fetch_live_price(GOLD)
|
||||
debug_event(
|
||||
"PRICE_FETCHED",
|
||||
"fetched live prices",
|
||||
{"nifty_price": float(nifty_price), "gold_price": float(gold_price)},
|
||||
)
|
||||
except Exception as exc:
|
||||
debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)})
|
||||
sleep_with_heartbeat(30, stop_event, scope_user, scope_run)
|
||||
continue
|
||||
|
||||
try:
|
||||
nifty_hist = load_monthly_close(NIFTY)
|
||||
gold_hist = load_monthly_close(GOLD)
|
||||
except Exception as exc:
|
||||
debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)})
|
||||
sleep_with_heartbeat(30, stop_event, scope_user, scope_run)
|
||||
continue
|
||||
|
||||
nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1]
|
||||
gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1]
|
||||
|
||||
eq_w, gd_w = allocation(
|
||||
sp_price=nifty_price,
|
||||
gd_price=gold_price,
|
||||
sp_sma=nifty_sma,
|
||||
gd_sma=gold_sma
|
||||
)
|
||||
debug_event(
|
||||
"WEIGHTS_COMPUTED",
|
||||
"computed allocation weights",
|
||||
{"equity_weight": float(eq_w), "gold_weight": float(gd_w)},
|
||||
)
|
||||
|
||||
weights = {"equity": eq_w, "gold": gd_w}
|
||||
allowed, reason = can_execute(now)
|
||||
executed = False
|
||||
if not allowed:
|
||||
log_event(
|
||||
event="EXECUTION_BLOCKED",
|
||||
message="Execution blocked by market gate",
|
||||
meta={
|
||||
"reason": reason,
|
||||
"eligible_since": last_run,
|
||||
"checked_at": now.isoformat(),
|
||||
},
|
||||
)
|
||||
debug_event("MARKET_GATE", "market closed", {"reason": reason})
|
||||
if emit_event_cb:
|
||||
emit_event_cb(
|
||||
event="EXECUTION_BLOCKED",
|
||||
message="Execution blocked by market gate",
|
||||
meta={
|
||||
"reason": reason,
|
||||
"eligible_since": last_run,
|
||||
"checked_at": now.isoformat(),
|
||||
},
|
||||
)
|
||||
else:
|
||||
log_event(
|
||||
event="DEBUG_BEFORE_TRY_EXECUTE",
|
||||
message="About to call try_execute_sip",
|
||||
meta={
|
||||
"last_run": last_run,
|
||||
"frequency": frequency_label,
|
||||
"allowed": allowed,
|
||||
"reason": reason,
|
||||
"sip_amount": sip_amount,
|
||||
"broker": type(broker).__name__,
|
||||
"now": now.isoformat(),
|
||||
},
|
||||
)
|
||||
if emit_event_cb:
|
||||
emit_event_cb(
|
||||
event="DEBUG_BEFORE_TRY_EXECUTE",
|
||||
message="About to call try_execute_sip",
|
||||
meta={
|
||||
"last_run": last_run,
|
||||
"frequency": frequency_label,
|
||||
"allowed": allowed,
|
||||
"reason": reason,
|
||||
"sip_amount": sip_amount,
|
||||
"broker": type(broker).__name__,
|
||||
"now": now.isoformat(),
|
||||
},
|
||||
)
|
||||
debug_event(
|
||||
"TRY_EXECUTE_START",
|
||||
"calling try_execute_sip",
|
||||
{"sip_interval_sec": delta.total_seconds(), "sip_amount": sip_amount},
|
||||
)
|
||||
state, executed = try_execute_sip(
|
||||
now=now,
|
||||
market_open=True,
|
||||
sip_interval=delta.total_seconds(),
|
||||
sip_amount=sip_amount,
|
||||
sp_price=nifty_price,
|
||||
gd_price=gold_price,
|
||||
eq_w=eq_w,
|
||||
gd_w=gd_w,
|
||||
broker=broker,
|
||||
mode=mode,
|
||||
)
|
||||
log_event(
|
||||
event="DEBUG_AFTER_TRY_EXECUTE",
|
||||
message="Returned from try_execute_sip",
|
||||
meta={
|
||||
"executed": executed,
|
||||
"state_last_run": state.get("last_run"),
|
||||
"state_last_sip_ts": state.get("last_sip_ts"),
|
||||
},
|
||||
)
|
||||
if emit_event_cb:
|
||||
emit_event_cb(
|
||||
event="DEBUG_AFTER_TRY_EXECUTE",
|
||||
message="Returned from try_execute_sip",
|
||||
meta={
|
||||
"executed": executed,
|
||||
"state_last_run": state.get("last_run"),
|
||||
"state_last_sip_ts": state.get("last_sip_ts"),
|
||||
},
|
||||
)
|
||||
debug_event(
|
||||
"TRY_EXECUTE_DONE",
|
||||
"try_execute_sip finished",
|
||||
{"executed": executed, "last_run": state.get("last_run")},
|
||||
)
|
||||
|
||||
if executed:
|
||||
log_event("SIP_TRIGGERED", {
|
||||
"date": now.date().isoformat(),
|
||||
"allocation": weights,
|
||||
"cash_used": sip_amount
|
||||
})
|
||||
debug_event("SIP_TRIGGERED", "sip executed", {"cash_used": sip_amount})
|
||||
portfolio_value = (
|
||||
state["nifty_units"] * nifty_price
|
||||
+ state["gold_units"] * gold_price
|
||||
)
|
||||
log_event("PORTFOLIO_UPDATED", {
|
||||
"nifty_units": state["nifty_units"],
|
||||
"gold_units": state["gold_units"],
|
||||
"portfolio_value": portfolio_value
|
||||
})
|
||||
print("SIP executed at", now)
|
||||
|
||||
if should_log_mtm(None, now):
|
||||
logical_time = normalize_logical_time(now)
|
||||
with db_transaction() as cur:
|
||||
log_mtm(
|
||||
nifty_units=state["nifty_units"],
|
||||
gold_units=state["gold_units"],
|
||||
nifty_price=nifty_price,
|
||||
gold_price=gold_price,
|
||||
total_invested=state["total_invested"],
|
||||
cur=cur,
|
||||
logical_time=logical_time,
|
||||
)
|
||||
broker.update_equity(
|
||||
{NIFTY: nifty_price, GOLD: gold_price},
|
||||
now,
|
||||
cur=cur,
|
||||
logical_time=logical_time,
|
||||
)
|
||||
|
||||
sleep_with_heartbeat(30, stop_event, scope_user, scope_run)
|
||||
except Exception as e:
|
||||
_set_state(scope_user, scope_run, state="ERROR", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
|
||||
_update_engine_status(scope_user, scope_run, "ERROR")
|
||||
log_event("ENGINE_ERROR", {"error": str(e)})
|
||||
raise
|
||||
|
||||
log_event("ENGINE_STOP")
|
||||
_set_state(
|
||||
scope_user,
|
||||
scope_run,
|
||||
state="STOPPED",
|
||||
last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
|
||||
)
|
||||
_update_engine_status(scope_user, scope_run, "STOPPED")
|
||||
print("Strategy engine stopped")
|
||||
_clear_runner(scope_user, scope_run)
|
||||
|
||||
def start_engine(config):
|
||||
user_id = config.get("user_id")
|
||||
run_id = config.get("run_id")
|
||||
if not user_id:
|
||||
raise ValueError("user_id is required to start engine")
|
||||
if not run_id:
|
||||
raise ValueError("run_id is required to start engine")
|
||||
|
||||
with _RUNNERS_LOCK:
|
||||
key = _state_key(user_id, run_id)
|
||||
runner = _RUNNERS.get(key)
|
||||
if runner and runner["thread"].is_alive():
|
||||
return False
|
||||
|
||||
stop_event = threading.Event()
|
||||
thread = threading.Thread(
|
||||
target=_engine_loop,
|
||||
args=(config, stop_event),
|
||||
daemon=True,
|
||||
)
|
||||
_RUNNERS[key] = {"thread": thread, "stop_event": stop_event}
|
||||
thread.start()
|
||||
return True
|
||||
|
||||
def stop_engine(user_id: str, run_id: str | None = None, timeout: float | None = 10.0):
|
||||
runners = []
|
||||
with _RUNNERS_LOCK:
|
||||
if run_id:
|
||||
key = _state_key(user_id, run_id)
|
||||
runner = _RUNNERS.get(key)
|
||||
if runner:
|
||||
runners.append((key, runner))
|
||||
else:
|
||||
for key, runner in list(_RUNNERS.items()):
|
||||
if key[0] == user_id:
|
||||
runners.append((key, runner))
|
||||
for _key, runner in runners:
|
||||
runner["stop_event"].set()
|
||||
stopped_all = True
|
||||
for key, runner in runners:
|
||||
thread = runner["thread"]
|
||||
if timeout is not None:
|
||||
thread.join(timeout=timeout)
|
||||
stopped = not thread.is_alive()
|
||||
if stopped:
|
||||
_clear_runner(key[0], key[1])
|
||||
else:
|
||||
stopped_all = False
|
||||
return stopped_all
|
||||
|
||||
@ -1,303 +0,0 @@
|
||||
# engine/state.py
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_context
|
||||
|
||||
DEFAULT_STATE = {
|
||||
"initial_cash": 0.0,
|
||||
"cash": 0.0,
|
||||
"total_invested": 0.0,
|
||||
"nifty_units": 0.0,
|
||||
"gold_units": 0.0,
|
||||
"last_sip_ts": None,
|
||||
"last_run": None,
|
||||
"sip_frequency": None,
|
||||
}
|
||||
|
||||
DEFAULT_PAPER_STATE = {
|
||||
**DEFAULT_STATE,
|
||||
"initial_cash": 1_000_000.0,
|
||||
"cash": 1_000_000.0,
|
||||
"sip_frequency": {"value": 30, "unit": "days"},
|
||||
}
|
||||
|
||||
def _state_key(mode: str | None):
|
||||
key = (mode or "LIVE").strip().upper()
|
||||
return "PAPER" if key == "PAPER" else "LIVE"
|
||||
|
||||
def _default_state(mode: str | None):
|
||||
if _state_key(mode) == "PAPER":
|
||||
return DEFAULT_PAPER_STATE.copy()
|
||||
return DEFAULT_STATE.copy()
|
||||
|
||||
def _local_tz():
|
||||
return datetime.now().astimezone().tzinfo
|
||||
|
||||
def _format_local_ts(value: datetime | None):
|
||||
if value is None:
|
||||
return None
|
||||
return value.astimezone(_local_tz()).replace(tzinfo=None).isoformat()
|
||||
|
||||
def _parse_ts(value):
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
if value.tzinfo is None:
|
||||
return value.replace(tzinfo=_local_tz())
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
text = value.strip()
|
||||
if not text:
|
||||
return None
|
||||
try:
|
||||
parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
if parsed.tzinfo is None:
|
||||
parsed = parsed.replace(tzinfo=_local_tz())
|
||||
return parsed
|
||||
return None
|
||||
|
||||
def _resolve_scope(user_id: str | None, run_id: str | None):
|
||||
return get_context(user_id, run_id)
|
||||
|
||||
|
||||
def load_state(
|
||||
mode: str | None = "LIVE",
|
||||
*,
|
||||
cur=None,
|
||||
for_update: bool = False,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
scope_user, scope_run = _resolve_scope(user_id, run_id)
|
||||
key = _state_key(mode)
|
||||
if key == "PAPER":
|
||||
if cur is None:
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return load_state(
|
||||
mode=mode,
|
||||
cur=cur,
|
||||
for_update=for_update,
|
||||
user_id=scope_user,
|
||||
run_id=scope_run,
|
||||
)
|
||||
lock_clause = " FOR UPDATE" if for_update else ""
|
||||
cur.execute(
|
||||
f"""
|
||||
SELECT initial_cash, cash, total_invested, nifty_units, gold_units,
|
||||
last_sip_ts, last_run, sip_frequency_value, sip_frequency_unit
|
||||
FROM engine_state_paper
|
||||
WHERE user_id = %s AND run_id = %s{lock_clause}
|
||||
LIMIT 1
|
||||
""",
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return _default_state(mode)
|
||||
merged = _default_state(mode)
|
||||
merged.update(
|
||||
{
|
||||
"initial_cash": float(row[0]) if row[0] is not None else merged["initial_cash"],
|
||||
"cash": float(row[1]) if row[1] is not None else merged["cash"],
|
||||
"total_invested": float(row[2]) if row[2] is not None else merged["total_invested"],
|
||||
"nifty_units": float(row[3]) if row[3] is not None else merged["nifty_units"],
|
||||
"gold_units": float(row[4]) if row[4] is not None else merged["gold_units"],
|
||||
"last_sip_ts": _format_local_ts(row[5]),
|
||||
"last_run": _format_local_ts(row[6]),
|
||||
}
|
||||
)
|
||||
if row[7] is not None or row[8] is not None:
|
||||
merged["sip_frequency"] = {"value": row[7], "unit": row[8]}
|
||||
return merged
|
||||
|
||||
if cur is None:
|
||||
with db_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return load_state(
|
||||
mode=mode,
|
||||
cur=cur,
|
||||
for_update=for_update,
|
||||
user_id=scope_user,
|
||||
run_id=scope_run,
|
||||
)
|
||||
lock_clause = " FOR UPDATE" if for_update else ""
|
||||
cur.execute(
|
||||
f"""
|
||||
SELECT total_invested, nifty_units, gold_units, last_sip_ts, last_run
|
||||
FROM engine_state
|
||||
WHERE user_id = %s AND run_id = %s{lock_clause}
|
||||
LIMIT 1
|
||||
""",
|
||||
(scope_user, scope_run),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return _default_state(mode)
|
||||
merged = _default_state(mode)
|
||||
merged.update(
|
||||
{
|
||||
"total_invested": float(row[0]) if row[0] is not None else merged["total_invested"],
|
||||
"nifty_units": float(row[1]) if row[1] is not None else merged["nifty_units"],
|
||||
"gold_units": float(row[2]) if row[2] is not None else merged["gold_units"],
|
||||
"last_sip_ts": _format_local_ts(row[3]),
|
||||
"last_run": _format_local_ts(row[4]),
|
||||
}
|
||||
)
|
||||
return merged
|
||||
|
||||
def init_paper_state(
|
||||
initial_cash: float,
|
||||
sip_frequency: dict | None = None,
|
||||
*,
|
||||
cur=None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
state = DEFAULT_PAPER_STATE.copy()
|
||||
state.update(
|
||||
{
|
||||
"initial_cash": float(initial_cash),
|
||||
"cash": float(initial_cash),
|
||||
"total_invested": 0.0,
|
||||
"nifty_units": 0.0,
|
||||
"gold_units": 0.0,
|
||||
"last_sip_ts": None,
|
||||
"last_run": None,
|
||||
"sip_frequency": sip_frequency or state.get("sip_frequency"),
|
||||
}
|
||||
)
|
||||
save_state(state, mode="PAPER", cur=cur, emit_event=True, user_id=user_id, run_id=run_id)
|
||||
return state
|
||||
|
||||
def save_state(
|
||||
state,
|
||||
mode: str | None = "LIVE",
|
||||
*,
|
||||
cur=None,
|
||||
emit_event: bool = False,
|
||||
event_meta: dict | None = None,
|
||||
user_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
):
|
||||
scope_user, scope_run = _resolve_scope(user_id, run_id)
|
||||
key = _state_key(mode)
|
||||
last_sip_ts = _parse_ts(state.get("last_sip_ts"))
|
||||
last_run = _parse_ts(state.get("last_run"))
|
||||
if key == "PAPER":
|
||||
sip_frequency = state.get("sip_frequency")
|
||||
sip_value = None
|
||||
sip_unit = None
|
||||
if isinstance(sip_frequency, dict):
|
||||
sip_value = sip_frequency.get("value")
|
||||
sip_unit = sip_frequency.get("unit")
|
||||
def _save(cur):
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO engine_state_paper (
|
||||
user_id, run_id, initial_cash, cash, total_invested, nifty_units, gold_units,
|
||||
last_sip_ts, last_run, sip_frequency_value, sip_frequency_unit
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (user_id, run_id) DO UPDATE
|
||||
SET initial_cash = EXCLUDED.initial_cash,
|
||||
cash = EXCLUDED.cash,
|
||||
total_invested = EXCLUDED.total_invested,
|
||||
nifty_units = EXCLUDED.nifty_units,
|
||||
gold_units = EXCLUDED.gold_units,
|
||||
last_sip_ts = EXCLUDED.last_sip_ts,
|
||||
last_run = EXCLUDED.last_run,
|
||||
sip_frequency_value = EXCLUDED.sip_frequency_value,
|
||||
sip_frequency_unit = EXCLUDED.sip_frequency_unit
|
||||
""",
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
float(state.get("initial_cash", 0.0)),
|
||||
float(state.get("cash", 0.0)),
|
||||
float(state.get("total_invested", 0.0)),
|
||||
float(state.get("nifty_units", 0.0)),
|
||||
float(state.get("gold_units", 0.0)),
|
||||
last_sip_ts,
|
||||
last_run,
|
||||
sip_value,
|
||||
sip_unit,
|
||||
),
|
||||
)
|
||||
if emit_event:
|
||||
insert_engine_event(
|
||||
cur,
|
||||
"STATE_UPDATED",
|
||||
data={
|
||||
"mode": "PAPER",
|
||||
"cash": state.get("cash"),
|
||||
"total_invested": state.get("total_invested"),
|
||||
"nifty_units": state.get("nifty_units"),
|
||||
"gold_units": state.get("gold_units"),
|
||||
"last_sip_ts": state.get("last_sip_ts"),
|
||||
"last_run": state.get("last_run"),
|
||||
},
|
||||
meta=event_meta,
|
||||
ts=datetime.utcnow().replace(tzinfo=timezone.utc),
|
||||
)
|
||||
|
||||
if cur is not None:
|
||||
_save(cur)
|
||||
return
|
||||
|
||||
def _op(cur, _conn):
|
||||
_save(cur)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
def _save(cur):
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO engine_state (
|
||||
user_id, run_id, total_invested, nifty_units, gold_units, last_sip_ts, last_run
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (user_id, run_id) DO UPDATE
|
||||
SET total_invested = EXCLUDED.total_invested,
|
||||
nifty_units = EXCLUDED.nifty_units,
|
||||
gold_units = EXCLUDED.gold_units,
|
||||
last_sip_ts = EXCLUDED.last_sip_ts,
|
||||
last_run = EXCLUDED.last_run
|
||||
""",
|
||||
(
|
||||
scope_user,
|
||||
scope_run,
|
||||
float(state.get("total_invested", 0.0)),
|
||||
float(state.get("nifty_units", 0.0)),
|
||||
float(state.get("gold_units", 0.0)),
|
||||
last_sip_ts,
|
||||
last_run,
|
||||
),
|
||||
)
|
||||
if emit_event:
|
||||
insert_engine_event(
|
||||
cur,
|
||||
"STATE_UPDATED",
|
||||
data={
|
||||
"mode": "LIVE",
|
||||
"total_invested": state.get("total_invested"),
|
||||
"nifty_units": state.get("nifty_units"),
|
||||
"gold_units": state.get("gold_units"),
|
||||
"last_sip_ts": state.get("last_sip_ts"),
|
||||
"last_run": state.get("last_run"),
|
||||
},
|
||||
meta=event_meta,
|
||||
ts=datetime.utcnow().replace(tzinfo=timezone.utc),
|
||||
)
|
||||
|
||||
if cur is not None:
|
||||
_save(cur)
|
||||
return
|
||||
|
||||
def _op(cur, _conn):
|
||||
_save(cur)
|
||||
|
||||
return run_with_retry(_op)
|
||||
|
||||
@ -1,12 +0,0 @@
|
||||
# engine/strategy.py
|
||||
import numpy as np
|
||||
|
||||
def allocation(sp_price, gd_price, sp_sma, gd_sma,
|
||||
base=0.6, tilt_mult=1.5,
|
||||
max_tilt=0.25, min_eq=0.2, max_eq=0.9):
|
||||
|
||||
rd = (sp_price / sp_sma) - (gd_price / gd_sma)
|
||||
tilt = np.clip(-rd * tilt_mult, -max_tilt, max_tilt)
|
||||
|
||||
eq_w = np.clip(base * (1 + tilt), min_eq, max_eq)
|
||||
return eq_w, 1 - eq_w
|
||||
@ -1,41 +0,0 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
def frequency_to_timedelta(freq: dict) -> timedelta:
|
||||
value = int(freq.get("value", 0))
|
||||
unit = freq.get("unit")
|
||||
|
||||
if value <= 0:
|
||||
raise ValueError("Frequency value must be > 0")
|
||||
|
||||
if unit == "minutes":
|
||||
return timedelta(minutes=value)
|
||||
if unit == "days":
|
||||
return timedelta(days=value)
|
||||
raise ValueError(f"Unsupported frequency unit: {unit}")
|
||||
|
||||
|
||||
def normalize_logical_time(ts: datetime) -> datetime:
|
||||
return ts.replace(microsecond=0)
|
||||
|
||||
|
||||
def compute_logical_time(
|
||||
now: datetime,
|
||||
last_run: str | None,
|
||||
interval_seconds: float | None,
|
||||
) -> datetime:
|
||||
base = now
|
||||
if last_run and interval_seconds:
|
||||
try:
|
||||
parsed = datetime.fromisoformat(last_run.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
parsed = None
|
||||
if parsed is not None:
|
||||
if now.tzinfo and parsed.tzinfo is None:
|
||||
parsed = parsed.replace(tzinfo=now.tzinfo)
|
||||
elif now.tzinfo is None and parsed.tzinfo:
|
||||
parsed = parsed.replace(tzinfo=None)
|
||||
candidate = parsed + timedelta(seconds=interval_seconds)
|
||||
if now >= candidate:
|
||||
base = candidate
|
||||
return normalize_logical_time(base)
|
||||
Loading…
x
Reference in New Issue
Block a user