Add Alpha Shield strategy dispatch (JuniorBees + 60-month SMA allocation)

Introduces STRATEGY_REGISTRY, alpha_shield_allocation(), and compute_weights()
in strategy.py. Updates runner.py to dynamically load equity symbol, gold
symbol, and SMA window from the registry based on strategy_name, enabling
Alpha Shield (JUNIORBEES.NS + GOLDBEES.NS, 60M SMA) alongside Golden Nifty.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thigazhezhilan J 2026-05-03 02:41:59 +05:30
parent 6027dd3c6f
commit 3580e123e4
2 changed files with 336 additions and 283 deletions

View File

@ -23,10 +23,10 @@ from indian_paper_trading_strategy.engine.mtm import log_mtm, should_log_mtm
from indian_paper_trading_strategy.engine.state import load_state
from indian_paper_trading_strategy.engine.data import fetch_live_price
from indian_paper_trading_strategy.engine.history import load_monthly_close
from indian_paper_trading_strategy.engine.strategy import allocation
from indian_paper_trading_strategy.engine.strategy import compute_weights, get_strategy_config
from indian_paper_trading_strategy.engine.time_utils import normalize_logical_time
from app.services.zerodha_service import KiteTokenError
from indian_paper_trading_strategy.engine.db import (
acquire_run_lease,
db_transaction,
@ -37,42 +37,42 @@ from indian_paper_trading_strategy.engine.db import (
get_context,
set_context,
)
def _update_engine_status(user_id: str, run_id: str, status: str):
now = datetime.utcnow().replace(tzinfo=timezone.utc)
def _op(cur, _conn):
cur.execute(
"""
INSERT INTO engine_status (user_id, run_id, status, last_updated)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id, run_id) DO UPDATE
SET status = EXCLUDED.status,
last_updated = EXCLUDED.last_updated
""",
(user_id, run_id, status, now),
)
run_with_retry(_op)
NIFTY = "NIFTYBEES.NS"
GOLD = "GOLDBEES.NS"
SMA_MONTHS = 36
_DEFAULT_ENGINE_STATE = {
"state": "STOPPED",
"run_id": None,
"user_id": None,
"last_heartbeat_ts": None,
}
def _update_engine_status(user_id: str, run_id: str, status: str):
now = datetime.utcnow().replace(tzinfo=timezone.utc)
def _op(cur, _conn):
cur.execute(
"""
INSERT INTO engine_status (user_id, run_id, status, last_updated)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id, run_id) DO UPDATE
SET status = EXCLUDED.status,
last_updated = EXCLUDED.last_updated
""",
(user_id, run_id, status, now),
)
run_with_retry(_op)
NIFTY = "NIFTYBEES.NS"
GOLD = "GOLDBEES.NS"
SMA_MONTHS = 36 # default for golden_nifty; overridden per strategy
_DEFAULT_ENGINE_STATE = {
"state": "STOPPED",
"run_id": None,
"user_id": None,
"last_heartbeat_ts": None,
}
_ENGINE_STATES = {}
_ENGINE_STATES_LOCK = threading.Lock()
_RUNNERS = {}
_RUNNERS_LOCK = threading.Lock()
engine_state = _ENGINE_STATES
RUNNER_OWNER_ID = os.getenv("RUNNER_OWNER_ID") or f"{socket.gethostname()}:{os.getpid()}:{uuid.uuid4().hex}"
@ -85,8 +85,8 @@ class RunLeaseNotAcquiredError(RuntimeError):
self.run_id = run_id
self.owner_id = owner_id
self.details = details or {}
def _state_key(user_id: str, run_id: str):
return (user_id, run_id)
@ -118,35 +118,35 @@ def _set_state(user_id: str, run_id: str, **updates):
def get_engine_state(user_id: str, run_id: str):
state = _get_state(user_id, run_id)
return dict(state)
def log_event(
event: str,
data: dict | None = None,
message: str | None = None,
meta: dict | None = None,
):
entry = {
"ts": datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
"event": event,
}
if message is not None or meta is not None:
entry["message"] = message or ""
entry["meta"] = meta or {}
else:
entry["data"] = data or {}
event_ts = datetime.fromisoformat(entry["ts"].replace("Z", "+00:00"))
data = entry.get("data") if "data" in entry else None
meta = entry.get("meta") if "meta" in entry else None
def _op(cur, _conn):
insert_engine_event(
cur,
entry.get("event"),
data=data,
message=entry.get("message"),
meta=meta,
ts=event_ts,
)
event: str,
data: dict | None = None,
message: str | None = None,
meta: dict | None = None,
):
entry = {
"ts": datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
"event": event,
}
if message is not None or meta is not None:
entry["message"] = message or ""
entry["meta"] = meta or {}
else:
entry["data"] = data or {}
event_ts = datetime.fromisoformat(entry["ts"].replace("Z", "+00:00"))
data = entry.get("data") if "data" in entry else None
meta = entry.get("meta") if "meta" in entry else None
def _op(cur, _conn):
insert_engine_event(
cur,
entry.get("event"),
data=data,
message=entry.get("message"),
meta=meta,
ts=event_ts,
)
run_with_retry(_op)
@ -228,7 +228,7 @@ def _clear_runner(user_id: str, run_id: str):
key = _state_key(user_id, run_id)
with _RUNNERS_LOCK:
_RUNNERS.pop(key, None)
def can_execute(now: datetime) -> tuple[bool, str]:
session = market_session(now)
status = str(session.get("status") or "CLOSED").upper()
@ -331,30 +331,34 @@ def _engine_loop(config, stop_event: threading.Event):
owner_id = config.get("runner_owner_id") or RUNNER_OWNER_ID
scope_user, scope_run = get_context(user_id, run_id)
set_context(scope_user, scope_run)
strategy_name = config.get("strategy_name") or config.get("strategy") or "golden_nifty"
sip_amount = config["sip_amount"]
configured_frequency = config.get("sip_frequency") or {}
if not isinstance(configured_frequency, dict):
configured_frequency = {}
frequency_value = int(configured_frequency.get("value", config.get("frequency", 0)))
frequency_unit = configured_frequency.get("unit", config.get("unit", "days"))
frequency_label = f"{frequency_value} {frequency_unit}"
emit_event_cb = config.get("emit_event")
if not callable(emit_event_cb):
emit_event_cb = None
debug_enabled = os.getenv("ENGINE_DEBUG", "1").strip().lower() not in {"0", "false", "no"}
def debug_event(event: str, message: str, meta: dict | None = None):
if not debug_enabled:
return
try:
log_event(event=event, message=message, meta=meta or {})
except Exception:
pass
if emit_event_cb:
emit_event_cb(event=event, message=message, meta=meta or {})
print(f"[ENGINE] {event} {message} {meta or {}}", flush=True)
strategy_name = config.get("strategy_name") or config.get("strategy") or "golden_nifty"
_strat_cfg = get_strategy_config(strategy_name)
_EQUITY_SYM = _strat_cfg["equity_symbol"]
_GOLD_SYM = _strat_cfg["gold_symbol"]
_SMA_MONTHS = _strat_cfg["sma_months"]
sip_amount = config["sip_amount"]
configured_frequency = config.get("sip_frequency") or {}
if not isinstance(configured_frequency, dict):
configured_frequency = {}
frequency_value = int(configured_frequency.get("value", config.get("frequency", 0)))
frequency_unit = configured_frequency.get("unit", config.get("unit", "days"))
frequency_label = f"{frequency_value} {frequency_unit}"
emit_event_cb = config.get("emit_event")
if not callable(emit_event_cb):
emit_event_cb = None
debug_enabled = os.getenv("ENGINE_DEBUG", "1").strip().lower() not in {"0", "false", "no"}
def debug_event(event: str, message: str, meta: dict | None = None):
if not debug_enabled:
return
try:
log_event(event=event, message=message, meta=meta or {})
except Exception:
pass
if emit_event_cb:
emit_event_cb(event=event, message=message, meta=meta or {})
print(f"[ENGINE] {event} {message} {meta or {}}", flush=True)
mode = (config.get("mode") or "LIVE").strip().upper()
if mode not in {"PAPER", "LIVE"}:
mode = "LIVE"
@ -389,14 +393,14 @@ def _engine_loop(config, stop_event: threading.Event):
else:
raise ValueError(f"Unsupported broker: {broker_type}")
market_data_provider = "yfinance"
log_event("ENGINE_START", {
"strategy": strategy_name,
"sip_amount": sip_amount,
"frequency": frequency_label,
})
debug_event("ENGINE_START_DEBUG", "engine loop started", {"run_id": scope_run, "user_id": scope_user})
log_event("ENGINE_START", {
"strategy": strategy_name,
"sip_amount": sip_amount,
"frequency": frequency_label,
})
debug_event("ENGINE_START_DEBUG", "engine loop started", {"run_id": scope_run, "user_id": scope_user})
_set_state(
scope_user,
scope_run,
@ -413,30 +417,30 @@ def _engine_loop(config, stop_event: threading.Event):
break
_set_state(scope_user, scope_run, last_heartbeat_ts=datetime.utcnow().isoformat() + "Z")
_update_engine_status(scope_user, scope_run, "RUNNING")
state = load_state(mode=mode)
debug_event(
"STATE_LOADED",
"loaded engine state",
{
"last_sip_ts": state.get("last_sip_ts"),
"last_run": state.get("last_run"),
"cash": state.get("cash"),
"total_invested": state.get("total_invested"),
},
)
state_frequency = state.get("sip_frequency")
if not isinstance(state_frequency, dict):
state_frequency = {"value": frequency_value, "unit": frequency_unit}
freq = int(state_frequency.get("value", frequency_value))
unit = state_frequency.get("unit", frequency_unit)
frequency_label = f"{freq} {unit}"
if unit == "minutes":
delta = timedelta(minutes=freq)
else:
delta = timedelta(days=freq)
# Gate 2: time to SIP
state = load_state(mode=mode)
debug_event(
"STATE_LOADED",
"loaded engine state",
{
"last_sip_ts": state.get("last_sip_ts"),
"last_run": state.get("last_run"),
"cash": state.get("cash"),
"total_invested": state.get("total_invested"),
},
)
state_frequency = state.get("sip_frequency")
if not isinstance(state_frequency, dict):
state_frequency = {"value": frequency_value, "unit": frequency_unit}
freq = int(state_frequency.get("value", frequency_value))
unit = state_frequency.get("unit", frequency_unit)
frequency_label = f"{freq} {unit}"
if unit == "minutes":
delta = timedelta(minutes=freq)
else:
delta = timedelta(days=freq)
# Gate 2: time to SIP
last_run = _last_execution_anchor(state, mode)
is_first_run = last_run is None
now = market_now()
@ -484,11 +488,11 @@ def _engine_loop(config, stop_event: threading.Event):
message="Waiting for next SIP window",
meta={
"last_run": last_run,
"next_eligible": next_run.isoformat(),
"now": now.isoformat(),
"frequency": frequency_label,
},
)
"next_eligible": next_run.isoformat(),
"now": now.isoformat(),
"frequency": frequency_label,
},
)
if emit_event_cb:
emit_event_cb(
event="SIP_WAITING",
@ -504,17 +508,17 @@ def _engine_loop(config, stop_event: threading.Event):
exit_reason = "LEASE_LOST"
break
continue
try:
debug_event("PRICE_FETCH_START", "fetching live prices", {"tickers": [NIFTY, GOLD]})
debug_event("PRICE_FETCH_START", "fetching live prices", {"tickers": [_EQUITY_SYM, _GOLD_SYM]})
nifty_price = fetch_live_price(
NIFTY,
_EQUITY_SYM,
provider=market_data_provider,
user_id=scope_user,
run_id=scope_run,
)
gold_price = fetch_live_price(
GOLD,
_GOLD_SYM,
provider=market_data_provider,
user_id=scope_user,
run_id=scope_run,
@ -522,7 +526,7 @@ def _engine_loop(config, stop_event: threading.Event):
debug_event(
"PRICE_FETCHED",
"fetched live prices",
{"nifty_price": float(nifty_price), "gold_price": float(gold_price)},
{"equity_price": float(nifty_price), "gold_price": float(gold_price)},
)
except KiteTokenError as exc:
_pause_for_auth_expiry(scope_user, scope_run, str(exc), emit_event_cb=emit_event_cb)
@ -536,13 +540,13 @@ def _engine_loop(config, stop_event: threading.Event):
try:
nifty_hist = load_monthly_close(
NIFTY,
_EQUITY_SYM,
provider=market_data_provider,
user_id=scope_user,
run_id=scope_run,
)
gold_hist = load_monthly_close(
GOLD,
_GOLD_SYM,
provider=market_data_provider,
user_id=scope_user,
run_id=scope_run,
@ -556,153 +560,152 @@ def _engine_loop(config, stop_event: threading.Event):
exit_reason = "LEASE_LOST"
break
continue
nifty_sma = nifty_hist.rolling(SMA_MONTHS).mean().iloc[-1]
gold_sma = gold_hist.rolling(SMA_MONTHS).mean().iloc[-1]
eq_w, gd_w = allocation(
sp_price=nifty_price,
gd_price=gold_price,
sp_sma=nifty_sma,
gd_sma=gold_sma
)
debug_event(
"WEIGHTS_COMPUTED",
"computed allocation weights",
{"equity_weight": float(eq_w), "gold_weight": float(gd_w)},
)
weights = {"equity": eq_w, "gold": gd_w}
allowed, reason = can_execute(now)
executed = False
if not allowed:
log_event(
event="EXECUTION_BLOCKED",
message="Execution blocked by market gate",
meta={
"reason": reason,
"eligible_since": last_run,
"checked_at": now.isoformat(),
},
)
debug_event("MARKET_GATE", "market closed", {"reason": reason})
if emit_event_cb:
emit_event_cb(
event="EXECUTION_BLOCKED",
message="Execution blocked by market gate",
meta={
"reason": reason,
"eligible_since": last_run,
"checked_at": now.isoformat(),
},
)
else:
log_event(
event="DEBUG_BEFORE_TRY_EXECUTE",
message="About to call try_execute_sip",
meta={
"last_run": last_run,
"frequency": frequency_label,
"allowed": allowed,
"reason": reason,
"sip_amount": sip_amount,
"broker": type(broker).__name__,
"now": now.isoformat(),
},
)
if emit_event_cb:
emit_event_cb(
event="DEBUG_BEFORE_TRY_EXECUTE",
message="About to call try_execute_sip",
meta={
"last_run": last_run,
"frequency": frequency_label,
"allowed": allowed,
"reason": reason,
"sip_amount": sip_amount,
"broker": type(broker).__name__,
"now": now.isoformat(),
},
)
debug_event(
"TRY_EXECUTE_START",
"calling try_execute_sip",
{"sip_interval_sec": delta.total_seconds(), "sip_amount": sip_amount},
)
state, executed = try_execute_sip(
now=now,
market_open=True,
sip_interval=delta.total_seconds(),
sip_amount=sip_amount,
sp_price=nifty_price,
gd_price=gold_price,
eq_w=eq_w,
gd_w=gd_w,
broker=broker,
mode=mode,
)
log_event(
event="DEBUG_AFTER_TRY_EXECUTE",
message="Returned from try_execute_sip",
meta={
"executed": executed,
"state_last_run": state.get("last_run"),
"state_last_sip_ts": state.get("last_sip_ts"),
},
)
if emit_event_cb:
emit_event_cb(
event="DEBUG_AFTER_TRY_EXECUTE",
message="Returned from try_execute_sip",
meta={
"executed": executed,
"state_last_run": state.get("last_run"),
"state_last_sip_ts": state.get("last_sip_ts"),
},
)
debug_event(
"TRY_EXECUTE_DONE",
"try_execute_sip finished",
{"executed": executed, "last_run": state.get("last_run")},
)
if executed:
log_event("SIP_TRIGGERED", {
"date": now.date().isoformat(),
"allocation": weights,
"cash_used": sip_amount
})
debug_event("SIP_TRIGGERED", "sip executed", {"cash_used": sip_amount})
portfolio_value = (
state["nifty_units"] * nifty_price
+ state["gold_units"] * gold_price
)
log_event("PORTFOLIO_UPDATED", {
"nifty_units": state["nifty_units"],
"gold_units": state["gold_units"],
"portfolio_value": portfolio_value
})
print("SIP executed at", now)
if should_log_mtm(None, now):
logical_time = normalize_logical_time(now)
with db_transaction() as cur:
log_mtm(
nifty_units=state["nifty_units"],
gold_units=state["gold_units"],
nifty_price=nifty_price,
gold_price=gold_price,
total_invested=state["total_invested"],
cur=cur,
logical_time=logical_time,
)
broker.update_equity(
{NIFTY: nifty_price, GOLD: gold_price},
now,
cur=cur,
logical_time=logical_time,
)
eq_w, gd_w = compute_weights(
strategy_name=strategy_name,
equity_price=nifty_price,
gold_price=gold_price,
equity_hist=nifty_hist,
gold_hist=gold_hist,
sma_months=_SMA_MONTHS,
)
debug_event(
"WEIGHTS_COMPUTED",
"computed allocation weights",
{"equity_weight": float(eq_w), "gold_weight": float(gd_w), "strategy": strategy_name},
)
weights = {"equity": eq_w, "gold": gd_w}
allowed, reason = can_execute(now)
executed = False
if not allowed:
log_event(
event="EXECUTION_BLOCKED",
message="Execution blocked by market gate",
meta={
"reason": reason,
"eligible_since": last_run,
"checked_at": now.isoformat(),
},
)
debug_event("MARKET_GATE", "market closed", {"reason": reason})
if emit_event_cb:
emit_event_cb(
event="EXECUTION_BLOCKED",
message="Execution blocked by market gate",
meta={
"reason": reason,
"eligible_since": last_run,
"checked_at": now.isoformat(),
},
)
else:
log_event(
event="DEBUG_BEFORE_TRY_EXECUTE",
message="About to call try_execute_sip",
meta={
"last_run": last_run,
"frequency": frequency_label,
"allowed": allowed,
"reason": reason,
"sip_amount": sip_amount,
"broker": type(broker).__name__,
"now": now.isoformat(),
},
)
if emit_event_cb:
emit_event_cb(
event="DEBUG_BEFORE_TRY_EXECUTE",
message="About to call try_execute_sip",
meta={
"last_run": last_run,
"frequency": frequency_label,
"allowed": allowed,
"reason": reason,
"sip_amount": sip_amount,
"broker": type(broker).__name__,
"now": now.isoformat(),
},
)
debug_event(
"TRY_EXECUTE_START",
"calling try_execute_sip",
{"sip_interval_sec": delta.total_seconds(), "sip_amount": sip_amount},
)
state, executed = try_execute_sip(
now=now,
market_open=True,
sip_interval=delta.total_seconds(),
sip_amount=sip_amount,
sp_price=nifty_price,
gd_price=gold_price,
eq_w=eq_w,
gd_w=gd_w,
broker=broker,
mode=mode,
)
log_event(
event="DEBUG_AFTER_TRY_EXECUTE",
message="Returned from try_execute_sip",
meta={
"executed": executed,
"state_last_run": state.get("last_run"),
"state_last_sip_ts": state.get("last_sip_ts"),
},
)
if emit_event_cb:
emit_event_cb(
event="DEBUG_AFTER_TRY_EXECUTE",
message="Returned from try_execute_sip",
meta={
"executed": executed,
"state_last_run": state.get("last_run"),
"state_last_sip_ts": state.get("last_sip_ts"),
},
)
debug_event(
"TRY_EXECUTE_DONE",
"try_execute_sip finished",
{"executed": executed, "last_run": state.get("last_run")},
)
if executed:
log_event("SIP_TRIGGERED", {
"date": now.date().isoformat(),
"allocation": weights,
"cash_used": sip_amount
})
debug_event("SIP_TRIGGERED", "sip executed", {"cash_used": sip_amount})
portfolio_value = (
state["nifty_units"] * nifty_price
+ state["gold_units"] * gold_price
)
log_event("PORTFOLIO_UPDATED", {
"nifty_units": state["nifty_units"],
"gold_units": state["gold_units"],
"portfolio_value": portfolio_value
})
print("SIP executed at", now)
if should_log_mtm(None, now):
logical_time = normalize_logical_time(now)
with db_transaction() as cur:
log_mtm(
nifty_units=state["nifty_units"],
gold_units=state["gold_units"],
nifty_price=nifty_price,
gold_price=gold_price,
total_invested=state["total_invested"],
cur=cur,
logical_time=logical_time,
)
broker.update_equity(
{_EQUITY_SYM: nifty_price, _GOLD_SYM: gold_price},
now,
cur=cur,
logical_time=logical_time,
)
if not sleep_with_heartbeat(30, stop_event, scope_user, scope_run, owner_id):
exit_reason = "LEASE_LOST"
break
@ -746,7 +749,7 @@ def _engine_loop(config, stop_event: threading.Event):
last_heartbeat_ts=datetime.utcnow().isoformat() + "Z",
)
_clear_runner(scope_user, scope_run)
def start_engine(config):
user_id = config.get("user_id")
run_id = config.get("run_id")
@ -835,4 +838,4 @@ def stop_engine(user_id: str, run_id: str | None = None, timeout: float | None =
else:
stopped_all = False
return stopped_all

View File

@ -1,12 +1,62 @@
# engine/strategy.py
import numpy as np
def allocation(sp_price, gd_price, sp_sma, gd_sma,
base=0.6, tilt_mult=1.5,
max_tilt=0.25, min_eq=0.2, max_eq=0.9):
"""Golden Nifty: SMA-momentum tilt between NiftyBees and GoldBees."""
rd = (sp_price / sp_sma) - (gd_price / gd_sma)
tilt = np.clip(-rd * tilt_mult, -max_tilt, max_tilt)
eq_w = np.clip(base * (1 + tilt), min_eq, max_eq)
return eq_w, 1 - eq_w
def alpha_shield_allocation(midcap_price, midcap_sma60):
"""
Alpha Shield: Dynamic 70/30 Midcap+Gold based on 60-month SMA valuation.
When midcap is expensive (price >> 5yr SMA) reduce midcap, increase gold.
When midcap is cheap (price << 5yr SMA) increase midcap aggressively.
Formula: midcap% = clip(70% - (price/sma60 - 1) × 60%, 40%, 92%)
Backtested XIRR: ~16.9% p.a. over 12+ years (vs 15.6% static 70/30).
"""
ratio = midcap_price / midcap_sma60
eq_w = float(np.clip(0.70 - (ratio - 1.0) * 0.60, 0.40, 0.92))
return eq_w, 1 - eq_w
# Strategy registry: maps strategy_name → engine configuration
STRATEGY_REGISTRY = {
"golden_nifty": {
"equity_symbol": "NIFTYBEES.NS",
"gold_symbol": "GOLDBEES.NS",
"sma_months": 36,
"allocation_fn": "golden_nifty",
},
"alpha_shield": {
"equity_symbol": "JUNIORBEES.NS",
"gold_symbol": "GOLDBEES.NS",
"sma_months": 60,
"allocation_fn": "alpha_shield",
},
}
DEFAULT_STRATEGY = "golden_nifty"
def get_strategy_config(strategy_name: str) -> dict:
return STRATEGY_REGISTRY.get(strategy_name) or STRATEGY_REGISTRY[DEFAULT_STRATEGY]
def compute_weights(strategy_name: str, equity_price: float, gold_price: float,
equity_hist, gold_hist, sma_months: int):
"""Dispatch allocation to the correct strategy function."""
if strategy_name == "alpha_shield":
sma60 = equity_hist.rolling(sma_months).mean().iloc[-1]
return alpha_shield_allocation(equity_price, sma60)
# default: golden_nifty
eq_sma = equity_hist.rolling(sma_months).mean().iloc[-1]
gd_sma = gold_hist.rolling(sma_months).mean().iloc[-1]
return allocation(equity_price, gold_price, eq_sma, gd_sma)