From 7677895b05bffff9d87639d2bc09408ac56f5cb3 Mon Sep 17 00:00:00 2001 From: Thigazhezhilan J Date: Sun, 22 Mar 2026 14:37:33 +0530 Subject: [PATCH] thigal_test --- backend/app/routers/auth.py | 6 +- backend/app/routers/broker.py | 6 +- backend/app/services/email_service.py | 18 +- backend/app/services/strategy_service.py | 6 +- .../app/streamlit_app.py | 208 ------ .../engine/__init__.py | 1 - .../engine/broker.py | 697 ------------------ .../engine/config.py | 150 ---- .../engine/data.py | 81 -- .../engine/engine_runner.py | 198 ----- .../engine/execution.py | 157 ---- .../engine/history.py | 34 - .../engine/ledger.py | 113 --- .../engine/market.py | 42 -- indian_paper_trading_strategy_1/engine/mtm.py | 154 ---- .../engine/runner.py | 518 ------------- .../engine/state.py | 303 -------- .../engine/strategy.py | 12 - .../engine/time_utils.py | 41 -- 19 files changed, 26 insertions(+), 2719 deletions(-) delete mode 100644 indian_paper_trading_strategy_1/app/streamlit_app.py delete mode 100644 indian_paper_trading_strategy_1/engine/__init__.py delete mode 100644 indian_paper_trading_strategy_1/engine/broker.py delete mode 100644 indian_paper_trading_strategy_1/engine/config.py delete mode 100644 indian_paper_trading_strategy_1/engine/data.py delete mode 100644 indian_paper_trading_strategy_1/engine/engine_runner.py delete mode 100644 indian_paper_trading_strategy_1/engine/execution.py delete mode 100644 indian_paper_trading_strategy_1/engine/history.py delete mode 100644 indian_paper_trading_strategy_1/engine/ledger.py delete mode 100644 indian_paper_trading_strategy_1/engine/market.py delete mode 100644 indian_paper_trading_strategy_1/engine/mtm.py delete mode 100644 indian_paper_trading_strategy_1/engine/runner.py delete mode 100644 indian_paper_trading_strategy_1/engine/state.py delete mode 100644 indian_paper_trading_strategy_1/engine/strategy.py delete mode 100644 indian_paper_trading_strategy_1/engine/time_utils.py diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index 1ac7f16..c794c97 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -11,7 +11,7 @@ from app.services.auth_service import ( get_last_session_meta, verify_user, ) -from app.services.email_service import send_email +from app.services.email_service import send_email_async router = APIRouter(prefix="/api") SESSION_COOKIE_NAME = "session_id" @@ -56,7 +56,7 @@ def signup(payload: AuthPayload, response: Response): "You can now log in and start using the platform.\n\n" "Quantfortune Support" ) - send_email(user["username"], "Welcome to Quantfortune", body) + send_email_async(user["username"], "Welcome to Quantfortune", body) except Exception: pass return {"id": user["id"], "username": user["username"], "role": user.get("role")} @@ -85,7 +85,7 @@ def login(payload: AuthPayload, response: Response, request: Request): f"Device: {user_agent or 'unknown'}\n\n" "If this wasn't you, please reset your password immediately." ) - send_email(user["username"], "New login detected", body) + send_email_async(user["username"], "New login detected", body) except Exception: pass diff --git a/backend/app/routers/broker.py b/backend/app/routers/broker.py index 8e5f7d6..f16443b 100644 --- a/backend/app/routers/broker.py +++ b/backend/app/routers/broker.py @@ -14,7 +14,7 @@ from app.broker_store import ( ) from app.services.auth_service import get_user_for_session from app.services.zerodha_service import build_login_url, exchange_request_token -from app.services.email_service import send_email +from app.services.email_service import send_email_async from app.services.zerodha_storage import set_session router = APIRouter(prefix="/api/broker") @@ -53,7 +53,7 @@ async def connect_broker(payload: dict, request: Request): f"Broker: {broker}\n" f"Broker User ID: {broker_user_id or 'N/A'}\n" ) - send_email(user["username"], "Broker connected", body) + send_email_async(user["username"], "Broker connected", body) except Exception: pass return {"connected": True} @@ -82,7 +82,7 @@ async def disconnect_broker(request: Request): set_broker_auth_state(user["id"], "DISCONNECTED") try: body = "Your broker connection has been disconnected from Quantfortune." - send_email(user["username"], "Broker disconnected", body) + send_email_async(user["username"], "Broker disconnected", body) except Exception: pass return {"connected": False} diff --git a/backend/app/services/email_service.py b/backend/app/services/email_service.py index f7213b0..07cf77d 100644 --- a/backend/app/services/email_service.py +++ b/backend/app/services/email_service.py @@ -1,9 +1,13 @@ import os import smtplib import ssl +import threading from email.message import EmailMessage +SMTP_TIMEOUT_SECONDS = float((os.getenv("SMTP_TIMEOUT_SECONDS") or "5").strip()) + + def send_email(to_email: str, subject: str, body_text: str) -> bool: smtp_user = (os.getenv("SMTP_USER") or "").strip() smtp_pass = (os.getenv("SMTP_PASS") or "").replace(" ", "").strip() @@ -21,8 +25,20 @@ def send_email(to_email: str, subject: str, body_text: str) -> bool: msg.set_content(body_text) context = ssl.create_default_context() - with smtplib.SMTP(smtp_host, smtp_port) as server: + with smtplib.SMTP(smtp_host, smtp_port, timeout=SMTP_TIMEOUT_SECONDS) as server: server.starttls(context=context) server.login(smtp_user, smtp_pass) server.send_message(msg) return True + + +def send_email_async(to_email: str, subject: str, body_text: str) -> bool: + def _worker(): + try: + send_email(to_email, subject, body_text) + except Exception: + pass + + thread = threading.Thread(target=_worker, daemon=True) + thread.start() + return True diff --git a/backend/app/services/strategy_service.py b/backend/app/services/strategy_service.py index f31c591..3c83e9a 100644 --- a/backend/app/services/strategy_service.py +++ b/backend/app/services/strategy_service.py @@ -24,7 +24,7 @@ from app.services.run_service import ( update_run_status, ) from app.services.auth_service import get_user_by_id -from app.services.email_service import send_email +from app.services.email_service import send_email_async from psycopg2.extras import Json from psycopg2 import errors @@ -402,7 +402,7 @@ def start_strategy(req, user_id: str): f"Mode: {mode}\n" f"Run ID: {run_id}\n" ) - send_email(user["username"], "Strategy started", body) + send_email_async(user["username"], "Strategy started", body) except Exception: pass @@ -491,7 +491,7 @@ def stop_strategy(user_id: str): user = get_user_by_id(user_id) if user: body = "Your strategy has been stopped." - send_email(user["username"], "Strategy stopped", body) + send_email_async(user["username"], "Strategy stopped", body) except Exception: pass diff --git a/indian_paper_trading_strategy_1/app/streamlit_app.py b/indian_paper_trading_strategy_1/app/streamlit_app.py deleted file mode 100644 index 8ce6e63..0000000 --- a/indian_paper_trading_strategy_1/app/streamlit_app.py +++ /dev/null @@ -1,208 +0,0 @@ -import streamlit as st -import time -from datetime import datetime -from pathlib import Path -import sys -import pandas as pd - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -if str(PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(PROJECT_ROOT)) - -from indian_paper_trading_strategy.engine.history import load_monthly_close -from indian_paper_trading_strategy.engine.market import india_market_status -from indian_paper_trading_strategy.engine.data import fetch_live_price -from indian_paper_trading_strategy.engine.strategy import allocation -from indian_paper_trading_strategy.engine.execution import try_execute_sip -from indian_paper_trading_strategy.engine.state import load_state, save_state -from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_default_user_id, get_active_run_id, set_context -from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm - -_STREAMLIT_USER_ID = get_default_user_id() -_STREAMLIT_RUN_ID = get_active_run_id(_STREAMLIT_USER_ID) if _STREAMLIT_USER_ID else None -if _STREAMLIT_USER_ID and _STREAMLIT_RUN_ID: - set_context(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID) - -def reset_runtime_state(): - def _op(cur, _conn): - cur.execute( - "DELETE FROM mtm_ledger WHERE user_id = %s AND run_id = %s", - (_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID), - ) - cur.execute( - "DELETE FROM event_ledger WHERE user_id = %s AND run_id = %s", - (_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID), - ) - cur.execute( - "DELETE FROM engine_state WHERE user_id = %s AND run_id = %s", - (_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID), - ) - insert_engine_event(cur, "LIVE_RESET", data={}) - - run_with_retry(_op) - -def load_mtm_df(): - with db_connection() as conn: - return pd.read_sql_query( - "SELECT timestamp, pnl FROM mtm_ledger WHERE user_id = %s AND run_id = %s ORDER BY timestamp", - conn, - params=(_STREAMLIT_USER_ID, _STREAMLIT_RUN_ID), - ) - -def is_engine_running(): - state = load_state(mode="LIVE") - return state.get("total_invested", 0) > 0 or \ - state.get("nifty_units", 0) > 0 or \ - state.get("gold_units", 0) > 0 - -if "engine_active" not in st.session_state: - st.session_state.engine_active = is_engine_running() - -NIFTY = "NIFTYBEES.NS" -GOLD = "GOLDBEES.NS" -SMA_MONTHS = 36 - -def get_prices(): - try: - nifty = fetch_live_price(NIFTY) - gold = fetch_live_price(GOLD) - return nifty, gold - except Exception as e: - st.error(e) - return None, None - -SIP_AMOUNT = st.number_input("SIP Amount (\u20B9)", 500, 100000, 5000) -SIP_INTERVAL_SEC = st.number_input("SIP Interval (sec) [TEST]", 30, 3600, 120) -REFRESH_SEC = st.slider("Refresh interval (sec)", 5, 60, 10) - -st.title("SIPXAR INDIA - Phase-1 Safe Engine") - -market_open, market_time = india_market_status() -st.info(f"NSE Market {'OPEN' if market_open else 'CLOSED'} | IST {market_time}") -if not market_open: - st.info("Market is closed. Portfolio values are frozen at last available prices.") - -col1, col2 = st.columns(2) - -with col1: - if st.button("START ENGINE"): - if is_engine_running(): - st.info("Engine already running. Resuming.") - st.session_state.engine_active = True - else: - st.session_state.engine_active = True - - # HARD RESET ONLY ON FIRST START - reset_runtime_state() - - save_state({ - "total_invested": 0.0, - "nifty_units": 0.0, - "gold_units": 0.0, - "last_sip_ts": None, - }, mode="LIVE", emit_event=True, event_meta={"source": "streamlit_start"}) - - st.success("Engine started") - -with col2: - if st.button("KILL ENGINE"): - st.session_state.engine_active = False - - reset_runtime_state() - - st.warning("Engine killed and state wiped") - st.stop() - -if not st.session_state.engine_active: - st.stop() - -state = load_state(mode="LIVE") -nifty_price, gold_price = get_prices() - -if nifty_price is None: - st.stop() - -st.subheader("Latest Market Prices (LTP)") - -c1, c2 = st.columns(2) - -with c1: - st.metric( - label="NIFTYBEES", - value=f"\u20B9{nifty_price:,.2f}", - help="Last traded price (delayed)" - ) - -with c2: - st.metric( - label="GOLDBEES", - value=f"\u20B9{gold_price:,.2f}", - help="Last traded price (delayed)" - ) - -st.caption(f"Price timestamp: {datetime.now().strftime('%H:%M:%S')}") - -nifty_hist = load_monthly_close(NIFTY) -gold_hist = load_monthly_close(GOLD) - -nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1] -gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1] - -eq_w, gd_w = allocation( - sp_price=nifty_price, - gd_price=gold_price, - sp_sma=nifty_sma, - gd_sma=gold_sma -) - -state, executed = try_execute_sip( - now=datetime.now(), - market_open=market_open, - sip_interval=SIP_INTERVAL_SEC, - sip_amount=SIP_AMOUNT, - sp_price=nifty_price, - gd_price=gold_price, - eq_w=eq_w, - gd_w=gd_w, - mode="LIVE", -) - -now = datetime.now() - -if market_open and should_log_mtm(None, now): - portfolio_value, pnl = log_mtm( - nifty_units=state["nifty_units"], - gold_units=state["gold_units"], - nifty_price=nifty_price, - gold_price=gold_price, - total_invested=state["total_invested"], - ) -else: - # Market closed -> freeze valuation (do NOT log) - portfolio_value = ( - state["nifty_units"] * nifty_price + - state["gold_units"] * gold_price - ) - pnl = portfolio_value - state["total_invested"] - -st.subheader("Equity Curve (Unrealized PnL)") - -mtm_df = load_mtm_df() - -if "timestamp" in mtm_df.columns and "pnl" in mtm_df.columns and len(mtm_df) > 1: - mtm_df["timestamp"] = pd.to_datetime(mtm_df["timestamp"]) - mtm_df = mtm_df.sort_values("timestamp").set_index("timestamp") - - st.line_chart(mtm_df["pnl"], height=350) -else: - st.warning("Not enough MTM data or missing columns. Expected: timestamp, pnl.") - -st.metric("Total Invested", f"\u20B9{state['total_invested']:,.0f}") -st.metric("NIFTY Units", round(state["nifty_units"], 4)) -st.metric("Gold Units", round(state["gold_units"], 4)) -st.metric("Portfolio Value", f"\u20B9{portfolio_value:,.0f}") -st.metric("PnL", f"\u20B9{pnl:,.0f}") - -time.sleep(REFRESH_SEC) -st.rerun() - diff --git a/indian_paper_trading_strategy_1/engine/__init__.py b/indian_paper_trading_strategy_1/engine/__init__.py deleted file mode 100644 index bf0d74e..0000000 --- a/indian_paper_trading_strategy_1/engine/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Engine package for the India paper trading strategy.""" diff --git a/indian_paper_trading_strategy_1/engine/broker.py b/indian_paper_trading_strategy_1/engine/broker.py deleted file mode 100644 index cc3ec87..0000000 --- a/indian_paper_trading_strategy_1/engine/broker.py +++ /dev/null @@ -1,697 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from dataclasses import dataclass -from datetime import datetime, timezone -import hashlib - -from psycopg2.extras import execute_values - -from indian_paper_trading_strategy.engine.data import fetch_live_price -from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_context - - -class Broker(ABC): - @abstractmethod - def place_order( - self, - symbol: str, - side: str, - quantity: float, - price: float | None = None, - logical_time: datetime | None = None, - ): - raise NotImplementedError - - @abstractmethod - def get_positions(self): - raise NotImplementedError - - @abstractmethod - def get_orders(self): - raise NotImplementedError - - @abstractmethod - def get_funds(self): - raise NotImplementedError - - -def _local_tz(): - return datetime.now().astimezone().tzinfo - - -def _format_utc_ts(value: datetime | None): - if value is None: - return None - if value.tzinfo is None: - value = value.replace(tzinfo=_local_tz()) - return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") - - -def _format_local_ts(value: datetime | None): - if value is None: - return None - if value.tzinfo is None: - value = value.replace(tzinfo=_local_tz()) - return value.astimezone(_local_tz()).replace(tzinfo=None).isoformat() - - -def _parse_ts(value, assume_local: bool = True): - if value is None: - return None - if isinstance(value, datetime): - if value.tzinfo is None: - return value.replace(tzinfo=_local_tz() if assume_local else timezone.utc) - return value - if isinstance(value, str): - text = value.strip() - if not text: - return None - if text.endswith("Z"): - try: - return datetime.fromisoformat(text.replace("Z", "+00:00")) - except ValueError: - return None - try: - parsed = datetime.fromisoformat(text) - except ValueError: - return None - if parsed.tzinfo is None: - parsed = parsed.replace(tzinfo=_local_tz() if assume_local else timezone.utc) - return parsed - return None - - -def _stable_num(value: float) -> str: - return f"{float(value):.12f}" - - -def _normalize_ts_for_id(ts: datetime) -> str: - if ts.tzinfo is None: - ts = ts.replace(tzinfo=timezone.utc) - return ts.astimezone(timezone.utc).replace(microsecond=0).isoformat() - - -def _deterministic_id(prefix: str, parts: list[str]) -> str: - payload = "|".join(parts) - digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:16] - return f"{prefix}_{digest}" - - -def _resolve_scope(user_id: str | None, run_id: str | None): - return get_context(user_id, run_id) - - -@dataclass -class PaperBroker(Broker): - initial_cash: float - store_path: str | None = None - - def _default_store(self): - return { - "cash": float(self.initial_cash), - "positions": {}, - "orders": [], - "trades": [], - "equity_curve": [], - } - - def _load_store(self, cur=None, for_update: bool = False, user_id: str | None = None, run_id: str | None = None): - scope_user, scope_run = _resolve_scope(user_id, run_id) - if cur is None: - with db_connection() as conn: - with conn.cursor() as cur: - return self._load_store( - cur=cur, - for_update=for_update, - user_id=scope_user, - run_id=scope_run, - ) - - store = self._default_store() - lock_clause = " FOR UPDATE" if for_update else "" - cur.execute( - f"SELECT cash FROM paper_broker_account WHERE user_id = %s AND run_id = %s{lock_clause} LIMIT 1", - (scope_user, scope_run), - ) - row = cur.fetchone() - if row and row[0] is not None: - store["cash"] = float(row[0]) - - cur.execute( - f""" - SELECT symbol, qty, avg_price, last_price - FROM paper_position - WHERE user_id = %s AND run_id = %s{lock_clause} - """ - , - (scope_user, scope_run), - ) - positions = {} - for symbol, qty, avg_price, last_price in cur.fetchall(): - positions[symbol] = { - "qty": float(qty) if qty is not None else 0.0, - "avg_price": float(avg_price) if avg_price is not None else 0.0, - "last_price": float(last_price) if last_price is not None else 0.0, - } - store["positions"] = positions - - cur.execute( - """ - SELECT id, symbol, side, qty, price, status, timestamp, logical_time - FROM paper_order - WHERE user_id = %s AND run_id = %s - ORDER BY timestamp, id - """ - , - (scope_user, scope_run), - ) - orders = [] - for order_id, symbol, side, qty, price, status, ts, logical_ts in cur.fetchall(): - orders.append( - { - "id": order_id, - "symbol": symbol, - "side": side, - "qty": float(qty) if qty is not None else 0.0, - "price": float(price) if price is not None else 0.0, - "status": status, - "timestamp": _format_utc_ts(ts), - "_logical_time": _format_utc_ts(logical_ts), - } - ) - store["orders"] = orders - - cur.execute( - """ - SELECT id, order_id, symbol, side, qty, price, timestamp, logical_time - FROM paper_trade - WHERE user_id = %s AND run_id = %s - ORDER BY timestamp, id - """ - , - (scope_user, scope_run), - ) - trades = [] - for trade_id, order_id, symbol, side, qty, price, ts, logical_ts in cur.fetchall(): - trades.append( - { - "id": trade_id, - "order_id": order_id, - "symbol": symbol, - "side": side, - "qty": float(qty) if qty is not None else 0.0, - "price": float(price) if price is not None else 0.0, - "timestamp": _format_utc_ts(ts), - "_logical_time": _format_utc_ts(logical_ts), - } - ) - store["trades"] = trades - - cur.execute( - """ - SELECT timestamp, logical_time, equity, pnl - FROM paper_equity_curve - WHERE user_id = %s AND run_id = %s - ORDER BY timestamp - """ - , - (scope_user, scope_run), - ) - equity_curve = [] - for ts, logical_ts, equity, pnl in cur.fetchall(): - equity_curve.append( - { - "timestamp": _format_local_ts(ts), - "_logical_time": _format_local_ts(logical_ts), - "equity": float(equity) if equity is not None else 0.0, - "pnl": float(pnl) if pnl is not None else 0.0, - } - ) - store["equity_curve"] = equity_curve - return store - - def _save_store(self, store, cur=None, user_id: str | None = None, run_id: str | None = None): - scope_user, scope_run = _resolve_scope(user_id, run_id) - if cur is None: - def _persist(cur, _conn): - self._save_store(store, cur=cur, user_id=scope_user, run_id=scope_run) - return run_with_retry(_persist) - - cash = store.get("cash") - if cash is not None: - cur.execute( - """ - INSERT INTO paper_broker_account (user_id, run_id, cash) - VALUES (%s, %s, %s) - ON CONFLICT (user_id, run_id) DO UPDATE - SET cash = EXCLUDED.cash - """, - (scope_user, scope_run, float(cash)), - ) - - positions = store.get("positions") - if isinstance(positions, dict): - symbols = [s for s in positions.keys() if s] - if symbols: - cur.execute( - "DELETE FROM paper_position WHERE user_id = %s AND run_id = %s AND symbol NOT IN %s", - (scope_user, scope_run, tuple(symbols)), - ) - else: - cur.execute( - "DELETE FROM paper_position WHERE user_id = %s AND run_id = %s", - (scope_user, scope_run), - ) - - if symbols: - rows = [] - updated_at = datetime.now(timezone.utc) - for symbol, data in positions.items(): - if not symbol or not isinstance(data, dict): - continue - rows.append( - ( - scope_user, - scope_run, - symbol, - float(data.get("qty", 0.0)), - float(data.get("avg_price", 0.0)), - float(data.get("last_price", 0.0)), - updated_at, - ) - ) - if rows: - execute_values( - cur, - """ - INSERT INTO paper_position ( - user_id, run_id, symbol, qty, avg_price, last_price, updated_at - ) - VALUES %s - ON CONFLICT (user_id, run_id, symbol) DO UPDATE - SET qty = EXCLUDED.qty, - avg_price = EXCLUDED.avg_price, - last_price = EXCLUDED.last_price, - updated_at = EXCLUDED.updated_at - """, - rows, - ) - - orders = store.get("orders") - if isinstance(orders, list) and orders: - rows = [] - for order in orders: - if not isinstance(order, dict): - continue - order_id = order.get("id") - if not order_id: - continue - ts = _parse_ts(order.get("timestamp"), assume_local=False) - logical_ts = _parse_ts(order.get("_logical_time"), assume_local=False) or ts - rows.append( - ( - scope_user, - scope_run, - order_id, - order.get("symbol"), - order.get("side"), - float(order.get("qty", 0.0)), - float(order.get("price", 0.0)), - order.get("status"), - ts, - logical_ts, - ) - ) - if rows: - execute_values( - cur, - """ - INSERT INTO paper_order ( - user_id, run_id, id, symbol, side, qty, price, status, timestamp, logical_time - ) - VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - - trades = store.get("trades") - if isinstance(trades, list) and trades: - rows = [] - for trade in trades: - if not isinstance(trade, dict): - continue - trade_id = trade.get("id") - if not trade_id: - continue - ts = _parse_ts(trade.get("timestamp"), assume_local=False) - logical_ts = _parse_ts(trade.get("_logical_time"), assume_local=False) or ts - rows.append( - ( - scope_user, - scope_run, - trade_id, - trade.get("order_id"), - trade.get("symbol"), - trade.get("side"), - float(trade.get("qty", 0.0)), - float(trade.get("price", 0.0)), - ts, - logical_ts, - ) - ) - if rows: - execute_values( - cur, - """ - INSERT INTO paper_trade ( - user_id, run_id, id, order_id, symbol, side, qty, price, timestamp, logical_time - ) - VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - - equity_curve = store.get("equity_curve") - if isinstance(equity_curve, list) and equity_curve: - rows = [] - for point in equity_curve: - if not isinstance(point, dict): - continue - ts = _parse_ts(point.get("timestamp"), assume_local=True) - logical_ts = _parse_ts(point.get("_logical_time"), assume_local=True) or ts - if ts is None: - continue - rows.append( - ( - scope_user, - scope_run, - ts, - logical_ts, - float(point.get("equity", 0.0)), - float(point.get("pnl", 0.0)), - ) - ) - if rows: - execute_values( - cur, - """ - INSERT INTO paper_equity_curve (user_id, run_id, timestamp, logical_time, equity, pnl) - VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - - def get_funds(self, cur=None): - store = self._load_store(cur=cur) - cash = float(store.get("cash", 0)) - positions = store.get("positions", {}) - positions_value = 0.0 - for position in positions.values(): - qty = float(position.get("qty", 0)) - last_price = float(position.get("last_price", position.get("avg_price", 0))) - positions_value += qty * last_price - total_equity = cash + positions_value - return { - "cash_available": cash, - "invested_value": positions_value, - "cash": cash, - "used_margin": 0.0, - "available": cash, - "net": total_equity, - "total_equity": total_equity, - } - - def get_positions(self, cur=None): - store = self._load_store(cur=cur) - positions = store.get("positions", {}) - return [ - { - "symbol": symbol, - "qty": float(data.get("qty", 0)), - "avg_price": float(data.get("avg_price", 0)), - "last_price": float(data.get("last_price", data.get("avg_price", 0))), - } - for symbol, data in positions.items() - ] - - def get_orders(self, cur=None): - store = self._load_store(cur=cur) - orders = [] - for order in store.get("orders", []): - if isinstance(order, dict): - order = {k: v for k, v in order.items() if k != "_logical_time"} - orders.append(order) - return orders - - def get_trades(self, cur=None): - store = self._load_store(cur=cur) - trades = [] - for trade in store.get("trades", []): - if isinstance(trade, dict): - trade = {k: v for k, v in trade.items() if k != "_logical_time"} - trades.append(trade) - return trades - - def get_equity_curve(self, cur=None): - store = self._load_store(cur=cur) - points = [] - for point in store.get("equity_curve", []): - if isinstance(point, dict): - point = {k: v for k, v in point.items() if k != "_logical_time"} - points.append(point) - return points - - def _update_equity_in_tx( - self, - cur, - prices: dict[str, float], - now: datetime, - logical_time: datetime | None = None, - user_id: str | None = None, - run_id: str | None = None, - ): - store = self._load_store(cur=cur, for_update=True, user_id=user_id, run_id=run_id) - positions = store.get("positions", {}) - for symbol, price in prices.items(): - if symbol in positions: - positions[symbol]["last_price"] = float(price) - - cash = float(store.get("cash", 0)) - positions_value = 0.0 - for symbol, position in positions.items(): - qty = float(position.get("qty", 0)) - price = float(position.get("last_price", position.get("avg_price", 0))) - positions_value += qty * price - - equity = cash + positions_value - pnl = equity - float(self.initial_cash) - ts_for_equity = logical_time or now - store.setdefault("equity_curve", []).append( - { - "timestamp": _format_local_ts(ts_for_equity), - "_logical_time": _format_local_ts(ts_for_equity), - "equity": equity, - "pnl": pnl, - } - ) - store["positions"] = positions - self._save_store(store, cur=cur, user_id=user_id, run_id=run_id) - insert_engine_event( - cur, - "EQUITY_UPDATED", - data={ - "timestamp": _format_utc_ts(ts_for_equity), - "equity": equity, - "pnl": pnl, - }, - ) - return equity - - def update_equity( - self, - prices: dict[str, float], - now: datetime, - cur=None, - logical_time: datetime | None = None, - user_id: str | None = None, - run_id: str | None = None, - ): - if cur is not None: - return self._update_equity_in_tx( - cur, - prices, - now, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - def _op(cur, _conn): - return self._update_equity_in_tx( - cur, - prices, - now, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - return run_with_retry(_op) - - def _place_order_in_tx( - self, - cur, - symbol: str, - side: str, - quantity: float, - price: float | None, - logical_time: datetime | None = None, - user_id: str | None = None, - run_id: str | None = None, - ): - scope_user, scope_run = _resolve_scope(user_id, run_id) - store = self._load_store(cur=cur, for_update=True, user_id=scope_user, run_id=scope_run) - side = side.upper().strip() - qty = float(quantity) - if price is None: - price = fetch_live_price(symbol) - price = float(price) - - logical_ts = logical_time or datetime.utcnow().replace(tzinfo=timezone.utc) - timestamp = logical_ts - timestamp_str = _format_utc_ts(timestamp) - logical_ts_str = _format_utc_ts(logical_ts) - order_id = _deterministic_id( - "ord", - [ - scope_user, - scope_run, - _normalize_ts_for_id(logical_ts), - symbol, - side, - _stable_num(qty), - _stable_num(price), - ], - ) - - order = { - "id": order_id, - "symbol": symbol, - "side": side, - "qty": qty, - "price": price, - "status": "REJECTED", - "timestamp": timestamp_str, - "_logical_time": logical_ts_str, - } - - if qty <= 0 or price <= 0: - store.setdefault("orders", []).append(order) - self._save_store(store, cur=cur, user_id=user_id, run_id=run_id) - insert_engine_event(cur, "ORDER_PLACED", data=order) - return order - - positions = store.get("positions", {}) - cash = float(store.get("cash", 0)) - trade = None - - if side == "BUY": - cost = qty * price - if cash >= cost: - cash -= cost - existing = positions.get(symbol, {"qty": 0.0, "avg_price": 0.0, "last_price": price}) - new_qty = float(existing.get("qty", 0)) + qty - prev_cost = float(existing.get("qty", 0)) * float(existing.get("avg_price", 0)) - avg_price = (prev_cost + cost) / new_qty if new_qty else price - positions[symbol] = { - "qty": new_qty, - "avg_price": avg_price, - "last_price": price, - } - order["status"] = "FILLED" - trade = { - "id": _deterministic_id("trd", [order_id]), - "order_id": order_id, - "symbol": symbol, - "side": side, - "qty": qty, - "price": price, - "timestamp": timestamp_str, - "_logical_time": logical_ts_str, - } - store.setdefault("trades", []).append(trade) - elif side == "SELL": - existing = positions.get(symbol) - if existing and float(existing.get("qty", 0)) >= qty: - cash += qty * price - remaining = float(existing.get("qty", 0)) - qty - if remaining > 0: - existing["qty"] = remaining - existing["last_price"] = price - positions[symbol] = existing - else: - positions.pop(symbol, None) - order["status"] = "FILLED" - trade = { - "id": _deterministic_id("trd", [order_id]), - "order_id": order_id, - "symbol": symbol, - "side": side, - "qty": qty, - "price": price, - "timestamp": timestamp_str, - "_logical_time": logical_ts_str, - } - store.setdefault("trades", []).append(trade) - - store["cash"] = cash - store["positions"] = positions - store.setdefault("orders", []).append(order) - self._save_store(store, cur=cur, user_id=user_id, run_id=run_id) - insert_engine_event(cur, "ORDER_PLACED", data=order) - if trade is not None: - insert_engine_event(cur, "TRADE_EXECUTED", data=trade) - insert_engine_event(cur, "ORDER_FILLED", data={"order_id": order_id}) - return order - - def place_order( - self, - symbol: str, - side: str, - quantity: float, - price: float | None = None, - cur=None, - logical_time: datetime | None = None, - user_id: str | None = None, - run_id: str | None = None, - ): - if cur is not None: - return self._place_order_in_tx( - cur, - symbol, - side, - quantity, - price, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - def _op(cur, _conn): - return self._place_order_in_tx( - cur, - symbol, - side, - quantity, - price, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - return run_with_retry(_op) - diff --git a/indian_paper_trading_strategy_1/engine/config.py b/indian_paper_trading_strategy_1/engine/config.py deleted file mode 100644 index 9321b89..0000000 --- a/indian_paper_trading_strategy_1/engine/config.py +++ /dev/null @@ -1,150 +0,0 @@ -import json -from datetime import datetime - -from indian_paper_trading_strategy.engine.db import db_connection, get_context - -DEFAULT_CONFIG = { - "active": False, - "sip_amount": 0, - "sip_frequency": {"value": 30, "unit": "days"}, - "next_run": None -} - -def _maybe_parse_json(value): - if value is None: - return None - if not isinstance(value, str): - return value - text = value.strip() - if not text: - return None - try: - return json.loads(text) - except Exception: - return value - - -def _format_ts(value: datetime | None): - if value is None: - return None - return value.isoformat() - - -def load_strategy_config(user_id: str | None = None, run_id: str | None = None): - scope_user, scope_run = get_context(user_id, run_id) - with db_connection() as conn: - with conn.cursor() as cur: - cur.execute( - """ - SELECT strategy, sip_amount, sip_frequency_value, sip_frequency_unit, - mode, broker, active, frequency, frequency_days, unit, next_run - FROM strategy_config - WHERE user_id = %s AND run_id = %s - LIMIT 1 - """, - (scope_user, scope_run), - ) - row = cur.fetchone() - if not row: - return DEFAULT_CONFIG.copy() - - cfg = DEFAULT_CONFIG.copy() - cfg["strategy"] = row[0] - cfg["strategy_name"] = row[0] - cfg["sip_amount"] = float(row[1]) if row[1] is not None else cfg.get("sip_amount") - cfg["mode"] = row[4] - cfg["broker"] = row[5] - cfg["active"] = row[6] if row[6] is not None else cfg.get("active") - cfg["frequency"] = _maybe_parse_json(row[7]) - cfg["frequency_days"] = row[8] - cfg["unit"] = row[9] - cfg["next_run"] = _format_ts(row[10]) - if row[2] is not None or row[3] is not None: - cfg["sip_frequency"] = {"value": row[2], "unit": row[3]} - else: - value = cfg.get("frequency") - unit = cfg.get("unit") - if isinstance(value, dict): - unit = value.get("unit", unit) - value = value.get("value") - if value is None and cfg.get("frequency_days") is not None: - value = cfg.get("frequency_days") - unit = unit or "days" - if value is not None and unit: - cfg["sip_frequency"] = {"value": value, "unit": unit} - return cfg - -def save_strategy_config(cfg, user_id: str | None = None, run_id: str | None = None): - scope_user, scope_run = get_context(user_id, run_id) - sip_frequency = cfg.get("sip_frequency") - sip_value = None - sip_unit = None - if isinstance(sip_frequency, dict): - sip_value = sip_frequency.get("value") - sip_unit = sip_frequency.get("unit") - - frequency = cfg.get("frequency") - if not isinstance(frequency, str) and frequency is not None: - frequency = json.dumps(frequency) - - next_run = cfg.get("next_run") - next_run_dt = None - if isinstance(next_run, str): - try: - next_run_dt = datetime.fromisoformat(next_run) - except ValueError: - next_run_dt = None - - strategy = cfg.get("strategy") or cfg.get("strategy_name") - - with db_connection() as conn: - with conn: - with conn.cursor() as cur: - cur.execute( - """ - INSERT INTO strategy_config ( - user_id, - run_id, - strategy, - sip_amount, - sip_frequency_value, - sip_frequency_unit, - mode, - broker, - active, - frequency, - frequency_days, - unit, - next_run - ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (user_id, run_id) DO UPDATE - SET strategy = EXCLUDED.strategy, - sip_amount = EXCLUDED.sip_amount, - sip_frequency_value = EXCLUDED.sip_frequency_value, - sip_frequency_unit = EXCLUDED.sip_frequency_unit, - mode = EXCLUDED.mode, - broker = EXCLUDED.broker, - active = EXCLUDED.active, - frequency = EXCLUDED.frequency, - frequency_days = EXCLUDED.frequency_days, - unit = EXCLUDED.unit, - next_run = EXCLUDED.next_run - """, - ( - scope_user, - scope_run, - strategy, - cfg.get("sip_amount"), - sip_value, - sip_unit, - cfg.get("mode"), - cfg.get("broker"), - cfg.get("active"), - frequency, - cfg.get("frequency_days"), - cfg.get("unit"), - next_run_dt, - ), - ) - diff --git a/indian_paper_trading_strategy_1/engine/data.py b/indian_paper_trading_strategy_1/engine/data.py deleted file mode 100644 index c60aba2..0000000 --- a/indian_paper_trading_strategy_1/engine/data.py +++ /dev/null @@ -1,81 +0,0 @@ -# engine/data.py -from datetime import datetime, timezone -from pathlib import Path -import os -import threading - -import pandas as pd -import yfinance as yf - -ENGINE_ROOT = Path(__file__).resolve().parents[1] -HISTORY_DIR = ENGINE_ROOT / "storage" / "history" -ALLOW_PRICE_CACHE = os.getenv("ALLOW_PRICE_CACHE", "0").strip().lower() in {"1", "true", "yes"} - -_LAST_PRICE: dict[str, dict[str, object]] = {} -_LAST_PRICE_LOCK = threading.Lock() - - -def _set_last_price(ticker: str, price: float, source: str): - now = datetime.now(timezone.utc) - with _LAST_PRICE_LOCK: - _LAST_PRICE[ticker] = {"price": float(price), "source": source, "ts": now} - - -def get_price_snapshot(ticker: str) -> dict[str, object] | None: - with _LAST_PRICE_LOCK: - data = _LAST_PRICE.get(ticker) - if not data: - return None - return dict(data) - - -def _get_last_live_price(ticker: str) -> float | None: - with _LAST_PRICE_LOCK: - data = _LAST_PRICE.get(ticker) - if not data: - return None - if data.get("source") == "live": - return float(data.get("price", 0)) - return None - - -def _cached_last_close(ticker: str) -> float | None: - file = HISTORY_DIR / f"{ticker}.csv" - if not file.exists(): - return None - df = pd.read_csv(file) - if df.empty or "Close" not in df.columns: - return None - return float(df["Close"].iloc[-1]) - - -def fetch_live_price(ticker, allow_cache: bool | None = None): - if allow_cache is None: - allow_cache = ALLOW_PRICE_CACHE - try: - df = yf.download( - ticker, - period="1d", - interval="1m", - auto_adjust=True, - progress=False, - timeout=5, - ) - if df is not None and not df.empty: - price = float(df["Close"].iloc[-1]) - _set_last_price(ticker, price, "live") - return price - except Exception: - pass - - if allow_cache: - last_live = _get_last_live_price(ticker) - if last_live is not None: - return last_live - - cached = _cached_last_close(ticker) - if cached is not None: - _set_last_price(ticker, cached, "cache") - return cached - - raise RuntimeError(f"No live data for {ticker}") diff --git a/indian_paper_trading_strategy_1/engine/engine_runner.py b/indian_paper_trading_strategy_1/engine/engine_runner.py deleted file mode 100644 index 0be5f65..0000000 --- a/indian_paper_trading_strategy_1/engine/engine_runner.py +++ /dev/null @@ -1,198 +0,0 @@ -import time -from datetime import datetime, timezone - -from indian_paper_trading_strategy.engine.db import ( - run_with_retry, - insert_engine_event, - get_default_user_id, - get_active_run_id, - get_running_runs, - engine_context, -) - -def log_event(event: str, data: dict | None = None): - now = datetime.utcnow().replace(tzinfo=timezone.utc) - payload = data or {} - - def _op(cur, _conn): - insert_engine_event(cur, event, data=payload, ts=now) - - run_with_retry(_op) - -def _update_engine_status(user_id: str, run_id: str, status: str): - now = datetime.utcnow().replace(tzinfo=timezone.utc) - - def _op(cur, _conn): - cur.execute( - """ - INSERT INTO engine_status (user_id, run_id, status, last_updated) - VALUES (%s, %s, %s, %s) - ON CONFLICT (user_id, run_id) DO UPDATE - SET status = EXCLUDED.status, - last_updated = EXCLUDED.last_updated - """, - (user_id, run_id, status, now), - ) - - run_with_retry(_op) - -from indian_paper_trading_strategy.engine.config import load_strategy_config, save_strategy_config -from indian_paper_trading_strategy.engine.market import india_market_status -from indian_paper_trading_strategy.engine.execution import try_execute_sip -from indian_paper_trading_strategy.engine.state import load_state -from indian_paper_trading_strategy.engine.broker import PaperBroker -from indian_paper_trading_strategy.engine.data import fetch_live_price -from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm -from indian_paper_trading_strategy.engine.history import load_monthly_close -from indian_paper_trading_strategy.engine.strategy import allocation -from indian_paper_trading_strategy.engine.time_utils import frequency_to_timedelta, normalize_logical_time - -NIFTY = "NIFTYBEES.NS" -GOLD = "GOLDBEES.NS" -SMA_MONTHS = 36 - -def run_engine(user_id: str | None = None, run_id: str | None = None): - print("Strategy engine started") - active_runs: dict[tuple[str, str], bool] = {} - - if run_id and not user_id: - raise ValueError("user_id is required when run_id is provided") - - while True: - try: - if user_id and run_id: - runs = [(user_id, run_id)] - elif user_id: - runs = get_running_runs(user_id) - else: - runs = get_running_runs() - if not runs: - default_user = get_default_user_id() - if default_user: - runs = get_running_runs(default_user) - - seen = set() - for scope_user, scope_run in runs: - if not scope_user or not scope_run: - continue - seen.add((scope_user, scope_run)) - with engine_context(scope_user, scope_run): - cfg = load_strategy_config(user_id=scope_user, run_id=scope_run) - if not cfg.get("active"): - continue - - strategy_name = cfg.get("strategy_name", "golden_nifty") - sip_amount = cfg.get("sip_amount", 0) - configured_frequency = cfg.get("sip_frequency") or {} - if not isinstance(configured_frequency, dict): - configured_frequency = {} - frequency_value = int(configured_frequency.get("value", cfg.get("frequency", 0))) - frequency_unit = configured_frequency.get("unit", cfg.get("unit", "days")) - frequency_info = {"value": frequency_value, "unit": frequency_unit} - frequency_label = f"{frequency_value} {frequency_unit}" - - if not active_runs.get((scope_user, scope_run)): - log_event( - "ENGINE_START", - { - "strategy": strategy_name, - "sip_amount": sip_amount, - "frequency": frequency_label, - }, - ) - active_runs[(scope_user, scope_run)] = True - - _update_engine_status(scope_user, scope_run, "RUNNING") - - market_open, _ = india_market_status() - if not market_open: - log_event("MARKET_CLOSED", {"reason": "Outside market hours"}) - continue - - now = datetime.now() - mode = (cfg.get("mode") or "PAPER").strip().upper() - if mode not in {"PAPER", "LIVE"}: - mode = "PAPER" - state = load_state(mode=mode) - initial_cash = float(state.get("initial_cash") or 0.0) - broker = PaperBroker(initial_cash=initial_cash) if mode == "PAPER" else None - - nifty_price = fetch_live_price(NIFTY) - gold_price = fetch_live_price(GOLD) - - next_run = cfg.get("next_run") - if next_run is None or now >= datetime.fromisoformat(next_run): - nifty_hist = load_monthly_close(NIFTY) - gold_hist = load_monthly_close(GOLD) - - nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1] - gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1] - - eq_w, gd_w = allocation( - sp_price=nifty_price, - gd_price=gold_price, - sp_sma=nifty_sma, - gd_sma=gold_sma, - ) - - weights = {"equity": eq_w, "gold": gd_w} - state, executed = try_execute_sip( - now=now, - market_open=True, - sip_interval=frequency_to_timedelta(frequency_info).total_seconds(), - sip_amount=sip_amount, - sp_price=nifty_price, - gd_price=gold_price, - eq_w=eq_w, - gd_w=gd_w, - broker=broker, - mode=mode, - ) - - if executed: - log_event( - "SIP_TRIGGERED", - { - "date": now.date().isoformat(), - "allocation": weights, - "cash_used": sip_amount, - }, - ) - portfolio_value = ( - state["nifty_units"] * nifty_price - + state["gold_units"] * gold_price - ) - log_event( - "PORTFOLIO_UPDATED", - { - "nifty_units": state["nifty_units"], - "gold_units": state["gold_units"], - "portfolio_value": portfolio_value, - }, - ) - cfg["next_run"] = (now + frequency_to_timedelta(frequency_info)).isoformat() - save_strategy_config(cfg, user_id=scope_user, run_id=scope_run) - - if should_log_mtm(None, now): - state = load_state(mode=mode) - log_mtm( - nifty_units=state["nifty_units"], - gold_units=state["gold_units"], - nifty_price=nifty_price, - gold_price=gold_price, - total_invested=state["total_invested"], - logical_time=normalize_logical_time(now), - ) - - for key in list(active_runs.keys()): - if key not in seen: - active_runs.pop(key, None) - - time.sleep(30) - except Exception as e: - log_event("ENGINE_ERROR", {"error": str(e)}) - raise - -if __name__ == "__main__": - run_engine() - diff --git a/indian_paper_trading_strategy_1/engine/execution.py b/indian_paper_trading_strategy_1/engine/execution.py deleted file mode 100644 index e135a24..0000000 --- a/indian_paper_trading_strategy_1/engine/execution.py +++ /dev/null @@ -1,157 +0,0 @@ -# engine/execution.py -from datetime import datetime, timezone -from indian_paper_trading_strategy.engine.state import load_state, save_state - -from indian_paper_trading_strategy.engine.broker import Broker -from indian_paper_trading_strategy.engine.ledger import log_event, event_exists -from indian_paper_trading_strategy.engine.db import run_with_retry -from indian_paper_trading_strategy.engine.time_utils import compute_logical_time - -def _as_float(value): - if hasattr(value, "item"): - try: - return float(value.item()) - except Exception: - pass - if hasattr(value, "iloc"): - try: - return float(value.iloc[-1]) - except Exception: - pass - return float(value) - -def _local_tz(): - return datetime.now().astimezone().tzinfo - -def try_execute_sip( - now, - market_open, - sip_interval, - sip_amount, - sp_price, - gd_price, - eq_w, - gd_w, - broker: Broker | None = None, - mode: str | None = "LIVE", -): - def _op(cur, _conn): - if now.tzinfo is None: - now_ts = now.replace(tzinfo=_local_tz()) - else: - now_ts = now - event_ts = now_ts - log_event("DEBUG_ENTER_TRY_EXECUTE", { - "now": now_ts.isoformat(), - }, cur=cur, ts=event_ts) - - state = load_state(mode=mode, cur=cur, for_update=True) - - force_execute = state.get("last_sip_ts") is None - - if not market_open: - return state, False - - last = state.get("last_sip_ts") or state.get("last_run") - if last and not force_execute: - try: - last_dt = datetime.fromisoformat(last) - except ValueError: - last_dt = None - if last_dt: - if last_dt.tzinfo is None: - last_dt = last_dt.replace(tzinfo=_local_tz()) - if now_ts.tzinfo and last_dt.tzinfo and last_dt.tzinfo != now_ts.tzinfo: - last_dt = last_dt.astimezone(now_ts.tzinfo) - if last_dt and (now_ts - last_dt).total_seconds() < sip_interval: - return state, False - - logical_time = compute_logical_time(now_ts, last, sip_interval) - if event_exists("SIP_EXECUTED", logical_time, cur=cur): - return state, False - - sp_price_val = _as_float(sp_price) - gd_price_val = _as_float(gd_price) - eq_w_val = _as_float(eq_w) - gd_w_val = _as_float(gd_w) - sip_amount_val = _as_float(sip_amount) - - nifty_qty = (sip_amount_val * eq_w_val) / sp_price_val - gold_qty = (sip_amount_val * gd_w_val) / gd_price_val - - if broker is None: - return state, False - - funds = broker.get_funds(cur=cur) - cash = funds.get("cash") - if cash is not None and float(cash) < sip_amount_val: - return state, False - - log_event("DEBUG_EXECUTION_DECISION", { - "force_execute": force_execute, - "last_sip_ts": state.get("last_sip_ts"), - "now": now_ts.isoformat(), - }, cur=cur, ts=event_ts) - - nifty_order = broker.place_order( - "NIFTYBEES.NS", - "BUY", - nifty_qty, - sp_price_val, - cur=cur, - logical_time=logical_time, - ) - gold_order = broker.place_order( - "GOLDBEES.NS", - "BUY", - gold_qty, - gd_price_val, - cur=cur, - logical_time=logical_time, - ) - orders = [nifty_order, gold_order] - executed = all( - isinstance(order, dict) and order.get("status") == "FILLED" - for order in orders - ) - if not executed: - return state, False - assert len(orders) > 0, "executed=True but no broker orders placed" - - funds_after = broker.get_funds(cur=cur) - cash_after = funds_after.get("cash") - if cash_after is not None: - state["cash"] = float(cash_after) - - state["nifty_units"] += nifty_qty - state["gold_units"] += gold_qty - state["total_invested"] += sip_amount_val - state["last_sip_ts"] = now_ts.isoformat() - state["last_run"] = now_ts.isoformat() - - save_state( - state, - mode=mode, - cur=cur, - emit_event=True, - event_meta={"source": "sip"}, - ) - - log_event( - "SIP_EXECUTED", - { - "nifty_units": nifty_qty, - "gold_units": gold_qty, - "nifty_price": sp_price_val, - "gold_price": gd_price_val, - "amount": sip_amount_val, - }, - cur=cur, - ts=event_ts, - logical_time=logical_time, - ) - - return state, True - - return run_with_retry(_op) - diff --git a/indian_paper_trading_strategy_1/engine/history.py b/indian_paper_trading_strategy_1/engine/history.py deleted file mode 100644 index 28e4697..0000000 --- a/indian_paper_trading_strategy_1/engine/history.py +++ /dev/null @@ -1,34 +0,0 @@ -# engine/history.py -import yfinance as yf -import pandas as pd -from pathlib import Path - -ENGINE_ROOT = Path(__file__).resolve().parents[1] -STORAGE_DIR = ENGINE_ROOT / "storage" -STORAGE_DIR.mkdir(exist_ok=True) - -CACHE_DIR = STORAGE_DIR / "history" -CACHE_DIR.mkdir(exist_ok=True) - -def load_monthly_close(ticker, years=10): - file = CACHE_DIR / f"{ticker}.csv" - - if file.exists(): - df = pd.read_csv(file, parse_dates=["Date"], index_col="Date") - return df["Close"] - - df = yf.download( - ticker, - period=f"{years}y", - auto_adjust=True, - progress=False, - timeout=5, - ) - - if df.empty: - raise RuntimeError(f"No history for {ticker}") - - series = df["Close"].resample("M").last() - series.to_csv(file, header=["Close"]) - - return series diff --git a/indian_paper_trading_strategy_1/engine/ledger.py b/indian_paper_trading_strategy_1/engine/ledger.py deleted file mode 100644 index 874a5fa..0000000 --- a/indian_paper_trading_strategy_1/engine/ledger.py +++ /dev/null @@ -1,113 +0,0 @@ -# engine/ledger.py -from datetime import datetime, timezone - -from indian_paper_trading_strategy.engine.db import insert_engine_event, run_with_retry, get_context -from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time - - -def _event_exists_in_tx(cur, event, logical_time, user_id: str | None = None, run_id: str | None = None): - scope_user, scope_run = get_context(user_id, run_id) - logical_ts = normalize_logical_time(logical_time) - cur.execute( - """ - SELECT 1 - FROM event_ledger - WHERE user_id = %s AND run_id = %s AND event = %s AND logical_time = %s - LIMIT 1 - """, - (scope_user, scope_run, event, logical_ts), - ) - return cur.fetchone() is not None - - -def event_exists(event, logical_time, *, cur=None, user_id: str | None = None, run_id: str | None = None): - if cur is not None: - return _event_exists_in_tx(cur, event, logical_time, user_id=user_id, run_id=run_id) - - def _op(cur, _conn): - return _event_exists_in_tx(cur, event, logical_time, user_id=user_id, run_id=run_id) - - return run_with_retry(_op) - - -def _log_event_in_tx( - cur, - event, - payload, - ts, - logical_time=None, - user_id: str | None = None, - run_id: str | None = None, -): - scope_user, scope_run = get_context(user_id, run_id) - logical_ts = normalize_logical_time(logical_time or ts) - cur.execute( - """ - INSERT INTO event_ledger ( - user_id, - run_id, - timestamp, - logical_time, - event, - nifty_units, - gold_units, - nifty_price, - gold_price, - amount - ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT DO NOTHING - """, - ( - scope_user, - scope_run, - ts, - logical_ts, - event, - payload.get("nifty_units"), - payload.get("gold_units"), - payload.get("nifty_price"), - payload.get("gold_price"), - payload.get("amount"), - ), - ) - if cur.rowcount: - insert_engine_event(cur, event, data=payload, ts=ts) - - -def log_event( - event, - payload, - *, - cur=None, - ts=None, - logical_time=None, - user_id: str | None = None, - run_id: str | None = None, -): - now = ts or logical_time or datetime.utcnow().replace(tzinfo=timezone.utc) - if cur is not None: - _log_event_in_tx( - cur, - event, - payload, - now, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - return - - def _op(cur, _conn): - _log_event_in_tx( - cur, - event, - payload, - now, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - return run_with_retry(_op) - diff --git a/indian_paper_trading_strategy_1/engine/market.py b/indian_paper_trading_strategy_1/engine/market.py deleted file mode 100644 index c16f5de..0000000 --- a/indian_paper_trading_strategy_1/engine/market.py +++ /dev/null @@ -1,42 +0,0 @@ -# engine/market.py -from datetime import datetime, time as dtime, timedelta -import pytz - -_MARKET_TZ = pytz.timezone("Asia/Kolkata") -_OPEN_T = dtime(9, 15) -_CLOSE_T = dtime(15, 30) - -def _as_market_tz(value: datetime) -> datetime: - if value.tzinfo is None: - return _MARKET_TZ.localize(value) - return value.astimezone(_MARKET_TZ) - -def is_market_open(now: datetime) -> bool: - now = _as_market_tz(now) - return now.weekday() < 5 and _OPEN_T <= now.time() <= _CLOSE_T - -def india_market_status(): - now = datetime.now(_MARKET_TZ) - - return is_market_open(now), now - -def next_market_open_after(value: datetime) -> datetime: - current = _as_market_tz(value) - while current.weekday() >= 5: - current = current + timedelta(days=1) - current = current.replace(hour=_OPEN_T.hour, minute=_OPEN_T.minute, second=0, microsecond=0) - if current.time() < _OPEN_T: - return current.replace(hour=_OPEN_T.hour, minute=_OPEN_T.minute, second=0, microsecond=0) - if current.time() > _CLOSE_T: - current = current + timedelta(days=1) - while current.weekday() >= 5: - current = current + timedelta(days=1) - return current.replace(hour=_OPEN_T.hour, minute=_OPEN_T.minute, second=0, microsecond=0) - return current - -def align_to_market_open(value: datetime) -> datetime: - current = _as_market_tz(value) - aligned = current if is_market_open(current) else next_market_open_after(current) - if value.tzinfo is None: - return aligned.replace(tzinfo=None) - return aligned diff --git a/indian_paper_trading_strategy_1/engine/mtm.py b/indian_paper_trading_strategy_1/engine/mtm.py deleted file mode 100644 index 90d6b1a..0000000 --- a/indian_paper_trading_strategy_1/engine/mtm.py +++ /dev/null @@ -1,154 +0,0 @@ -from datetime import datetime, timezone -from pathlib import Path - -from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_context -from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time - -ENGINE_ROOT = Path(__file__).resolve().parents[1] -STORAGE_DIR = ENGINE_ROOT / "storage" -MTM_FILE = STORAGE_DIR / "mtm_ledger.csv" - -MTM_INTERVAL_SECONDS = 60 - -def _log_mtm_in_tx( - cur, - nifty_units, - gold_units, - nifty_price, - gold_price, - total_invested, - ts, - logical_time=None, - user_id: str | None = None, - run_id: str | None = None, -): - scope_user, scope_run = get_context(user_id, run_id) - logical_ts = normalize_logical_time(logical_time or ts) - nifty_value = nifty_units * nifty_price - gold_value = gold_units * gold_price - portfolio_value = nifty_value + gold_value - pnl = portfolio_value - total_invested - - row = { - "timestamp": ts.isoformat(), - "logical_time": logical_ts.isoformat(), - "nifty_units": nifty_units, - "gold_units": gold_units, - "nifty_price": nifty_price, - "gold_price": gold_price, - "nifty_value": nifty_value, - "gold_value": gold_value, - "portfolio_value": portfolio_value, - "total_invested": total_invested, - "pnl": pnl, - } - cur.execute( - """ - INSERT INTO mtm_ledger ( - user_id, - run_id, - timestamp, - logical_time, - nifty_units, - gold_units, - nifty_price, - gold_price, - nifty_value, - gold_value, - portfolio_value, - total_invested, - pnl - ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT DO NOTHING - """, - ( - scope_user, - scope_run, - ts, - logical_ts, - row["nifty_units"], - row["gold_units"], - row["nifty_price"], - row["gold_price"], - row["nifty_value"], - row["gold_value"], - row["portfolio_value"], - row["total_invested"], - row["pnl"], - ), - ) - if cur.rowcount: - insert_engine_event(cur, "MTM_UPDATED", data=row, ts=ts) - return portfolio_value, pnl - -def log_mtm( - nifty_units, - gold_units, - nifty_price, - gold_price, - total_invested, - *, - cur=None, - logical_time=None, - user_id: str | None = None, - run_id: str | None = None, -): - ts = logical_time or datetime.now(timezone.utc) - if cur is not None: - return _log_mtm_in_tx( - cur, - nifty_units, - gold_units, - nifty_price, - gold_price, - total_invested, - ts, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - def _op(cur, _conn): - return _log_mtm_in_tx( - cur, - nifty_units, - gold_units, - nifty_price, - gold_price, - total_invested, - ts, - logical_time=logical_time, - user_id=user_id, - run_id=run_id, - ) - - return run_with_retry(_op) - -def _get_last_mtm_ts(user_id: str | None = None, run_id: str | None = None): - scope_user, scope_run = get_context(user_id, run_id) - with db_connection() as conn: - with conn.cursor() as cur: - cur.execute( - "SELECT MAX(timestamp) FROM mtm_ledger WHERE user_id = %s AND run_id = %s", - (scope_user, scope_run), - ) - row = cur.fetchone() - if not row or row[0] is None: - return None - return row[0].astimezone().replace(tzinfo=None) - -def should_log_mtm(df, now, user_id: str | None = None, run_id: str | None = None): - if df is None: - last_ts = _get_last_mtm_ts(user_id=user_id, run_id=run_id) - if last_ts is None: - return True - return (now - last_ts).total_seconds() >= MTM_INTERVAL_SECONDS - if getattr(df, "empty", False): - return True - try: - last_ts = datetime.fromisoformat(str(df.iloc[-1]["timestamp"])) - except Exception: - return True - return (now - last_ts).total_seconds() >= MTM_INTERVAL_SECONDS - diff --git a/indian_paper_trading_strategy_1/engine/runner.py b/indian_paper_trading_strategy_1/engine/runner.py deleted file mode 100644 index e0e6a6c..0000000 --- a/indian_paper_trading_strategy_1/engine/runner.py +++ /dev/null @@ -1,518 +0,0 @@ -import os -import threading -import time -from datetime import datetime, timedelta, timezone - -from indian_paper_trading_strategy.engine.market import is_market_open, align_to_market_open -from indian_paper_trading_strategy.engine.execution import try_execute_sip -from indian_paper_trading_strategy.engine.broker import PaperBroker -from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm -from indian_paper_trading_strategy.engine.state import load_state -from indian_paper_trading_strategy.engine.data import fetch_live_price -from indian_paper_trading_strategy.engine.history import load_monthly_close -from indian_paper_trading_strategy.engine.strategy import allocation -from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time - -from indian_paper_trading_strategy.engine.db import db_transaction, insert_engine_event, run_with_retry, get_context, set_context - - -def _update_engine_status(user_id: str, run_id: str, status: str): - now = datetime.utcnow().replace(tzinfo=timezone.utc) - - def _op(cur, _conn): - cur.execute( - """ - INSERT INTO engine_status (user_id, run_id, status, last_updated) - VALUES (%s, %s, %s, %s) - ON CONFLICT (user_id, run_id) DO UPDATE - SET status = EXCLUDED.status, - last_updated = EXCLUDED.last_updated - """, - (user_id, run_id, status, now), - ) - - run_with_retry(_op) - -NIFTY = "NIFTYBEES.NS" -GOLD = "GOLDBEES.NS" -SMA_MONTHS = 36 - -_DEFAULT_ENGINE_STATE = { - "state": "STOPPED", - "run_id": None, - "user_id": None, - "last_heartbeat_ts": None, -} - -_ENGINE_STATES = {} -_ENGINE_STATES_LOCK = threading.Lock() - -_RUNNERS = {} -_RUNNERS_LOCK = threading.Lock() - -engine_state = _ENGINE_STATES - - -def _state_key(user_id: str, run_id: str): - return (user_id, run_id) - - -def _get_state(user_id: str, run_id: str): - key = _state_key(user_id, run_id) - with _ENGINE_STATES_LOCK: - state = _ENGINE_STATES.get(key) - if state is None: - state = dict(_DEFAULT_ENGINE_STATE) - state["user_id"] = user_id - state["run_id"] = run_id - _ENGINE_STATES[key] = state - return state - - -def _set_state(user_id: str, run_id: str, **updates): - key = _state_key(user_id, run_id) - with _ENGINE_STATES_LOCK: - state = _ENGINE_STATES.get(key) - if state is None: - state = dict(_DEFAULT_ENGINE_STATE) - state["user_id"] = user_id - state["run_id"] = run_id - _ENGINE_STATES[key] = state - state.update(updates) - - -def get_engine_state(user_id: str, run_id: str): - state = _get_state(user_id, run_id) - return dict(state) - -def log_event( - event: str, - data: dict | None = None, - message: str | None = None, - meta: dict | None = None, -): - entry = { - "ts": datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(), - "event": event, - } - if message is not None or meta is not None: - entry["message"] = message or "" - entry["meta"] = meta or {} - else: - entry["data"] = data or {} - event_ts = datetime.fromisoformat(entry["ts"].replace("Z", "+00:00")) - data = entry.get("data") if "data" in entry else None - meta = entry.get("meta") if "meta" in entry else None - - def _op(cur, _conn): - insert_engine_event( - cur, - entry.get("event"), - data=data, - message=entry.get("message"), - meta=meta, - ts=event_ts, - ) - - run_with_retry(_op) - -def sleep_with_heartbeat( - total_seconds: int, - stop_event: threading.Event, - user_id: str, - run_id: str, - step_seconds: int = 5, -): - remaining = total_seconds - while remaining > 0 and not stop_event.is_set(): - time.sleep(min(step_seconds, remaining)) - _set_state(user_id, run_id, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z") - remaining -= step_seconds - -def _clear_runner(user_id: str, run_id: str): - key = _state_key(user_id, run_id) - with _RUNNERS_LOCK: - _RUNNERS.pop(key, None) - -def can_execute(now: datetime) -> tuple[bool, str]: - if not is_market_open(now): - return False, "MARKET_CLOSED" - return True, "OK" - -def _engine_loop(config, stop_event: threading.Event): - print("Strategy engine started with config:", config) - - user_id = config.get("user_id") - run_id = config.get("run_id") - scope_user, scope_run = get_context(user_id, run_id) - set_context(scope_user, scope_run) - - strategy_name = config.get("strategy_name") or config.get("strategy") or "golden_nifty" - sip_amount = config["sip_amount"] - configured_frequency = config.get("sip_frequency") or {} - if not isinstance(configured_frequency, dict): - configured_frequency = {} - frequency_value = int(configured_frequency.get("value", config.get("frequency", 0))) - frequency_unit = configured_frequency.get("unit", config.get("unit", "days")) - frequency_label = f"{frequency_value} {frequency_unit}" - emit_event_cb = config.get("emit_event") - if not callable(emit_event_cb): - emit_event_cb = None - debug_enabled = os.getenv("ENGINE_DEBUG", "1").strip().lower() not in {"0", "false", "no"} - - def debug_event(event: str, message: str, meta: dict | None = None): - if not debug_enabled: - return - try: - log_event(event=event, message=message, meta=meta or {}) - except Exception: - pass - if emit_event_cb: - emit_event_cb(event=event, message=message, meta=meta or {}) - print(f"[ENGINE] {event} {message} {meta or {}}", flush=True) - mode = (config.get("mode") or "LIVE").strip().upper() - if mode not in {"PAPER", "LIVE"}: - mode = "LIVE" - broker_type = config.get("broker") or "paper" - if broker_type != "paper": - broker_type = "paper" - if broker_type == "paper": - mode = "PAPER" - initial_cash = float(config.get("initial_cash", 0)) - broker = PaperBroker(initial_cash=initial_cash) - log_event( - event="DEBUG_PAPER_STORE_PATH", - message="Paper broker store path", - meta={ - "cwd": os.getcwd(), - "paper_store_path": str(broker.store_path) if hasattr(broker, "store_path") else "NO_STORE_PATH", - "abs_store_path": os.path.abspath(str(broker.store_path)) if hasattr(broker, "store_path") else "N/A", - }, - ) - if emit_event_cb: - emit_event_cb( - event="DEBUG_PAPER_STORE_PATH", - message="Paper broker store path", - meta={ - "cwd": os.getcwd(), - "paper_store_path": str(broker.store_path) if hasattr(broker, "store_path") else "NO_STORE_PATH", - "abs_store_path": os.path.abspath(str(broker.store_path)) if hasattr(broker, "store_path") else "N/A", - }, - ) - - log_event("ENGINE_START", { - "strategy": strategy_name, - "sip_amount": sip_amount, - "frequency": frequency_label, - }) - debug_event("ENGINE_START_DEBUG", "engine loop started", {"run_id": scope_run, "user_id": scope_user}) - - _set_state( - scope_user, - scope_run, - state="RUNNING", - last_heartbeat_ts=datetime.utcnow().isoformat() + "Z", - ) - _update_engine_status(scope_user, scope_run, "RUNNING") - - try: - while not stop_event.is_set(): - _set_state(scope_user, scope_run, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z") - _update_engine_status(scope_user, scope_run, "RUNNING") - - state = load_state(mode=mode) - debug_event( - "STATE_LOADED", - "loaded engine state", - { - "last_sip_ts": state.get("last_sip_ts"), - "last_run": state.get("last_run"), - "cash": state.get("cash"), - "total_invested": state.get("total_invested"), - }, - ) - state_frequency = state.get("sip_frequency") - if not isinstance(state_frequency, dict): - state_frequency = {"value": frequency_value, "unit": frequency_unit} - freq = int(state_frequency.get("value", frequency_value)) - unit = state_frequency.get("unit", frequency_unit) - frequency_label = f"{freq} {unit}" - if unit == "minutes": - delta = timedelta(minutes=freq) - else: - delta = timedelta(days=freq) - - # Gate 2: time to SIP - last_run = state.get("last_run") or state.get("last_sip_ts") - is_first_run = last_run is None - now = datetime.now() - debug_event( - "ENGINE_LOOP_TICK", - "engine loop tick", - {"now": now.isoformat(), "frequency": frequency_label}, - ) - - if last_run and not is_first_run: - next_run = datetime.fromisoformat(last_run) + delta - next_run = align_to_market_open(next_run) - if now < next_run: - log_event( - event="SIP_WAITING", - message="Waiting for next SIP window", - meta={ - "last_run": last_run, - "next_eligible": next_run.isoformat(), - "now": now.isoformat(), - "frequency": frequency_label, - }, - ) - if emit_event_cb: - emit_event_cb( - event="SIP_WAITING", - message="Waiting for next SIP window", - meta={ - "last_run": last_run, - "next_eligible": next_run.isoformat(), - "now": now.isoformat(), - "frequency": frequency_label, - }, - ) - sleep_with_heartbeat(60, stop_event, scope_user, scope_run) - continue - - try: - debug_event("PRICE_FETCH_START", "fetching live prices", {"tickers": [NIFTY, GOLD]}) - nifty_price = fetch_live_price(NIFTY) - gold_price = fetch_live_price(GOLD) - debug_event( - "PRICE_FETCHED", - "fetched live prices", - {"nifty_price": float(nifty_price), "gold_price": float(gold_price)}, - ) - except Exception as exc: - debug_event("PRICE_FETCH_ERROR", "live price fetch failed", {"error": str(exc)}) - sleep_with_heartbeat(30, stop_event, scope_user, scope_run) - continue - - try: - nifty_hist = load_monthly_close(NIFTY) - gold_hist = load_monthly_close(GOLD) - except Exception as exc: - debug_event("HISTORY_LOAD_ERROR", "history load failed", {"error": str(exc)}) - sleep_with_heartbeat(30, stop_event, scope_user, scope_run) - continue - - nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1] - gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1] - - eq_w, gd_w = allocation( - sp_price=nifty_price, - gd_price=gold_price, - sp_sma=nifty_sma, - gd_sma=gold_sma - ) - debug_event( - "WEIGHTS_COMPUTED", - "computed allocation weights", - {"equity_weight": float(eq_w), "gold_weight": float(gd_w)}, - ) - - weights = {"equity": eq_w, "gold": gd_w} - allowed, reason = can_execute(now) - executed = False - if not allowed: - log_event( - event="EXECUTION_BLOCKED", - message="Execution blocked by market gate", - meta={ - "reason": reason, - "eligible_since": last_run, - "checked_at": now.isoformat(), - }, - ) - debug_event("MARKET_GATE", "market closed", {"reason": reason}) - if emit_event_cb: - emit_event_cb( - event="EXECUTION_BLOCKED", - message="Execution blocked by market gate", - meta={ - "reason": reason, - "eligible_since": last_run, - "checked_at": now.isoformat(), - }, - ) - else: - log_event( - event="DEBUG_BEFORE_TRY_EXECUTE", - message="About to call try_execute_sip", - meta={ - "last_run": last_run, - "frequency": frequency_label, - "allowed": allowed, - "reason": reason, - "sip_amount": sip_amount, - "broker": type(broker).__name__, - "now": now.isoformat(), - }, - ) - if emit_event_cb: - emit_event_cb( - event="DEBUG_BEFORE_TRY_EXECUTE", - message="About to call try_execute_sip", - meta={ - "last_run": last_run, - "frequency": frequency_label, - "allowed": allowed, - "reason": reason, - "sip_amount": sip_amount, - "broker": type(broker).__name__, - "now": now.isoformat(), - }, - ) - debug_event( - "TRY_EXECUTE_START", - "calling try_execute_sip", - {"sip_interval_sec": delta.total_seconds(), "sip_amount": sip_amount}, - ) - state, executed = try_execute_sip( - now=now, - market_open=True, - sip_interval=delta.total_seconds(), - sip_amount=sip_amount, - sp_price=nifty_price, - gd_price=gold_price, - eq_w=eq_w, - gd_w=gd_w, - broker=broker, - mode=mode, - ) - log_event( - event="DEBUG_AFTER_TRY_EXECUTE", - message="Returned from try_execute_sip", - meta={ - "executed": executed, - "state_last_run": state.get("last_run"), - "state_last_sip_ts": state.get("last_sip_ts"), - }, - ) - if emit_event_cb: - emit_event_cb( - event="DEBUG_AFTER_TRY_EXECUTE", - message="Returned from try_execute_sip", - meta={ - "executed": executed, - "state_last_run": state.get("last_run"), - "state_last_sip_ts": state.get("last_sip_ts"), - }, - ) - debug_event( - "TRY_EXECUTE_DONE", - "try_execute_sip finished", - {"executed": executed, "last_run": state.get("last_run")}, - ) - - if executed: - log_event("SIP_TRIGGERED", { - "date": now.date().isoformat(), - "allocation": weights, - "cash_used": sip_amount - }) - debug_event("SIP_TRIGGERED", "sip executed", {"cash_used": sip_amount}) - portfolio_value = ( - state["nifty_units"] * nifty_price - + state["gold_units"] * gold_price - ) - log_event("PORTFOLIO_UPDATED", { - "nifty_units": state["nifty_units"], - "gold_units": state["gold_units"], - "portfolio_value": portfolio_value - }) - print("SIP executed at", now) - - if should_log_mtm(None, now): - logical_time = normalize_logical_time(now) - with db_transaction() as cur: - log_mtm( - nifty_units=state["nifty_units"], - gold_units=state["gold_units"], - nifty_price=nifty_price, - gold_price=gold_price, - total_invested=state["total_invested"], - cur=cur, - logical_time=logical_time, - ) - broker.update_equity( - {NIFTY: nifty_price, GOLD: gold_price}, - now, - cur=cur, - logical_time=logical_time, - ) - - sleep_with_heartbeat(30, stop_event, scope_user, scope_run) - except Exception as e: - _set_state(scope_user, scope_run, state="ERROR", last_heartbeat_ts=datetime.utcnow().isoformat() + "Z") - _update_engine_status(scope_user, scope_run, "ERROR") - log_event("ENGINE_ERROR", {"error": str(e)}) - raise - - log_event("ENGINE_STOP") - _set_state( - scope_user, - scope_run, - state="STOPPED", - last_heartbeat_ts=datetime.utcnow().isoformat() + "Z", - ) - _update_engine_status(scope_user, scope_run, "STOPPED") - print("Strategy engine stopped") - _clear_runner(scope_user, scope_run) - -def start_engine(config): - user_id = config.get("user_id") - run_id = config.get("run_id") - if not user_id: - raise ValueError("user_id is required to start engine") - if not run_id: - raise ValueError("run_id is required to start engine") - - with _RUNNERS_LOCK: - key = _state_key(user_id, run_id) - runner = _RUNNERS.get(key) - if runner and runner["thread"].is_alive(): - return False - - stop_event = threading.Event() - thread = threading.Thread( - target=_engine_loop, - args=(config, stop_event), - daemon=True, - ) - _RUNNERS[key] = {"thread": thread, "stop_event": stop_event} - thread.start() - return True - -def stop_engine(user_id: str, run_id: str | None = None, timeout: float | None = 10.0): - runners = [] - with _RUNNERS_LOCK: - if run_id: - key = _state_key(user_id, run_id) - runner = _RUNNERS.get(key) - if runner: - runners.append((key, runner)) - else: - for key, runner in list(_RUNNERS.items()): - if key[0] == user_id: - runners.append((key, runner)) - for _key, runner in runners: - runner["stop_event"].set() - stopped_all = True - for key, runner in runners: - thread = runner["thread"] - if timeout is not None: - thread.join(timeout=timeout) - stopped = not thread.is_alive() - if stopped: - _clear_runner(key[0], key[1]) - else: - stopped_all = False - return stopped_all - diff --git a/indian_paper_trading_strategy_1/engine/state.py b/indian_paper_trading_strategy_1/engine/state.py deleted file mode 100644 index 9ec4ccc..0000000 --- a/indian_paper_trading_strategy_1/engine/state.py +++ /dev/null @@ -1,303 +0,0 @@ -# engine/state.py -from datetime import datetime, timezone - -from indian_paper_trading_strategy.engine.db import db_connection, insert_engine_event, run_with_retry, get_context - -DEFAULT_STATE = { - "initial_cash": 0.0, - "cash": 0.0, - "total_invested": 0.0, - "nifty_units": 0.0, - "gold_units": 0.0, - "last_sip_ts": None, - "last_run": None, - "sip_frequency": None, -} - -DEFAULT_PAPER_STATE = { - **DEFAULT_STATE, - "initial_cash": 1_000_000.0, - "cash": 1_000_000.0, - "sip_frequency": {"value": 30, "unit": "days"}, -} - -def _state_key(mode: str | None): - key = (mode or "LIVE").strip().upper() - return "PAPER" if key == "PAPER" else "LIVE" - -def _default_state(mode: str | None): - if _state_key(mode) == "PAPER": - return DEFAULT_PAPER_STATE.copy() - return DEFAULT_STATE.copy() - -def _local_tz(): - return datetime.now().astimezone().tzinfo - -def _format_local_ts(value: datetime | None): - if value is None: - return None - return value.astimezone(_local_tz()).replace(tzinfo=None).isoformat() - -def _parse_ts(value): - if value is None: - return None - if isinstance(value, datetime): - if value.tzinfo is None: - return value.replace(tzinfo=_local_tz()) - return value - if isinstance(value, str): - text = value.strip() - if not text: - return None - try: - parsed = datetime.fromisoformat(text.replace("Z", "+00:00")) - except ValueError: - return None - if parsed.tzinfo is None: - parsed = parsed.replace(tzinfo=_local_tz()) - return parsed - return None - -def _resolve_scope(user_id: str | None, run_id: str | None): - return get_context(user_id, run_id) - - -def load_state( - mode: str | None = "LIVE", - *, - cur=None, - for_update: bool = False, - user_id: str | None = None, - run_id: str | None = None, -): - scope_user, scope_run = _resolve_scope(user_id, run_id) - key = _state_key(mode) - if key == "PAPER": - if cur is None: - with db_connection() as conn: - with conn.cursor() as cur: - return load_state( - mode=mode, - cur=cur, - for_update=for_update, - user_id=scope_user, - run_id=scope_run, - ) - lock_clause = " FOR UPDATE" if for_update else "" - cur.execute( - f""" - SELECT initial_cash, cash, total_invested, nifty_units, gold_units, - last_sip_ts, last_run, sip_frequency_value, sip_frequency_unit - FROM engine_state_paper - WHERE user_id = %s AND run_id = %s{lock_clause} - LIMIT 1 - """, - (scope_user, scope_run), - ) - row = cur.fetchone() - if not row: - return _default_state(mode) - merged = _default_state(mode) - merged.update( - { - "initial_cash": float(row[0]) if row[0] is not None else merged["initial_cash"], - "cash": float(row[1]) if row[1] is not None else merged["cash"], - "total_invested": float(row[2]) if row[2] is not None else merged["total_invested"], - "nifty_units": float(row[3]) if row[3] is not None else merged["nifty_units"], - "gold_units": float(row[4]) if row[4] is not None else merged["gold_units"], - "last_sip_ts": _format_local_ts(row[5]), - "last_run": _format_local_ts(row[6]), - } - ) - if row[7] is not None or row[8] is not None: - merged["sip_frequency"] = {"value": row[7], "unit": row[8]} - return merged - - if cur is None: - with db_connection() as conn: - with conn.cursor() as cur: - return load_state( - mode=mode, - cur=cur, - for_update=for_update, - user_id=scope_user, - run_id=scope_run, - ) - lock_clause = " FOR UPDATE" if for_update else "" - cur.execute( - f""" - SELECT total_invested, nifty_units, gold_units, last_sip_ts, last_run - FROM engine_state - WHERE user_id = %s AND run_id = %s{lock_clause} - LIMIT 1 - """, - (scope_user, scope_run), - ) - row = cur.fetchone() - if not row: - return _default_state(mode) - merged = _default_state(mode) - merged.update( - { - "total_invested": float(row[0]) if row[0] is not None else merged["total_invested"], - "nifty_units": float(row[1]) if row[1] is not None else merged["nifty_units"], - "gold_units": float(row[2]) if row[2] is not None else merged["gold_units"], - "last_sip_ts": _format_local_ts(row[3]), - "last_run": _format_local_ts(row[4]), - } - ) - return merged - -def init_paper_state( - initial_cash: float, - sip_frequency: dict | None = None, - *, - cur=None, - user_id: str | None = None, - run_id: str | None = None, -): - state = DEFAULT_PAPER_STATE.copy() - state.update( - { - "initial_cash": float(initial_cash), - "cash": float(initial_cash), - "total_invested": 0.0, - "nifty_units": 0.0, - "gold_units": 0.0, - "last_sip_ts": None, - "last_run": None, - "sip_frequency": sip_frequency or state.get("sip_frequency"), - } - ) - save_state(state, mode="PAPER", cur=cur, emit_event=True, user_id=user_id, run_id=run_id) - return state - -def save_state( - state, - mode: str | None = "LIVE", - *, - cur=None, - emit_event: bool = False, - event_meta: dict | None = None, - user_id: str | None = None, - run_id: str | None = None, -): - scope_user, scope_run = _resolve_scope(user_id, run_id) - key = _state_key(mode) - last_sip_ts = _parse_ts(state.get("last_sip_ts")) - last_run = _parse_ts(state.get("last_run")) - if key == "PAPER": - sip_frequency = state.get("sip_frequency") - sip_value = None - sip_unit = None - if isinstance(sip_frequency, dict): - sip_value = sip_frequency.get("value") - sip_unit = sip_frequency.get("unit") - def _save(cur): - cur.execute( - """ - INSERT INTO engine_state_paper ( - user_id, run_id, initial_cash, cash, total_invested, nifty_units, gold_units, - last_sip_ts, last_run, sip_frequency_value, sip_frequency_unit - ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (user_id, run_id) DO UPDATE - SET initial_cash = EXCLUDED.initial_cash, - cash = EXCLUDED.cash, - total_invested = EXCLUDED.total_invested, - nifty_units = EXCLUDED.nifty_units, - gold_units = EXCLUDED.gold_units, - last_sip_ts = EXCLUDED.last_sip_ts, - last_run = EXCLUDED.last_run, - sip_frequency_value = EXCLUDED.sip_frequency_value, - sip_frequency_unit = EXCLUDED.sip_frequency_unit - """, - ( - scope_user, - scope_run, - float(state.get("initial_cash", 0.0)), - float(state.get("cash", 0.0)), - float(state.get("total_invested", 0.0)), - float(state.get("nifty_units", 0.0)), - float(state.get("gold_units", 0.0)), - last_sip_ts, - last_run, - sip_value, - sip_unit, - ), - ) - if emit_event: - insert_engine_event( - cur, - "STATE_UPDATED", - data={ - "mode": "PAPER", - "cash": state.get("cash"), - "total_invested": state.get("total_invested"), - "nifty_units": state.get("nifty_units"), - "gold_units": state.get("gold_units"), - "last_sip_ts": state.get("last_sip_ts"), - "last_run": state.get("last_run"), - }, - meta=event_meta, - ts=datetime.utcnow().replace(tzinfo=timezone.utc), - ) - - if cur is not None: - _save(cur) - return - - def _op(cur, _conn): - _save(cur) - - return run_with_retry(_op) - - def _save(cur): - cur.execute( - """ - INSERT INTO engine_state ( - user_id, run_id, total_invested, nifty_units, gold_units, last_sip_ts, last_run - ) - VALUES (%s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (user_id, run_id) DO UPDATE - SET total_invested = EXCLUDED.total_invested, - nifty_units = EXCLUDED.nifty_units, - gold_units = EXCLUDED.gold_units, - last_sip_ts = EXCLUDED.last_sip_ts, - last_run = EXCLUDED.last_run - """, - ( - scope_user, - scope_run, - float(state.get("total_invested", 0.0)), - float(state.get("nifty_units", 0.0)), - float(state.get("gold_units", 0.0)), - last_sip_ts, - last_run, - ), - ) - if emit_event: - insert_engine_event( - cur, - "STATE_UPDATED", - data={ - "mode": "LIVE", - "total_invested": state.get("total_invested"), - "nifty_units": state.get("nifty_units"), - "gold_units": state.get("gold_units"), - "last_sip_ts": state.get("last_sip_ts"), - "last_run": state.get("last_run"), - }, - meta=event_meta, - ts=datetime.utcnow().replace(tzinfo=timezone.utc), - ) - - if cur is not None: - _save(cur) - return - - def _op(cur, _conn): - _save(cur) - - return run_with_retry(_op) - diff --git a/indian_paper_trading_strategy_1/engine/strategy.py b/indian_paper_trading_strategy_1/engine/strategy.py deleted file mode 100644 index 504034e..0000000 --- a/indian_paper_trading_strategy_1/engine/strategy.py +++ /dev/null @@ -1,12 +0,0 @@ -# engine/strategy.py -import numpy as np - -def allocation(sp_price, gd_price, sp_sma, gd_sma, - base=0.6, tilt_mult=1.5, - max_tilt=0.25, min_eq=0.2, max_eq=0.9): - - rd = (sp_price / sp_sma) - (gd_price / gd_sma) - tilt = np.clip(-rd * tilt_mult, -max_tilt, max_tilt) - - eq_w = np.clip(base * (1 + tilt), min_eq, max_eq) - return eq_w, 1 - eq_w diff --git a/indian_paper_trading_strategy_1/engine/time_utils.py b/indian_paper_trading_strategy_1/engine/time_utils.py deleted file mode 100644 index 3315338..0000000 --- a/indian_paper_trading_strategy_1/engine/time_utils.py +++ /dev/null @@ -1,41 +0,0 @@ -from datetime import datetime, timedelta - - -def frequency_to_timedelta(freq: dict) -> timedelta: - value = int(freq.get("value", 0)) - unit = freq.get("unit") - - if value <= 0: - raise ValueError("Frequency value must be > 0") - - if unit == "minutes": - return timedelta(minutes=value) - if unit == "days": - return timedelta(days=value) - raise ValueError(f"Unsupported frequency unit: {unit}") - - -def normalize_logical_time(ts: datetime) -> datetime: - return ts.replace(microsecond=0) - - -def compute_logical_time( - now: datetime, - last_run: str | None, - interval_seconds: float | None, -) -> datetime: - base = now - if last_run and interval_seconds: - try: - parsed = datetime.fromisoformat(last_run.replace("Z", "+00:00")) - except ValueError: - parsed = None - if parsed is not None: - if now.tzinfo and parsed.tzinfo is None: - parsed = parsed.replace(tzinfo=now.tzinfo) - elif now.tzinfo is None and parsed.tzinfo: - parsed = parsed.replace(tzinfo=None) - candidate = parsed + timedelta(seconds=interval_seconds) - if now >= candidate: - base = candidate - return normalize_logical_time(base)