commit e21fcd58aa86f353011090c2f707001b9db38832 Author: thigazhezhilan Date: Sun Feb 1 14:14:57 2026 +0000 Initial database files diff --git a/db_migrations/000_schema_migrations.sql b/db_migrations/000_schema_migrations.sql new file mode 100644 index 0000000..f21e4b8 --- /dev/null +++ b/db_migrations/000_schema_migrations.sql @@ -0,0 +1,8 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS schema_migrations ( + version TEXT PRIMARY KEY, + applied_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMIT; diff --git a/db_migrations/20260110_hardening.sql b/db_migrations/20260110_hardening.sql new file mode 100644 index 0000000..ccdd5f8 --- /dev/null +++ b/db_migrations/20260110_hardening.sql @@ -0,0 +1,115 @@ +BEGIN; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_order_qty_positive' + ) THEN + ALTER TABLE paper_order + ADD CONSTRAINT chk_paper_order_qty_positive CHECK (qty > 0) NOT VALID; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_order_price_non_negative' + ) THEN + ALTER TABLE paper_order + ADD CONSTRAINT chk_paper_order_price_non_negative CHECK (price >= 0) NOT VALID; + END IF; +END $$; + +ALTER TABLE paper_order + ALTER COLUMN timestamp SET NOT NULL; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_trade_qty_positive' + ) THEN + ALTER TABLE paper_trade + ADD CONSTRAINT chk_paper_trade_qty_positive CHECK (qty > 0) NOT VALID; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_trade_price_non_negative' + ) THEN + ALTER TABLE paper_trade + ADD CONSTRAINT chk_paper_trade_price_non_negative CHECK (price >= 0) NOT VALID; + END IF; +END $$; + +ALTER TABLE paper_trade + ALTER COLUMN timestamp SET NOT NULL; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_position_qty_positive' + ) THEN + ALTER TABLE paper_position + ADD CONSTRAINT chk_paper_position_qty_positive CHECK (qty > 0) NOT VALID; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_broker_cash_non_negative' + ) THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT chk_paper_broker_cash_non_negative CHECK (cash >= 0) NOT VALID; + END IF; +END $$; + +ALTER TABLE paper_equity_curve + ALTER COLUMN timestamp SET NOT NULL; + +ALTER TABLE mtm_ledger + ALTER COLUMN timestamp SET NOT NULL; + +ALTER TABLE event_ledger + ALTER COLUMN timestamp SET NOT NULL; + +ALTER TABLE engine_event + ALTER COLUMN ts SET NOT NULL; + +ALTER TABLE strategy_log + ALTER COLUMN ts SET NOT NULL; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'chk_engine_state_paper_cash_non_negative' + ) THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT chk_engine_state_paper_cash_non_negative CHECK (cash >= 0) NOT VALID; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_order' + ) THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_order + FOREIGN KEY (order_id) REFERENCES paper_order(id) + ON DELETE SET NULL; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS idx_paper_order_ts ON paper_order(timestamp); +CREATE INDEX IF NOT EXISTS idx_paper_trade_ts ON paper_trade(timestamp); +CREATE INDEX IF NOT EXISTS idx_paper_equity_curve_ts ON paper_equity_curve(timestamp); +CREATE INDEX IF NOT EXISTS idx_mtm_ledger_ts ON mtm_ledger(timestamp); +CREATE INDEX IF NOT EXISTS idx_event_ledger_ts ON event_ledger(timestamp); +CREATE INDEX IF NOT EXISTS idx_strategy_log_ts ON strategy_log(ts); +CREATE INDEX IF NOT EXISTS idx_engine_event_ts ON engine_event(ts); +CREATE INDEX IF NOT EXISTS idx_app_session_user_expires ON app_session(user_id, expires_at); + +COMMIT; diff --git a/db_migrations/20260111_multiuser_multirun.sql b/db_migrations/20260111_multiuser_multirun.sql new file mode 100644 index 0000000..716b086 --- /dev/null +++ b/db_migrations/20260111_multiuser_multirun.sql @@ -0,0 +1,532 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS strategy_run ( + run_id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL, + started_at TIMESTAMPTZ, + stopped_at TIMESTAMPTZ, + status TEXT NOT NULL, + strategy TEXT, + mode TEXT, + broker TEXT, + meta JSONB +); + +CREATE INDEX IF NOT EXISTS idx_strategy_run_user_created ON strategy_run(user_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_strategy_run_user_status ON strategy_run(user_id, status); + +ALTER TABLE strategy_log ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE strategy_log ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_status ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_status ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_event ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_event ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_broker_account ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_broker_account ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_position ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_position ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_order ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_order ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_trade ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_trade ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_equity_curve ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_equity_curve ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_state_paper ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_state_paper ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_state ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_state ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE mtm_ledger ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE mtm_ledger ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE event_ledger ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE event_ledger ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE strategy_config ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE strategy_config ADD COLUMN IF NOT EXISTS run_id TEXT; + +DO $$ +DECLARE + v_user_id TEXT; +BEGIN + SELECT id INTO v_user_id FROM app_user ORDER BY username LIMIT 1; + IF v_user_id IS NULL THEN + RAISE EXCEPTION 'No users available for backfill'; + END IF; + + INSERT INTO strategy_run ( + run_id, user_id, created_at, started_at, stopped_at, status, strategy, mode, broker, meta + ) + VALUES ( + 'default_run', + v_user_id, + now(), + NULL, + NULL, + 'STOPPED', + NULL, + NULL, + NULL, + '{}'::jsonb + ) + ON CONFLICT (run_id) DO NOTHING; + + UPDATE strategy_run + SET user_id = v_user_id + WHERE run_id = 'default_run' AND user_id IS NULL; + + UPDATE strategy_log SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_status SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_event SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_broker_account SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_position SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_order SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_trade SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_equity_curve SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_state_paper SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_state SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE mtm_ledger SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE event_ledger SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE strategy_config SET user_id = v_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; +END $$; + +ALTER TABLE strategy_log ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE strategy_log ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_status ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_status ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_event ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_event ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_broker_account ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_broker_account ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_position ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_position ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_order ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_order ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_trade ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_trade ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_equity_curve ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_equity_curve ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_state_paper ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_state_paper ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_state ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_state ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE mtm_ledger ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE mtm_ledger ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE event_ledger ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE event_ledger ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE strategy_config ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE strategy_config ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_trade DROP CONSTRAINT IF EXISTS paper_trade_order_id_fkey; +ALTER TABLE paper_trade DROP CONSTRAINT IF EXISTS fk_paper_trade_order; + +DO $$ +DECLARE + rec record; +BEGIN + FOR rec IN + SELECT conrelid::regclass AS tbl, conname + FROM pg_constraint + WHERE contype = 'p' + AND conrelid::regclass::text IN ( + 'paper_broker_account', + 'paper_position', + 'paper_equity_curve', + 'mtm_ledger', + 'engine_state_paper', + 'engine_state', + 'engine_status', + 'strategy_config' + ) + LOOP + EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %I', rec.tbl, rec.conname); + END LOOP; +END $$; + +ALTER TABLE paper_broker_account ADD CONSTRAINT paper_broker_account_pkey PRIMARY KEY (user_id, run_id); +ALTER TABLE paper_position ADD CONSTRAINT paper_position_pkey PRIMARY KEY (user_id, run_id, symbol); +ALTER TABLE paper_equity_curve ADD CONSTRAINT paper_equity_curve_pkey PRIMARY KEY (user_id, run_id, timestamp); +ALTER TABLE mtm_ledger ADD CONSTRAINT mtm_ledger_pkey PRIMARY KEY (user_id, run_id, timestamp); +ALTER TABLE engine_state_paper ADD CONSTRAINT engine_state_paper_pkey PRIMARY KEY (user_id, run_id); +ALTER TABLE engine_state ADD CONSTRAINT engine_state_pkey PRIMARY KEY (user_id, run_id); +ALTER TABLE engine_status ADD CONSTRAINT engine_status_pkey PRIMARY KEY (user_id, run_id); +ALTER TABLE strategy_config ADD CONSTRAINT strategy_config_pkey PRIMARY KEY (user_id, run_id); + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_order_scope_id' + ) THEN + ALTER TABLE paper_order ADD CONSTRAINT uq_paper_order_scope_id UNIQUE (user_id, run_id, id); + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_trade_scope_id' + ) THEN + ALTER TABLE paper_trade ADD CONSTRAINT uq_paper_trade_scope_id UNIQUE (user_id, run_id, id); + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_log_user' + ) THEN + ALTER TABLE strategy_log + ADD CONSTRAINT fk_strategy_log_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_log_run' + ) THEN + ALTER TABLE strategy_log + ADD CONSTRAINT fk_strategy_log_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_status_user' + ) THEN + ALTER TABLE engine_status + ADD CONSTRAINT fk_engine_status_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_status_run' + ) THEN + ALTER TABLE engine_status + ADD CONSTRAINT fk_engine_status_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_event_user' + ) THEN + ALTER TABLE engine_event + ADD CONSTRAINT fk_engine_event_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_event_run' + ) THEN + ALTER TABLE engine_event + ADD CONSTRAINT fk_engine_event_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_broker_user' + ) THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT fk_paper_broker_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_broker_run' + ) THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT fk_paper_broker_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_position_user' + ) THEN + ALTER TABLE paper_position + ADD CONSTRAINT fk_paper_position_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_position_run' + ) THEN + ALTER TABLE paper_position + ADD CONSTRAINT fk_paper_position_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_order_user' + ) THEN + ALTER TABLE paper_order + ADD CONSTRAINT fk_paper_order_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_order_run' + ) THEN + ALTER TABLE paper_order + ADD CONSTRAINT fk_paper_order_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_user' + ) THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_run' + ) THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_scope_order' + ) THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_scope_order + FOREIGN KEY (user_id, run_id, order_id) + REFERENCES paper_order(user_id, run_id, id) + ON DELETE SET NULL; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_equity_user' + ) THEN + ALTER TABLE paper_equity_curve + ADD CONSTRAINT fk_paper_equity_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_equity_run' + ) THEN + ALTER TABLE paper_equity_curve + ADD CONSTRAINT fk_paper_equity_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_paper_user' + ) THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT fk_engine_state_paper_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_paper_run' + ) THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT fk_engine_state_paper_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_user' + ) THEN + ALTER TABLE engine_state + ADD CONSTRAINT fk_engine_state_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_run' + ) THEN + ALTER TABLE engine_state + ADD CONSTRAINT fk_engine_state_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_mtm_ledger_user' + ) THEN + ALTER TABLE mtm_ledger + ADD CONSTRAINT fk_mtm_ledger_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_mtm_ledger_run' + ) THEN + ALTER TABLE mtm_ledger + ADD CONSTRAINT fk_mtm_ledger_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_event_ledger_user' + ) THEN + ALTER TABLE event_ledger + ADD CONSTRAINT fk_event_ledger_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_event_ledger_run' + ) THEN + ALTER TABLE event_ledger + ADD CONSTRAINT fk_event_ledger_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_config_user' + ) THEN + ALTER TABLE strategy_config + ADD CONSTRAINT fk_strategy_config_user + FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_config_run' + ) THEN + ALTER TABLE strategy_config + ADD CONSTRAINT fk_strategy_config_run + FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS idx_strategy_log_user_run_ts ON strategy_log(user_id, run_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_engine_event_user_run_ts ON engine_event(user_id, run_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_paper_order_user_run_ts ON paper_order(user_id, run_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_paper_trade_user_run_ts ON paper_trade(user_id, run_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_paper_equity_user_run_ts ON paper_equity_curve(user_id, run_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_mtm_ledger_user_run_ts ON mtm_ledger(user_id, run_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_event_ledger_user_run_ts ON event_ledger(user_id, run_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_engine_state_paper_user_run ON engine_state_paper(user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_state_user_run ON engine_state(user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_status_user_run ON engine_status(user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_position_user_run ON paper_position(user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_broker_user_run ON paper_broker_account(user_id, run_id); + +COMMIT; diff --git a/db_migrations/20260112_multiuser_multirun.sql b/db_migrations/20260112_multiuser_multirun.sql new file mode 100644 index 0000000..9fd2a3c --- /dev/null +++ b/db_migrations/20260112_multiuser_multirun.sql @@ -0,0 +1,350 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS strategy_run ( + run_id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + started_at TIMESTAMPTZ, + stopped_at TIMESTAMPTZ, + status TEXT NOT NULL, + strategy TEXT, + mode TEXT, + broker TEXT, + meta JSONB +); + +ALTER TABLE strategy_log ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE strategy_log ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_status ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_status ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_event ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_event ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_broker_account ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_broker_account ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_position ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_position ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_order ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_order ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_trade ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_trade ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE paper_equity_curve ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE paper_equity_curve ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_state ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_state ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE engine_state_paper ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE engine_state_paper ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE mtm_ledger ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE mtm_ledger ADD COLUMN IF NOT EXISTS run_id TEXT; + +ALTER TABLE event_ledger ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE event_ledger ADD COLUMN IF NOT EXISTS run_id TEXT; + +-- Temporarily disable user triggers to allow backfill on STOPPED runs. +ALTER TABLE engine_status DISABLE TRIGGER USER; +ALTER TABLE engine_state DISABLE TRIGGER USER; +ALTER TABLE engine_state_paper DISABLE TRIGGER USER; +ALTER TABLE paper_broker_account DISABLE TRIGGER USER; +ALTER TABLE paper_order DISABLE TRIGGER USER; +ALTER TABLE paper_trade DISABLE TRIGGER USER; +ALTER TABLE mtm_ledger DISABLE TRIGGER USER; +ALTER TABLE event_ledger DISABLE TRIGGER USER; +ALTER TABLE paper_position DISABLE TRIGGER USER; +ALTER TABLE paper_equity_curve DISABLE TRIGGER USER; + +DO $$ +DECLARE + default_user_id TEXT; +BEGIN + SELECT id INTO default_user_id FROM app_user ORDER BY username LIMIT 1; + IF default_user_id IS NULL THEN + RAISE EXCEPTION 'No app_user rows exist for default_user_id'; + END IF; + + INSERT INTO strategy_run (run_id, user_id, status) + VALUES ('default_run', default_user_id, 'STOPPED') + ON CONFLICT (run_id) DO NOTHING; + + UPDATE strategy_log + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_status + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_event + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_broker_account + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_position + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_order + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_trade + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_equity_curve + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_state + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_state_paper + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE mtm_ledger + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE event_ledger + SET user_id = default_user_id, + run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; +END $$; + +ALTER TABLE strategy_log ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE strategy_log ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_status ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_status ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_event ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_event ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_broker_account ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_broker_account ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_position ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_position ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_order ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_order ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_trade ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_trade ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE paper_equity_curve ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE paper_equity_curve ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_state ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_state ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE engine_state_paper ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE engine_state_paper ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE mtm_ledger ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE mtm_ledger ALTER COLUMN run_id SET NOT NULL; + +ALTER TABLE event_ledger ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE event_ledger ALTER COLUMN run_id SET NOT NULL; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_log_user_id') THEN + ALTER TABLE strategy_log + ADD CONSTRAINT fk_strategy_log_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_log_run_id') THEN + ALTER TABLE strategy_log + ADD CONSTRAINT fk_strategy_log_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_status_user_id') THEN + ALTER TABLE engine_status + ADD CONSTRAINT fk_engine_status_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_status_run_id') THEN + ALTER TABLE engine_status + ADD CONSTRAINT fk_engine_status_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_event_user_id') THEN + ALTER TABLE engine_event + ADD CONSTRAINT fk_engine_event_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_event_run_id') THEN + ALTER TABLE engine_event + ADD CONSTRAINT fk_engine_event_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_broker_account_user_id') THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT fk_paper_broker_account_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_broker_account_run_id') THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT fk_paper_broker_account_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_position_user_id') THEN + ALTER TABLE paper_position + ADD CONSTRAINT fk_paper_position_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_position_run_id') THEN + ALTER TABLE paper_position + ADD CONSTRAINT fk_paper_position_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_order_user_id') THEN + ALTER TABLE paper_order + ADD CONSTRAINT fk_paper_order_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_order_run_id') THEN + ALTER TABLE paper_order + ADD CONSTRAINT fk_paper_order_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_user_id') THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_run_id') THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_equity_curve_user_id') THEN + ALTER TABLE paper_equity_curve + ADD CONSTRAINT fk_paper_equity_curve_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_equity_curve_run_id') THEN + ALTER TABLE paper_equity_curve + ADD CONSTRAINT fk_paper_equity_curve_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_user_id') THEN + ALTER TABLE engine_state + ADD CONSTRAINT fk_engine_state_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_run_id') THEN + ALTER TABLE engine_state + ADD CONSTRAINT fk_engine_state_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_paper_user_id') THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT fk_engine_state_paper_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_paper_run_id') THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT fk_engine_state_paper_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_mtm_ledger_user_id') THEN + ALTER TABLE mtm_ledger + ADD CONSTRAINT fk_mtm_ledger_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_mtm_ledger_run_id') THEN + ALTER TABLE mtm_ledger + ADD CONSTRAINT fk_mtm_ledger_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_event_ledger_user_id') THEN + ALTER TABLE event_ledger + ADD CONSTRAINT fk_event_ledger_user_id FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_event_ledger_run_id') THEN + ALTER TABLE event_ledger + ADD CONSTRAINT fk_event_ledger_run_id FOREIGN KEY (run_id) REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS idx_strategy_log_user_run ON strategy_log (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_status_user_run ON engine_status (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_event_user_run ON engine_event (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_broker_account_user_run ON paper_broker_account (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_position_user_run ON paper_position (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_order_user_run ON paper_order (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_trade_user_run ON paper_trade (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_paper_equity_curve_user_run ON paper_equity_curve (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_state_user_run ON engine_state (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_state_paper_user_run ON engine_state_paper (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_mtm_ledger_user_run ON mtm_ledger (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_event_ledger_user_run ON event_ledger (user_id, run_id); + +CREATE INDEX IF NOT EXISTS idx_strategy_log_user_run_ts ON strategy_log (user_id, run_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_engine_event_user_run_ts ON engine_event (user_id, run_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_paper_order_user_run_ts ON paper_order (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_paper_trade_user_run_ts ON paper_trade (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_paper_equity_curve_user_run_ts ON paper_equity_curve (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_mtm_ledger_user_run_ts ON mtm_ledger (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_event_ledger_user_run_ts ON event_ledger (user_id, run_id, "timestamp" DESC); + +ALTER TABLE engine_status ENABLE TRIGGER USER; +ALTER TABLE engine_state ENABLE TRIGGER USER; +ALTER TABLE engine_state_paper ENABLE TRIGGER USER; +ALTER TABLE paper_broker_account ENABLE TRIGGER USER; +ALTER TABLE paper_order ENABLE TRIGGER USER; +ALTER TABLE paper_trade ENABLE TRIGGER USER; +ALTER TABLE mtm_ledger ENABLE TRIGGER USER; +ALTER TABLE event_ledger ENABLE TRIGGER USER; +ALTER TABLE paper_position ENABLE TRIGGER USER; +ALTER TABLE paper_equity_curve ENABLE TRIGGER USER; + +COMMIT; diff --git a/db_migrations/20260112_multiuser_multirun_fix.sql b/db_migrations/20260112_multiuser_multirun_fix.sql new file mode 100644 index 0000000..f01cd5a --- /dev/null +++ b/db_migrations/20260112_multiuser_multirun_fix.sql @@ -0,0 +1,70 @@ +BEGIN; + +DO $$ +DECLARE + default_user_id TEXT; +BEGIN + SELECT id INTO default_user_id FROM app_user ORDER BY username LIMIT 1; + IF default_user_id IS NULL THEN + RAISE EXCEPTION 'No app_user rows exist for default_user_id'; + END IF; + + INSERT INTO strategy_run (run_id, user_id, status) + VALUES ('default_run', default_user_id, 'STOPPED') + ON CONFLICT (run_id) DO NOTHING; + + UPDATE strategy_log SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE engine_status SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE engine_event SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE paper_broker_account SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE paper_position SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE paper_order SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE paper_trade SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE paper_equity_curve SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE engine_state SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE engine_state_paper SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + + UPDATE mtm_ledger SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; + UPDATE event_ledger SET user_id = default_user_id, run_id = 'default_run' + WHERE user_id IS NULL OR run_id IS NULL; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'chk_strategy_run_status') THEN + ALTER TABLE strategy_run + ADD CONSTRAINT chk_strategy_run_status + CHECK (status IN ('RUNNING','STOPPED','ERROR')); + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_order_scope') THEN + ALTER TABLE paper_order + ADD CONSTRAINT uq_paper_order_scope UNIQUE (user_id, run_id, id); + END IF; + + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_trade_order_scope') THEN + ALTER TABLE paper_trade + ADD CONSTRAINT fk_paper_trade_order_scope + FOREIGN KEY (user_id, run_id, order_id) + REFERENCES paper_order (user_id, run_id, id) + ON DELETE CASCADE; + END IF; +END $$; + +COMMIT; diff --git a/db_migrations/20260112_scope_constraints.sql b/db_migrations/20260112_scope_constraints.sql new file mode 100644 index 0000000..b4b253d --- /dev/null +++ b/db_migrations/20260112_scope_constraints.sql @@ -0,0 +1,188 @@ +BEGIN; + +ALTER TABLE strategy_config ADD COLUMN IF NOT EXISTS user_id TEXT; +ALTER TABLE strategy_config ADD COLUMN IF NOT EXISTS run_id TEXT; + +INSERT INTO strategy_run (run_id, user_id, status) +SELECT 'default_' || id, id, 'STOPPED' +FROM app_user +ON CONFLICT (run_id) DO NOTHING; + +DO $$ +DECLARE + default_user_id TEXT; +BEGIN + SELECT id INTO default_user_id FROM app_user ORDER BY username LIMIT 1; + IF default_user_id IS NULL THEN + RAISE EXCEPTION 'No app_user rows exist for default_user_id'; + END IF; + + UPDATE strategy_config + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE strategy_log + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE engine_status + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE engine_event + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE paper_broker_account + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE paper_position + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE paper_order + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE paper_trade + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE paper_equity_curve + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE engine_state + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE engine_state_paper + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE mtm_ledger + SET user_id = default_user_id + WHERE user_id IS NULL; + UPDATE event_ledger + SET user_id = default_user_id + WHERE user_id IS NULL; + + UPDATE strategy_config + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE strategy_log + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE engine_status + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE engine_event + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE paper_broker_account + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE paper_position + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE paper_order + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE paper_trade + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE paper_equity_curve + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE engine_state + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE engine_state_paper + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE mtm_ledger + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; + UPDATE event_ledger + SET run_id = 'default_' || user_id + WHERE run_id IS NULL OR run_id = 'default_run'; +END $$; + +ALTER TABLE strategy_config ALTER COLUMN user_id SET NOT NULL; +ALTER TABLE strategy_config ALTER COLUMN run_id SET NOT NULL; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'strategy_config_pkey') THEN + ALTER TABLE strategy_config DROP CONSTRAINT strategy_config_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'strategy_config_user_run_pk') THEN + ALTER TABLE strategy_config ADD CONSTRAINT strategy_config_user_run_pk PRIMARY KEY (user_id, run_id); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'engine_status_pkey') THEN + ALTER TABLE engine_status DROP CONSTRAINT engine_status_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'engine_status_user_run_pk') THEN + ALTER TABLE engine_status ADD CONSTRAINT engine_status_user_run_pk PRIMARY KEY (user_id, run_id); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'engine_state_pkey') THEN + ALTER TABLE engine_state DROP CONSTRAINT engine_state_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'engine_state_user_run_pk') THEN + ALTER TABLE engine_state ADD CONSTRAINT engine_state_user_run_pk PRIMARY KEY (user_id, run_id); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'engine_state_paper_pkey') THEN + ALTER TABLE engine_state_paper DROP CONSTRAINT engine_state_paper_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'engine_state_paper_user_run_pk') THEN + ALTER TABLE engine_state_paper ADD CONSTRAINT engine_state_paper_user_run_pk PRIMARY KEY (user_id, run_id); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'paper_broker_account_pkey') THEN + ALTER TABLE paper_broker_account DROP CONSTRAINT paper_broker_account_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'paper_broker_account_user_run_pk') THEN + ALTER TABLE paper_broker_account ADD CONSTRAINT paper_broker_account_user_run_pk PRIMARY KEY (user_id, run_id); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'paper_position_pkey') THEN + ALTER TABLE paper_position DROP CONSTRAINT paper_position_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'paper_position_user_run_pk') THEN + ALTER TABLE paper_position ADD CONSTRAINT paper_position_user_run_pk PRIMARY KEY (user_id, run_id, symbol); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'paper_equity_curve_pkey') THEN + ALTER TABLE paper_equity_curve DROP CONSTRAINT paper_equity_curve_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'paper_equity_curve_user_run_pk') THEN + ALTER TABLE paper_equity_curve ADD CONSTRAINT paper_equity_curve_user_run_pk PRIMARY KEY (user_id, run_id, "timestamp"); + END IF; +END $$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'mtm_ledger_pkey') THEN + ALTER TABLE mtm_ledger DROP CONSTRAINT mtm_ledger_pkey; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'mtm_ledger_user_run_pk') THEN + ALTER TABLE mtm_ledger ADD CONSTRAINT mtm_ledger_user_run_pk PRIMARY KEY (user_id, run_id, "timestamp"); + END IF; +END $$; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_strategy_run_one_running_per_user + ON strategy_run (user_id) + WHERE status = 'RUNNING'; + +COMMIT; diff --git a/db_migrations/20260113_runtime_scope_fix.sql b/db_migrations/20260113_runtime_scope_fix.sql new file mode 100644 index 0000000..6e8880b --- /dev/null +++ b/db_migrations/20260113_runtime_scope_fix.sql @@ -0,0 +1,173 @@ +BEGIN; + +INSERT INTO strategy_run (run_id, user_id, status) +SELECT 'default_' || id, id, 'STOPPED' +FROM app_user +ON CONFLICT (run_id) DO NOTHING; + +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'engine_status'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE engine_status ALTER COLUMN id DROP DEFAULT; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'engine_state'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE engine_state ALTER COLUMN id DROP DEFAULT; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'engine_state_paper'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE engine_state_paper ALTER COLUMN id DROP DEFAULT; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'paper_broker_account'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE paper_broker_account ALTER COLUMN id DROP DEFAULT; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_engine_status_user_run') THEN + ALTER TABLE engine_status + ADD CONSTRAINT uq_engine_status_user_run UNIQUE (user_id, run_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_engine_state_user_run') THEN + ALTER TABLE engine_state + ADD CONSTRAINT uq_engine_state_user_run UNIQUE (user_id, run_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_engine_state_paper_user_run') THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT uq_engine_state_paper_user_run UNIQUE (user_id, run_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_broker_account_user_run') THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT uq_paper_broker_account_user_run UNIQUE (user_id, run_id); + END IF; +END $$; + +ALTER TABLE strategy_config ADD COLUMN IF NOT EXISTS user_id text; +ALTER TABLE strategy_config ADD COLUMN IF NOT EXISTS run_id text; + +ALTER TABLE strategy_config DROP CONSTRAINT IF EXISTS strategy_config_pkey; +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'strategy_config'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE strategy_config ALTER COLUMN id DROP DEFAULT; + END IF; +END $$; + +INSERT INTO strategy_config ( + id, strategy, sip_amount, sip_frequency_value, sip_frequency_unit, + mode, broker, active, frequency, frequency_days, unit, next_run, + user_id, run_id +) +SELECT + sc.id, sc.strategy, sc.sip_amount, sc.sip_frequency_value, sc.sip_frequency_unit, + sc.mode, sc.broker, sc.active, sc.frequency, sc.frequency_days, sc.unit, sc.next_run, + sr.user_id, sr.run_id +FROM strategy_config sc +CROSS JOIN strategy_run sr +WHERE sc.user_id IS NULL AND sc.run_id IS NULL + AND NOT EXISTS ( + SELECT 1 + FROM strategy_config sc2 + WHERE sc2.user_id = sr.user_id AND sc2.run_id = sr.run_id + ); + +DELETE FROM strategy_config +WHERE user_id IS NULL AND run_id IS NULL; + +ALTER TABLE strategy_config + ALTER COLUMN user_id SET NOT NULL, + ALTER COLUMN run_id SET NOT NULL; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_strategy_config_user_run') THEN + ALTER TABLE strategy_config + ADD CONSTRAINT uq_strategy_config_user_run UNIQUE (user_id, run_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_config_user') THEN + ALTER TABLE strategy_config + ADD CONSTRAINT fk_strategy_config_user FOREIGN KEY (user_id) + REFERENCES app_user(id) ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_strategy_config_run') THEN + ALTER TABLE strategy_config + ADD CONSTRAINT fk_strategy_config_run FOREIGN KEY (run_id) + REFERENCES strategy_run(run_id) ON DELETE CASCADE; + END IF; +END $$; + +UPDATE strategy_log + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; + +ALTER TABLE engine_status DISABLE TRIGGER USER; +ALTER TABLE engine_state DISABLE TRIGGER USER; +ALTER TABLE engine_state_paper DISABLE TRIGGER USER; +ALTER TABLE paper_broker_account DISABLE TRIGGER USER; +ALTER TABLE paper_order DISABLE TRIGGER USER; +ALTER TABLE paper_trade DISABLE TRIGGER USER; +ALTER TABLE mtm_ledger DISABLE TRIGGER USER; +ALTER TABLE event_ledger DISABLE TRIGGER USER; +ALTER TABLE paper_position DISABLE TRIGGER USER; +ALTER TABLE paper_equity_curve DISABLE TRIGGER USER; + +UPDATE engine_status + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE engine_event + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE paper_broker_account + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE paper_position + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE paper_order + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE paper_trade + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE paper_equity_curve + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE engine_state + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE engine_state_paper + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE mtm_ledger + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; +UPDATE event_ledger + SET run_id = 'default_' || user_id +WHERE run_id IS NULL OR run_id = 'default_run'; + +ALTER TABLE engine_status ENABLE TRIGGER USER; +ALTER TABLE engine_state ENABLE TRIGGER USER; +ALTER TABLE engine_state_paper ENABLE TRIGGER USER; +ALTER TABLE paper_broker_account ENABLE TRIGGER USER; +ALTER TABLE paper_order ENABLE TRIGGER USER; +ALTER TABLE paper_trade ENABLE TRIGGER USER; +ALTER TABLE mtm_ledger ENABLE TRIGGER USER; +ALTER TABLE event_ledger ENABLE TRIGGER USER; +ALTER TABLE paper_position ENABLE TRIGGER USER; +ALTER TABLE paper_equity_curve ENABLE TRIGGER USER; + +COMMIT; diff --git a/db_migrations/20260114_identity_fk_fix.sql b/db_migrations/20260114_identity_fk_fix.sql new file mode 100644 index 0000000..96868b6 --- /dev/null +++ b/db_migrations/20260114_identity_fk_fix.sql @@ -0,0 +1,150 @@ +BEGIN; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_strategy_run_user_run') THEN + ALTER TABLE strategy_run + ADD CONSTRAINT uq_strategy_run_user_run UNIQUE (user_id, run_id); + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_status_user_run') THEN + ALTER TABLE engine_status + ADD CONSTRAINT fk_engine_status_user_run + FOREIGN KEY (user_id, run_id) + REFERENCES strategy_run(user_id, run_id) + ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_user_run') THEN + ALTER TABLE engine_state + ADD CONSTRAINT fk_engine_state_user_run + FOREIGN KEY (user_id, run_id) + REFERENCES strategy_run(user_id, run_id) + ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_engine_state_paper_user_run') THEN + ALTER TABLE engine_state_paper + ADD CONSTRAINT fk_engine_state_paper_user_run + FOREIGN KEY (user_id, run_id) + REFERENCES strategy_run(user_id, run_id) + ON DELETE CASCADE; + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_paper_broker_account_user_run') THEN + ALTER TABLE paper_broker_account + ADD CONSTRAINT fk_paper_broker_account_user_run + FOREIGN KEY (user_id, run_id) + REFERENCES strategy_run(user_id, run_id) + ON DELETE CASCADE; + END IF; +END $$; + +ALTER TABLE strategy_config ALTER COLUMN id TYPE bigint; +ALTER TABLE engine_state ALTER COLUMN id TYPE bigint; +ALTER TABLE engine_status ALTER COLUMN id TYPE bigint; +ALTER TABLE engine_state_paper ALTER COLUMN id TYPE bigint; +ALTER TABLE paper_broker_account ALTER COLUMN id TYPE bigint; + +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'strategy_config'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE strategy_config ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'engine_state'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE engine_state ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'engine_status'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE engine_status ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'engine_state_paper'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE engine_state_paper ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY; + END IF; + IF EXISTS ( + SELECT 1 FROM pg_attribute + WHERE attrelid = 'paper_broker_account'::regclass AND attname = 'id' AND attidentity = '' + ) THEN + ALTER TABLE paper_broker_account ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY; + END IF; +END $$; + +DO $$ +DECLARE + seq_name text; + max_id bigint; +BEGIN + seq_name := pg_get_serial_sequence('strategy_config', 'id'); + IF seq_name IS NOT NULL THEN + SELECT MAX(id) INTO max_id FROM strategy_config; + IF max_id IS NULL OR max_id < 1 THEN + EXECUTE format('SELECT setval(%L, 1, false)', seq_name); + ELSE + EXECUTE format('SELECT setval(%L, %s, true)', seq_name, max_id); + END IF; + END IF; + seq_name := pg_get_serial_sequence('engine_state', 'id'); + IF seq_name IS NOT NULL THEN + SELECT MAX(id) INTO max_id FROM engine_state; + IF max_id IS NULL OR max_id < 1 THEN + EXECUTE format('SELECT setval(%L, 1, false)', seq_name); + ELSE + EXECUTE format('SELECT setval(%L, %s, true)', seq_name, max_id); + END IF; + END IF; + seq_name := pg_get_serial_sequence('engine_status', 'id'); + IF seq_name IS NOT NULL THEN + SELECT MAX(id) INTO max_id FROM engine_status; + IF max_id IS NULL OR max_id < 1 THEN + EXECUTE format('SELECT setval(%L, 1, false)', seq_name); + ELSE + EXECUTE format('SELECT setval(%L, %s, true)', seq_name, max_id); + END IF; + END IF; + seq_name := pg_get_serial_sequence('engine_state_paper', 'id'); + IF seq_name IS NOT NULL THEN + SELECT MAX(id) INTO max_id FROM engine_state_paper; + IF max_id IS NULL OR max_id < 1 THEN + EXECUTE format('SELECT setval(%L, 1, false)', seq_name); + ELSE + EXECUTE format('SELECT setval(%L, %s, true)', seq_name, max_id); + END IF; + END IF; + seq_name := pg_get_serial_sequence('paper_broker_account', 'id'); + IF seq_name IS NOT NULL THEN + SELECT MAX(id) INTO max_id FROM paper_broker_account; + IF max_id IS NULL OR max_id < 1 THEN + EXECUTE format('SELECT setval(%L, 1, false)', seq_name); + ELSE + EXECUTE format('SELECT setval(%L, %s, true)', seq_name, max_id); + END IF; + END IF; +END $$; + +DROP INDEX IF EXISTS idx_engine_state_user_run; +DROP INDEX IF EXISTS idx_engine_status_user_run; +DROP INDEX IF EXISTS idx_engine_state_paper_user_run; +DROP INDEX IF EXISTS idx_paper_broker_account_user_run; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'chk_engine_state_paper_cash_non_negative') THEN + ALTER TABLE engine_state_paper VALIDATE CONSTRAINT chk_engine_state_paper_cash_non_negative; + END IF; + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'chk_paper_broker_cash_non_negative') THEN + ALTER TABLE paper_broker_account VALIDATE CONSTRAINT chk_paper_broker_cash_non_negative; + END IF; +END $$; + +COMMIT; diff --git a/db_migrations/20260115_run_lifecycle_and_idempotency.sql b/db_migrations/20260115_run_lifecycle_and_idempotency.sql new file mode 100644 index 0000000..e754a89 --- /dev/null +++ b/db_migrations/20260115_run_lifecycle_and_idempotency.sql @@ -0,0 +1,165 @@ +BEGIN; + +ALTER TABLE event_ledger ADD COLUMN IF NOT EXISTS logical_time timestamptz; +ALTER TABLE mtm_ledger ADD COLUMN IF NOT EXISTS logical_time timestamptz; +ALTER TABLE paper_order ADD COLUMN IF NOT EXISTS logical_time timestamptz; +ALTER TABLE paper_trade ADD COLUMN IF NOT EXISTS logical_time timestamptz; +ALTER TABLE paper_equity_curve ADD COLUMN IF NOT EXISTS logical_time timestamptz; + +UPDATE event_ledger + SET logical_time = "timestamp" +WHERE logical_time IS NULL; +UPDATE mtm_ledger + SET logical_time = "timestamp" +WHERE logical_time IS NULL; +UPDATE paper_order + SET logical_time = "timestamp" +WHERE logical_time IS NULL; +UPDATE paper_trade + SET logical_time = "timestamp" +WHERE logical_time IS NULL; +UPDATE paper_equity_curve + SET logical_time = "timestamp" +WHERE logical_time IS NULL; + +ALTER TABLE event_ledger ALTER COLUMN logical_time SET NOT NULL; +ALTER TABLE mtm_ledger ALTER COLUMN logical_time SET NOT NULL; +ALTER TABLE paper_order ALTER COLUMN logical_time SET NOT NULL; +ALTER TABLE paper_trade ALTER COLUMN logical_time SET NOT NULL; +ALTER TABLE paper_equity_curve ALTER COLUMN logical_time SET NOT NULL; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_event_ledger_event_time') THEN + ALTER TABLE event_ledger + ADD CONSTRAINT uq_event_ledger_event_time UNIQUE (user_id, run_id, event, logical_time); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_mtm_ledger_logical_time') THEN + ALTER TABLE mtm_ledger + ADD CONSTRAINT uq_mtm_ledger_logical_time UNIQUE (user_id, run_id, logical_time); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_order_logical_key') THEN + ALTER TABLE paper_order + ADD CONSTRAINT uq_paper_order_logical_key UNIQUE (user_id, run_id, logical_time, symbol, side); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_trade_logical_key') THEN + ALTER TABLE paper_trade + ADD CONSTRAINT uq_paper_trade_logical_key UNIQUE (user_id, run_id, logical_time, symbol, side); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_equity_curve_logical_time') THEN + ALTER TABLE paper_equity_curve + ADD CONSTRAINT uq_paper_equity_curve_logical_time UNIQUE (user_id, run_id, logical_time); + END IF; +END $$; + +CREATE OR REPLACE FUNCTION assert_run_is_running() +RETURNS trigger AS $$ +DECLARE + run_status TEXT; +BEGIN + SELECT status INTO run_status + FROM strategy_run + WHERE user_id = NEW.user_id AND run_id = NEW.run_id; + + IF run_status IS NULL THEN + RAISE EXCEPTION 'run not found for user_id %, run_id %', NEW.user_id, NEW.run_id; + END IF; + + IF run_status <> 'RUNNING' THEN + RAISE EXCEPTION 'run %/% not RUNNING (status=%)', NEW.user_id, NEW.run_id, run_status; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION enforce_strategy_run_transition() +RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'UPDATE' THEN + IF OLD.status IN ('STOPPED','ERROR') AND NEW.status <> OLD.status THEN + RAISE EXCEPTION 'run status cannot transition from % to %', OLD.status, NEW.status; + END IF; + IF OLD.status <> 'RUNNING' AND NEW.status = 'RUNNING' THEN + RAISE EXCEPTION 'cannot transition to RUNNING from %', OLD.status; + END IF; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_strategy_run_transition') THEN + CREATE TRIGGER trg_strategy_run_transition + BEFORE UPDATE ON strategy_run + FOR EACH ROW + EXECUTE FUNCTION enforce_strategy_run_transition(); + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_engine_state_running') THEN + CREATE TRIGGER trg_engine_state_running + BEFORE INSERT OR UPDATE ON engine_state + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_engine_status_running') THEN + CREATE TRIGGER trg_engine_status_running + BEFORE INSERT OR UPDATE ON engine_status + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_engine_state_paper_running') THEN + CREATE TRIGGER trg_engine_state_paper_running + BEFORE INSERT OR UPDATE ON engine_state_paper + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_paper_broker_account_running') THEN + CREATE TRIGGER trg_paper_broker_account_running + BEFORE INSERT OR UPDATE ON paper_broker_account + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_paper_order_running') THEN + CREATE TRIGGER trg_paper_order_running + BEFORE INSERT OR UPDATE ON paper_order + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_paper_trade_running') THEN + CREATE TRIGGER trg_paper_trade_running + BEFORE INSERT OR UPDATE ON paper_trade + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_mtm_ledger_running') THEN + CREATE TRIGGER trg_mtm_ledger_running + BEFORE INSERT OR UPDATE ON mtm_ledger + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_event_ledger_running') THEN + CREATE TRIGGER trg_event_ledger_running + BEFORE INSERT OR UPDATE ON event_ledger + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_paper_position_running') THEN + CREATE TRIGGER trg_paper_position_running + BEFORE INSERT OR UPDATE ON paper_position + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_paper_equity_curve_running') THEN + CREATE TRIGGER trg_paper_equity_curve_running + BEFORE INSERT OR UPDATE ON paper_equity_curve + FOR EACH ROW + EXECUTE FUNCTION assert_run_is_running(); + END IF; +END $$; + +COMMIT; diff --git a/db_migrations/20260116_paper_position_pk_scope.sql b/db_migrations/20260116_paper_position_pk_scope.sql new file mode 100644 index 0000000..7009892 --- /dev/null +++ b/db_migrations/20260116_paper_position_pk_scope.sql @@ -0,0 +1,18 @@ +BEGIN; + +DO $$ +DECLARE + rec record; +BEGIN + FOR rec IN + SELECT conname + FROM pg_constraint + WHERE contype = 'p' AND conrelid = 'paper_position'::regclass + LOOP + EXECUTE format('ALTER TABLE paper_position DROP CONSTRAINT %I', rec.conname); + END LOOP; + ALTER TABLE paper_position + ADD CONSTRAINT paper_position_pkey PRIMARY KEY (user_id, run_id, symbol); +END $$; + +COMMIT; diff --git a/db_migrations/20260116_paper_position_scope_unique.sql b/db_migrations/20260116_paper_position_scope_unique.sql new file mode 100644 index 0000000..b417f79 --- /dev/null +++ b/db_migrations/20260116_paper_position_scope_unique.sql @@ -0,0 +1,11 @@ +BEGIN; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_paper_position_scope') THEN + ALTER TABLE paper_position + ADD CONSTRAINT uq_paper_position_scope UNIQUE (user_id, run_id, symbol); + END IF; +END $$; + +COMMIT; diff --git a/db_migrations/20260117_one_running_run_per_user.sql b/db_migrations/20260117_one_running_run_per_user.sql new file mode 100644 index 0000000..93e6229 --- /dev/null +++ b/db_migrations/20260117_one_running_run_per_user.sql @@ -0,0 +1,3 @@ +CREATE UNIQUE INDEX IF NOT EXISTS uq_one_running_run_per_user +ON strategy_run (user_id) +WHERE status = 'RUNNING'; diff --git a/db_migrations/20260117_timeseries_pk_scope.sql b/db_migrations/20260117_timeseries_pk_scope.sql new file mode 100644 index 0000000..2016aae --- /dev/null +++ b/db_migrations/20260117_timeseries_pk_scope.sql @@ -0,0 +1,33 @@ +BEGIN; + +DO $$ +DECLARE + rec record; +BEGIN + FOR rec IN + SELECT conname + FROM pg_constraint + WHERE contype = 'p' AND conrelid = 'mtm_ledger'::regclass + LOOP + EXECUTE format('ALTER TABLE mtm_ledger DROP CONSTRAINT %I', rec.conname); + END LOOP; + ALTER TABLE mtm_ledger + ADD CONSTRAINT mtm_ledger_pkey PRIMARY KEY (user_id, run_id, logical_time); +END $$; + +DO $$ +DECLARE + rec record; +BEGIN + FOR rec IN + SELECT conname + FROM pg_constraint + WHERE contype = 'p' AND conrelid = 'paper_equity_curve'::regclass + LOOP + EXECUTE format('ALTER TABLE paper_equity_curve DROP CONSTRAINT %I', rec.conname); + END LOOP; + ALTER TABLE paper_equity_curve + ADD CONSTRAINT paper_equity_curve_pkey PRIMARY KEY (user_id, run_id, logical_time); +END $$; + +COMMIT; diff --git a/db_migrations/20260118_admin_rbac_views.sql b/db_migrations/20260118_admin_rbac_views.sql new file mode 100644 index 0000000..90a8355 --- /dev/null +++ b/db_migrations/20260118_admin_rbac_views.sql @@ -0,0 +1,235 @@ +BEGIN; + +ALTER TABLE app_user ADD COLUMN IF NOT EXISTS is_admin boolean NOT NULL DEFAULT false; +CREATE INDEX IF NOT EXISTS idx_app_user_is_admin ON app_user (is_admin); + +CREATE INDEX IF NOT EXISTS idx_strategy_run_user_status ON strategy_run (user_id, status); +CREATE INDEX IF NOT EXISTS idx_strategy_run_user_created ON strategy_run (user_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_engine_event_user_run_ts ON engine_event (user_id, run_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_strategy_log_user_run_ts ON strategy_log (user_id, run_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_event_ledger_user_run_logical ON event_ledger (user_id, run_id, logical_time); +CREATE INDEX IF NOT EXISTS idx_paper_order_user_run_ts ON paper_order (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_paper_trade_user_run_ts ON paper_trade (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_mtm_ledger_user_run_ts ON mtm_ledger (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_paper_equity_curve_user_run_ts ON paper_equity_curve (user_id, run_id, "timestamp" DESC); +CREATE INDEX IF NOT EXISTS idx_engine_status_user_run ON engine_status (user_id, run_id); +CREATE INDEX IF NOT EXISTS idx_engine_state_paper_user_run ON engine_state_paper (user_id, run_id); + +CREATE OR REPLACE VIEW admin_user_metrics AS +WITH session_stats AS ( + SELECT + user_id, + MIN(created_at) AS first_session_at, + MAX(COALESCE(last_seen_at, created_at)) AS last_login_at + FROM app_session + GROUP BY user_id +), +run_stats AS ( + SELECT + user_id, + COUNT(*) AS runs_count, + MAX(CASE WHEN status = 'RUNNING' THEN run_id END) AS active_run_id, + MAX(CASE WHEN status = 'RUNNING' THEN status END) AS active_run_status, + MIN(created_at) AS first_run_at + FROM strategy_run + GROUP BY user_id +), +broker_stats AS ( + SELECT user_id, BOOL_OR(connected) AS broker_connected + FROM user_broker + GROUP BY user_id +) +SELECT + u.id AS user_id, + u.username, + u.is_admin, + COALESCE(session_stats.first_session_at, run_stats.first_run_at) AS created_at, + session_stats.last_login_at, + COALESCE(run_stats.runs_count, 0) AS runs_count, + run_stats.active_run_id, + run_stats.active_run_status, + COALESCE(broker_stats.broker_connected, FALSE) AS broker_connected +FROM app_user u +LEFT JOIN session_stats ON session_stats.user_id = u.id +LEFT JOIN run_stats ON run_stats.user_id = u.id +LEFT JOIN broker_stats ON broker_stats.user_id = u.id; + +CREATE OR REPLACE VIEW admin_run_metrics AS +WITH order_stats AS ( + SELECT user_id, run_id, COUNT(*) AS order_count, MAX("timestamp") AS last_order_time + FROM paper_order + GROUP BY user_id, run_id +), +trade_stats AS ( + SELECT user_id, run_id, COUNT(*) AS trade_count, MAX("timestamp") AS last_trade_time + FROM paper_trade + GROUP BY user_id, run_id +), +event_stats AS ( + SELECT + user_id, + run_id, + MAX("timestamp") AS last_event_time, + MAX(CASE WHEN event = 'SIP_EXECUTED' THEN "timestamp" END) AS last_sip_time + FROM event_ledger + GROUP BY user_id, run_id +), +equity_latest AS ( + SELECT DISTINCT ON (user_id, run_id) + user_id, + run_id, + equity AS equity_latest, + pnl AS pnl_latest, + "timestamp" AS equity_ts + FROM paper_equity_curve + ORDER BY user_id, run_id, "timestamp" DESC +), +mtm_latest AS ( + SELECT DISTINCT ON (user_id, run_id) + user_id, + run_id, + "timestamp" AS mtm_ts + FROM mtm_ledger + ORDER BY user_id, run_id, "timestamp" DESC +), +log_latest AS ( + SELECT user_id, run_id, MAX(ts) AS last_log_time + FROM strategy_log + GROUP BY user_id, run_id +), +engine_latest AS ( + SELECT user_id, run_id, MAX(ts) AS last_engine_time + FROM engine_event + GROUP BY user_id, run_id +), +activity AS ( + SELECT user_id, run_id, MAX(ts) AS last_event_time + FROM ( + SELECT user_id, run_id, ts FROM engine_event + UNION ALL + SELECT user_id, run_id, ts FROM strategy_log + UNION ALL + SELECT user_id, run_id, "timestamp" AS ts FROM paper_order + UNION ALL + SELECT user_id, run_id, "timestamp" AS ts FROM paper_trade + UNION ALL + SELECT user_id, run_id, "timestamp" AS ts FROM mtm_ledger + UNION ALL + SELECT user_id, run_id, "timestamp" AS ts FROM paper_equity_curve + UNION ALL + SELECT user_id, run_id, "timestamp" AS ts FROM event_ledger + ) t + GROUP BY user_id, run_id +) +SELECT + sr.run_id, + sr.user_id, + sr.status, + sr.created_at, + sr.started_at, + sr.stopped_at, + sr.strategy, + sr.mode, + sr.broker, + sc.sip_amount, + sc.sip_frequency_value, + sc.sip_frequency_unit, + sc.next_run AS next_sip_time, + activity.last_event_time, + event_stats.last_sip_time, + COALESCE(order_stats.order_count, 0) AS order_count, + COALESCE(trade_stats.trade_count, 0) AS trade_count, + equity_latest.equity_latest, + equity_latest.pnl_latest +FROM strategy_run sr +LEFT JOIN strategy_config sc + ON sc.user_id = sr.user_id AND sc.run_id = sr.run_id +LEFT JOIN order_stats + ON order_stats.user_id = sr.user_id AND order_stats.run_id = sr.run_id +LEFT JOIN trade_stats + ON trade_stats.user_id = sr.user_id AND trade_stats.run_id = sr.run_id +LEFT JOIN event_stats + ON event_stats.user_id = sr.user_id AND event_stats.run_id = sr.run_id +LEFT JOIN equity_latest + ON equity_latest.user_id = sr.user_id AND equity_latest.run_id = sr.run_id +LEFT JOIN mtm_latest + ON mtm_latest.user_id = sr.user_id AND mtm_latest.run_id = sr.run_id +LEFT JOIN log_latest + ON log_latest.user_id = sr.user_id AND log_latest.run_id = sr.run_id +LEFT JOIN engine_latest + ON engine_latest.user_id = sr.user_id AND engine_latest.run_id = sr.run_id +LEFT JOIN activity + ON activity.user_id = sr.user_id AND activity.run_id = sr.run_id; + +CREATE OR REPLACE VIEW admin_engine_health AS +WITH activity AS ( + SELECT user_id, run_id, MAX(ts) AS last_event_time + FROM ( + SELECT user_id, run_id, ts FROM engine_event + UNION ALL + SELECT user_id, run_id, ts FROM strategy_log + UNION ALL + SELECT user_id, run_id, "timestamp" AS ts FROM event_ledger + ) t + GROUP BY user_id, run_id +) +SELECT + sr.run_id, + sr.user_id, + sr.status, + activity.last_event_time, + es.status AS engine_status, + es.last_updated AS engine_status_ts +FROM strategy_run sr +LEFT JOIN activity + ON activity.user_id = sr.user_id AND activity.run_id = sr.run_id +LEFT JOIN engine_status es + ON es.user_id = sr.user_id AND es.run_id = sr.run_id; + +CREATE OR REPLACE VIEW admin_order_stats AS +SELECT + user_id, + run_id, + COUNT(*) AS total_orders, + COUNT(*) FILTER (WHERE "timestamp" >= now() - interval '24 hours') AS orders_last_24h, + COUNT(*) FILTER (WHERE status = 'FILLED') AS filled_orders +FROM paper_order +GROUP BY user_id, run_id; + +CREATE OR REPLACE VIEW admin_ledger_stats AS +WITH mtm_latest AS ( + SELECT DISTINCT ON (user_id, run_id) + user_id, + run_id, + portfolio_value, + pnl, + "timestamp" AS mtm_ts + FROM mtm_ledger + ORDER BY user_id, run_id, "timestamp" DESC +), +equity_latest AS ( + SELECT DISTINCT ON (user_id, run_id) + user_id, + run_id, + equity, + pnl, + "timestamp" AS equity_ts + FROM paper_equity_curve + ORDER BY user_id, run_id, "timestamp" DESC +) +SELECT + sr.user_id, + sr.run_id, + mtm_latest.portfolio_value AS mtm_value, + mtm_latest.pnl AS mtm_pnl, + mtm_latest.mtm_ts, + equity_latest.equity AS equity_value, + equity_latest.pnl AS equity_pnl, + equity_latest.equity_ts +FROM strategy_run sr +LEFT JOIN mtm_latest + ON mtm_latest.user_id = sr.user_id AND mtm_latest.run_id = sr.run_id +LEFT JOIN equity_latest + ON equity_latest.user_id = sr.user_id AND equity_latest.run_id = sr.run_id; + +COMMIT; diff --git a/db_migrations/20260119_admin_hard_delete.sql b/db_migrations/20260119_admin_hard_delete.sql new file mode 100644 index 0000000..804172a --- /dev/null +++ b/db_migrations/20260119_admin_hard_delete.sql @@ -0,0 +1,86 @@ +BEGIN; + +ALTER TABLE app_user ADD COLUMN IF NOT EXISTS is_super_admin boolean NOT NULL DEFAULT false; +CREATE INDEX IF NOT EXISTS idx_app_user_is_super_admin ON app_user (is_super_admin); + +CREATE TABLE IF NOT EXISTS admin_audit_log ( + id bigserial PRIMARY KEY, + ts timestamptz NOT NULL DEFAULT now(), + actor_user_hash text NOT NULL, + target_user_hash text NOT NULL, + target_username_hash text, + action text NOT NULL, + meta jsonb +); + +CREATE OR REPLACE FUNCTION prevent_super_admin_delete() +RETURNS trigger AS $$ +BEGIN + IF OLD.is_super_admin THEN + RAISE EXCEPTION 'cannot delete super admin user'; + END IF; + RETURN OLD; +END; +$$ LANGUAGE plpgsql; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_prevent_super_admin_delete') THEN + CREATE TRIGGER trg_prevent_super_admin_delete + BEFORE DELETE ON app_user + FOR EACH ROW + EXECUTE FUNCTION prevent_super_admin_delete(); + END IF; +END $$; + +DO $$ +DECLARE + tbl text; + c record; + fk_name text; +BEGIN + FOREACH tbl IN ARRAY ARRAY[ + 'app_session', + 'user_broker', + 'zerodha_session', + 'zerodha_request_token', + 'strategy_run', + 'strategy_config', + 'strategy_log', + 'engine_status', + 'engine_state', + 'engine_state_paper', + 'engine_event', + 'paper_broker_account', + 'paper_position', + 'paper_order', + 'paper_trade', + 'paper_equity_curve', + 'mtm_ledger', + 'event_ledger' + ] + LOOP + IF to_regclass(tbl) IS NOT NULL THEN + FOR c IN + SELECT conname + FROM pg_constraint + WHERE contype = 'f' + AND conrelid = tbl::regclass + AND confrelid = 'app_user'::regclass + LOOP + EXECUTE format('ALTER TABLE %I DROP CONSTRAINT %I', tbl, c.conname); + END LOOP; + + fk_name := format('fk_%s_user', tbl); + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = fk_name) THEN + EXECUTE format( + 'ALTER TABLE %I ADD CONSTRAINT %I FOREIGN KEY (user_id) REFERENCES app_user(id) ON DELETE CASCADE', + tbl, + fk_name + ); + END IF; + END IF; + END LOOP; +END $$; + +COMMIT; diff --git a/db_migrations/20260120_add_roles.sql b/db_migrations/20260120_add_roles.sql new file mode 100644 index 0000000..fc79c02 --- /dev/null +++ b/db_migrations/20260120_add_roles.sql @@ -0,0 +1,105 @@ +BEGIN; + +ALTER TABLE app_user + ADD COLUMN IF NOT EXISTS role TEXT NOT NULL DEFAULT 'USER'; + +CREATE INDEX IF NOT EXISTS idx_app_user_role ON app_user(role); + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'chk_app_user_role') THEN + ALTER TABLE app_user + ADD CONSTRAINT chk_app_user_role + CHECK (role IN ('USER','ADMIN','SUPER_ADMIN')); + END IF; +END $$; + +UPDATE app_user + SET role = 'SUPER_ADMIN' +WHERE role <> 'SUPER_ADMIN' AND COALESCE(is_super_admin, false) = true; + +UPDATE app_user + SET role = 'ADMIN' +WHERE role = 'USER' AND COALESCE(is_admin, false) = true; + +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'app_user' AND column_name = 'is_admin' + ) THEN + UPDATE app_user + SET is_admin = (role IN ('ADMIN','SUPER_ADMIN')) + WHERE is_admin IS DISTINCT FROM (role IN ('ADMIN','SUPER_ADMIN')); + END IF; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'app_user' AND column_name = 'is_super_admin' + ) THEN + UPDATE app_user + SET is_super_admin = (role = 'SUPER_ADMIN') + WHERE is_super_admin IS DISTINCT FROM (role = 'SUPER_ADMIN'); + END IF; +END $$; + +CREATE TABLE IF NOT EXISTS admin_role_audit ( + id BIGSERIAL PRIMARY KEY, + actor_user_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + old_role TEXT NOT NULL, + new_role TEXT NOT NULL, + changed_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE OR REPLACE FUNCTION prevent_super_admin_delete() +RETURNS trigger AS $$ +BEGIN + IF OLD.role = 'SUPER_ADMIN' OR OLD.is_super_admin THEN + RAISE EXCEPTION 'cannot delete super admin user'; + END IF; + RETURN OLD; +END; +$$ LANGUAGE plpgsql; + +DROP VIEW IF EXISTS admin_user_metrics; +CREATE OR REPLACE VIEW admin_user_metrics AS +WITH session_stats AS ( + SELECT + user_id, + MIN(created_at) AS first_session_at, + MAX(COALESCE(last_seen_at, created_at)) AS last_login_at + FROM app_session + GROUP BY user_id +), +run_stats AS ( + SELECT + user_id, + COUNT(*) AS runs_count, + MAX(CASE WHEN status = 'RUNNING' THEN run_id END) AS active_run_id, + MAX(CASE WHEN status = 'RUNNING' THEN status END) AS active_run_status, + MIN(created_at) AS first_run_at + FROM strategy_run + GROUP BY user_id +), +broker_stats AS ( + SELECT user_id, BOOL_OR(connected) AS broker_connected + FROM user_broker + GROUP BY user_id +) +SELECT + u.id AS user_id, + u.username, + u.role, + (u.role IN ('ADMIN','SUPER_ADMIN')) AS is_admin, + COALESCE(session_stats.first_session_at, run_stats.first_run_at) AS created_at, + session_stats.last_login_at, + COALESCE(run_stats.runs_count, 0) AS runs_count, + run_stats.active_run_id, + run_stats.active_run_status, + COALESCE(broker_stats.broker_connected, FALSE) AS broker_connected +FROM app_user u +LEFT JOIN session_stats ON session_stats.user_id = u.id +LEFT JOIN run_stats ON run_stats.user_id = u.id +LEFT JOIN broker_stats ON broker_stats.user_id = u.id; + +COMMIT; diff --git a/db_migrations/20260201_system_arm.sql b/db_migrations/20260201_system_arm.sql new file mode 100644 index 0000000..f4a7e5b --- /dev/null +++ b/db_migrations/20260201_system_arm.sql @@ -0,0 +1,46 @@ +BEGIN; + +ALTER TABLE user_broker ADD COLUMN IF NOT EXISTS api_secret TEXT; +ALTER TABLE user_broker ADD COLUMN IF NOT EXISTS auth_state TEXT; + +UPDATE user_broker +SET auth_state = 'UNKNOWN' +WHERE auth_state IS NULL; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'chk_strategy_run_status') THEN + ALTER TABLE strategy_run DROP CONSTRAINT chk_strategy_run_status; + END IF; + ALTER TABLE strategy_run + ADD CONSTRAINT chk_strategy_run_status + CHECK (status IN ('RUNNING','STOPPED','ERROR','PAUSED_AUTH_EXPIRED')); +END $$; + +CREATE OR REPLACE FUNCTION enforce_strategy_run_transition() +RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'UPDATE' THEN + IF OLD.status = NEW.status THEN + RETURN NEW; + END IF; + + IF OLD.status = 'ERROR' THEN + RAISE EXCEPTION 'run status cannot transition from % to %', OLD.status, NEW.status; + END IF; + + IF OLD.status IN ('STOPPED','PAUSED_AUTH_EXPIRED') AND NEW.status <> 'RUNNING' THEN + RAISE EXCEPTION 'run status cannot transition from % to %', OLD.status, NEW.status; + END IF; + + IF OLD.status = 'RUNNING' AND NEW.status NOT IN ('STOPPED','ERROR','PAUSED_AUTH_EXPIRED') THEN + RAISE EXCEPTION 'run status cannot transition from % to %', OLD.status, NEW.status; + END IF; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP INDEX IF EXISTS uq_one_running_run_per_user; + +COMMIT; diff --git a/db_migrations/20260203_password_reset_and_session_meta.sql b/db_migrations/20260203_password_reset_and_session_meta.sql new file mode 100644 index 0000000..171a30b --- /dev/null +++ b/db_migrations/20260203_password_reset_and_session_meta.sql @@ -0,0 +1,24 @@ +BEGIN; + +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +ALTER TABLE app_session + ADD COLUMN IF NOT EXISTS ip TEXT, + ADD COLUMN IF NOT EXISTS user_agent TEXT; + +CREATE TABLE IF NOT EXISTS password_reset_otp ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + email TEXT NOT NULL, + otp_hash TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + expires_at TIMESTAMPTZ NOT NULL, + used_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_password_reset_otp_email + ON password_reset_otp(email); + +CREATE INDEX IF NOT EXISTS idx_password_reset_otp_expires_at + ON password_reset_otp(expires_at); + +COMMIT; diff --git a/db_migrations/20260203_support_tickets.sql b/db_migrations/20260203_support_tickets.sql new file mode 100644 index 0000000..df44710 --- /dev/null +++ b/db_migrations/20260203_support_tickets.sql @@ -0,0 +1,22 @@ +BEGIN; + +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE IF NOT EXISTS support_ticket ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + email TEXT NOT NULL, + subject TEXT NOT NULL, + message TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'NEW', + created_at TIMESTAMPTZ DEFAULT now(), + updated_at TIMESTAMPTZ DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_support_ticket_email + ON support_ticket(email); + +CREATE INDEX IF NOT EXISTS idx_support_ticket_created_at + ON support_ticket(created_at DESC); + +COMMIT; diff --git a/indian_paper_trading_strategy/engine/db.py b/indian_paper_trading_strategy/engine/db.py new file mode 100644 index 0000000..117885e --- /dev/null +++ b/indian_paper_trading_strategy/engine/db.py @@ -0,0 +1,388 @@ +import os +import threading +import time +import smtplib +import ssl +from email.message import EmailMessage +from contextlib import contextmanager +from datetime import datetime, timezone +from contextvars import ContextVar + +import psycopg2 +from psycopg2 import pool +from psycopg2 import OperationalError, InterfaceError +from psycopg2.extras import Json + +_POOL = None +_POOL_LOCK = threading.Lock() +_DEFAULT_USER_ID = None +_DEFAULT_LOCK = threading.Lock() + +_USER_ID = ContextVar("engine_user_id", default=None) +_RUN_ID = ContextVar("engine_run_id", default=None) + + +def _db_config(): + url = os.getenv("DATABASE_URL") + if url: + return {"dsn": url} + + schema = os.getenv("DB_SCHEMA") or os.getenv("PGSCHEMA") or "quant_app" + + return { + "host": os.getenv("DB_HOST") or os.getenv("PGHOST") or "localhost", + "port": int(os.getenv("DB_PORT") or os.getenv("PGPORT") or "5432"), + "dbname": os.getenv("DB_NAME") or os.getenv("PGDATABASE") or "trading_db", + "user": os.getenv("DB_USER") or os.getenv("PGUSER") or "trader", + "password": os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD") or "traderpass", + "connect_timeout": int(os.getenv("DB_CONNECT_TIMEOUT", "5")), + "options": f"-csearch_path={schema},public" if schema else None, + } + + +def _init_pool(): + config = _db_config() + config = {k: v for k, v in config.items() if v is not None} + minconn = int(os.getenv("DB_POOL_MIN", "1")) + maxconn = int(os.getenv("DB_POOL_MAX", "10")) + if "dsn" in config: + return pool.ThreadedConnectionPool(minconn, maxconn, dsn=config["dsn"]) + return pool.ThreadedConnectionPool(minconn, maxconn, **config) + + +def get_pool(): + global _POOL + if _POOL is None: + with _POOL_LOCK: + if _POOL is None: + _POOL = _init_pool() + return _POOL + + +def _get_connection(): + return get_pool().getconn() + + +def _put_connection(conn, close=False): + try: + get_pool().putconn(conn, close=close) + except Exception: + try: + conn.close() + except Exception: + pass + + +@contextmanager +def db_connection(retries: int | None = None, delay: float | None = None): + attempts = retries if retries is not None else int(os.getenv("DB_RETRY_COUNT", "3")) + backoff = delay if delay is not None else float(os.getenv("DB_RETRY_DELAY", "0.2")) + last_error = None + for attempt in range(attempts): + conn = None + try: + conn = _get_connection() + conn.autocommit = False + yield conn + return + except (OperationalError, InterfaceError) as exc: + last_error = exc + if conn is not None: + _put_connection(conn, close=True) + conn = None + time.sleep(backoff * (2 ** attempt)) + continue + finally: + if conn is not None: + _put_connection(conn, close=conn.closed != 0) + if last_error: + raise last_error + + +def run_with_retry(operation, retries: int | None = None, delay: float | None = None): + attempts = retries if retries is not None else int(os.getenv("DB_RETRY_COUNT", "3")) + backoff = delay if delay is not None else float(os.getenv("DB_RETRY_DELAY", "0.2")) + last_error = None + for attempt in range(attempts): + with db_connection(retries=1) as conn: + try: + with conn.cursor() as cur: + result = operation(cur, conn) + conn.commit() + return result + except (OperationalError, InterfaceError) as exc: + conn.rollback() + last_error = exc + time.sleep(backoff * (2 ** attempt)) + continue + except Exception: + conn.rollback() + raise + if last_error: + raise last_error + + +@contextmanager +def db_transaction(): + with db_connection() as conn: + try: + with conn.cursor() as cur: + yield cur + conn.commit() + except Exception: + conn.rollback() + raise + + +def _utc_now(): + return datetime.utcnow().replace(tzinfo=timezone.utc) + + +def set_context(user_id: str | None, run_id: str | None): + token_user = _USER_ID.set(user_id) + token_run = _RUN_ID.set(run_id) + return token_user, token_run + + +def reset_context(token_user, token_run): + _USER_ID.reset(token_user) + _RUN_ID.reset(token_run) + + +@contextmanager +def engine_context(user_id: str, run_id: str): + token_user, token_run = set_context(user_id, run_id) + try: + yield + finally: + reset_context(token_user, token_run) + + +def _resolve_context(user_id: str | None = None, run_id: str | None = None): + ctx_user = user_id or _USER_ID.get() + ctx_run = run_id or _RUN_ID.get() + if ctx_user and ctx_run: + return ctx_user, ctx_run + env_user = os.getenv("ENGINE_USER_ID") + env_run = os.getenv("ENGINE_RUN_ID") + if not ctx_user and env_user: + ctx_user = env_user + if not ctx_run and env_run: + ctx_run = env_run + if ctx_user and ctx_run: + return ctx_user, ctx_run + if not ctx_user: + ctx_user = get_default_user_id() + if ctx_user and not ctx_run: + ctx_run = get_active_run_id(ctx_user) + if not ctx_user or not ctx_run: + raise ValueError("engine context missing user_id/run_id") + return ctx_user, ctx_run + + +def get_context(user_id: str | None = None, run_id: str | None = None): + return _resolve_context(user_id, run_id) + + +def get_default_user_id(): + global _DEFAULT_USER_ID + if _DEFAULT_USER_ID: + return _DEFAULT_USER_ID + + def _op(cur, _conn): + cur.execute("SELECT id FROM app_user ORDER BY username LIMIT 1") + row = cur.fetchone() + return row[0] if row else None + + user_id = run_with_retry(_op) + if user_id: + with _DEFAULT_LOCK: + _DEFAULT_USER_ID = user_id + return user_id + + +def _default_run_id(user_id: str) -> str: + return f"default_{user_id}" + + +def ensure_default_run(user_id: str): + run_id = _default_run_id(user_id) + + def _op(cur, _conn): + now = _utc_now() + cur.execute( + """ + INSERT INTO strategy_run ( + run_id, user_id, created_at, started_at, stopped_at, status, strategy, mode, broker, meta + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (run_id) DO NOTHING + """, + ( + run_id, + user_id, + now, + None, + None, + "STOPPED", + None, + None, + None, + Json({}), + ), + ) + return run_id + + return run_with_retry(_op) + + +def get_active_run_id(user_id: str): + def _op(cur, _conn): + cur.execute( + """ + SELECT run_id + FROM strategy_run + WHERE user_id = %s AND status = 'RUNNING' + ORDER BY created_at DESC + LIMIT 1 + """, + (user_id,), + ) + row = cur.fetchone() + if row: + return row[0] + cur.execute( + """ + SELECT run_id + FROM strategy_run + WHERE user_id = %s + ORDER BY created_at DESC + LIMIT 1 + """, + (user_id,), + ) + row = cur.fetchone() + if row: + return row[0] + return None + + run_id = run_with_retry(_op) + if run_id: + return run_id + return ensure_default_run(user_id) + + +def get_running_runs(user_id: str | None = None): + def _op(cur, _conn): + if user_id: + cur.execute( + """ + SELECT user_id, run_id + FROM strategy_run + WHERE user_id = %s AND status = 'RUNNING' + ORDER BY created_at DESC + """, + (user_id,), + ) + else: + cur.execute( + """ + SELECT user_id, run_id + FROM strategy_run + WHERE status = 'RUNNING' + ORDER BY created_at DESC + """ + ) + return cur.fetchall() + + return run_with_retry(_op) + + +def insert_engine_event( + cur, + event: str, + data=None, + message: str | None = None, + meta=None, + ts=None, + user_id: str | None = None, + run_id: str | None = None, +): + when = ts or _utc_now() + scope_user, scope_run = _resolve_context(user_id, run_id) + cur.execute( + """ + INSERT INTO engine_event (user_id, run_id, ts, event, data, message, meta) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """, + ( + scope_user, + scope_run, + when, + event, + Json(data) if data is not None else None, + message, + Json(meta) if meta is not None else None, + ), + ) + + if event in {"ORDER_PLACED", "ORDER_FILLED"}: + try: + _send_order_email(scope_user, event, data or {}) + except Exception: + pass + + +def _get_user_email(user_id: str | None): + if not user_id: + return None + + def _op(cur, _conn): + cur.execute("SELECT username FROM app_user WHERE id = %s", (user_id,)) + row = cur.fetchone() + return row[0] if row else None + + return run_with_retry(_op) + + +def _send_email(to_email: str, subject: str, body_text: str) -> bool: + smtp_user = (os.getenv("SMTP_USER") or "").strip() + smtp_pass = (os.getenv("SMTP_PASS") or "").replace(" ", "").strip() + smtp_host = (os.getenv("SMTP_HOST") or "smtp.gmail.com").strip() + smtp_port = int((os.getenv("SMTP_PORT") or "587").strip()) + from_name = (os.getenv("SMTP_FROM_NAME") or "Quantfortune Support").strip() + + if not smtp_user or not smtp_pass or not to_email: + return False + + msg = EmailMessage() + msg["From"] = f"{from_name} <{smtp_user}>" + msg["To"] = to_email + msg["Subject"] = subject + msg.set_content(body_text) + + context = ssl.create_default_context() + with smtplib.SMTP(smtp_host, smtp_port) as server: + server.starttls(context=context) + server.login(smtp_user, smtp_pass) + server.send_message(msg) + return True + + +def _send_order_email(user_id: str | None, event: str, data: dict): + email = _get_user_email(user_id) + if not email: + return + symbol = data.get("symbol") or "N/A" + side = data.get("side") or "N/A" + qty = data.get("qty") or data.get("quantity") or "N/A" + status = data.get("status") or ("FILLED" if event == "ORDER_FILLED" else "PLACED") + subject = f"Order {status}: {symbol}" + body = ( + "Order update from Quantfortune.\n\n" + f"Symbol: {symbol}\n" + f"Side: {side}\n" + f"Qty: {qty}\n" + f"Status: {status}\n" + ) + _send_email(email, subject, body) diff --git a/migrate_to_db.py b/migrate_to_db.py new file mode 100644 index 0000000..e63b53d --- /dev/null +++ b/migrate_to_db.py @@ -0,0 +1,914 @@ +import json +import math +from datetime import datetime, timezone, date +from pathlib import Path + +import pandas as pd +import psycopg2 +from psycopg2.extras import execute_values, Json + + +DB_CONFIG = { + "host": "localhost", + "port": 5432, + "dbname": "trading_db", + "user": "trader", + "password": "traderpass", +} + + +ROOT = Path(__file__).resolve().parent +BACKEND_STORAGE = ROOT / "backend" / "storage" +BACKEND_LOGS = ROOT / "backend" / "logs" +ENGINE_STORAGE = ROOT / "indian_paper_trading_strategy" / "storage" +ENGINE_HISTORY = ROOT / "indian_paper_trading_strategy" / "history" +ROOT_STORAGE = ROOT / "storage" + + +def load_json(path, default): + if not path.exists(): + return default + try: + raw = path.read_text(encoding="utf-8").strip() + if not raw: + return default + return json.loads(raw) + except Exception: + return default + + +def parse_ts(value): + if value is None: + return None + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value.astimezone(timezone.utc) + if isinstance(value, date) and not isinstance(value, datetime): + return datetime(value.year, value.month, value.day, tzinfo=timezone.utc) + try: + ts = pd.to_datetime(value, utc=True, errors="coerce") + except Exception: + return None + if pd.isna(ts): + return None + return ts.to_pydatetime() + + +def parse_date(value): + if value is None: + return None + if isinstance(value, date) and not isinstance(value, datetime): + return value + try: + ts = pd.to_datetime(value, utc=True, errors="coerce") + except Exception: + return None + if pd.isna(ts): + return None + return ts.date() + + +def to_number(value): + if value is None: + return None + if isinstance(value, (int, float)): + if isinstance(value, float) and (math.isnan(value) or math.isinf(value)): + return None + return float(value) + if isinstance(value, str): + text = value.strip() + if not text: + return None + try: + return float(text) + except Exception: + return None + if pd.isna(value): + return None + return value + + +def to_int(value): + if value is None: + return None + if isinstance(value, bool): + return int(value) + if isinstance(value, (int, float)): + if isinstance(value, float) and math.isnan(value): + return None + return int(value) + if isinstance(value, str): + text = value.strip() + if not text: + return None + try: + return int(float(text)) + except Exception: + return None + return None + + +def normalize_text(value): + if value is None: + return None + if isinstance(value, str): + return value + return json.dumps(value) + + +def run_domain(conn, name, func): + with conn: + with conn.cursor() as cur: + func(cur) + + +def migrate_users_sessions(cur): + users = load_json(BACKEND_STORAGE / "users.json", []) + if isinstance(users, list) and users: + rows = [] + for user in users: + if not isinstance(user, dict): + continue + user_id = user.get("id") + username = user.get("username") + password_hash = user.get("password") + if not user_id or not username or password_hash is None: + continue + rows.append((user_id, username, password_hash)) + if rows: + execute_values( + cur, + """ + INSERT INTO app_user (id, username, password_hash) + VALUES %s + ON CONFLICT (id) DO UPDATE + SET username = EXCLUDED.username, + password_hash = EXCLUDED.password_hash + """, + rows, + ) + + sessions = load_json(BACKEND_STORAGE / "sessions.json", []) + if isinstance(sessions, list) and sessions: + rows = [] + for session in sessions: + if not isinstance(session, dict): + continue + session_id = session.get("id") + user_id = session.get("user_id") + created_at = parse_ts(session.get("created_at")) + last_seen_at = parse_ts(session.get("last_seen_at")) + expires_at = parse_ts(session.get("expires_at")) + if not session_id or not user_id or created_at is None or expires_at is None: + continue + rows.append((session_id, user_id, created_at, last_seen_at, expires_at)) + if rows: + execute_values( + cur, + """ + INSERT INTO app_session (id, user_id, created_at, last_seen_at, expires_at) + VALUES %s + ON CONFLICT (id) DO UPDATE + SET user_id = EXCLUDED.user_id, + created_at = EXCLUDED.created_at, + last_seen_at = EXCLUDED.last_seen_at, + expires_at = EXCLUDED.expires_at + """, + rows, + ) + + +def migrate_user_brokers(cur): + brokers = load_json(BACKEND_STORAGE / "user_brokers.json", {}) + if not isinstance(brokers, dict) or not brokers: + return + rows = [] + for user_id, entry in brokers.items(): + if not isinstance(entry, dict): + continue + pending = entry.get("pending") if isinstance(entry.get("pending"), dict) else {} + rows.append( + ( + user_id, + entry.get("broker"), + bool(entry.get("connected")) if entry.get("connected") is not None else False, + entry.get("access_token"), + parse_ts(entry.get("connected_at")), + entry.get("api_key"), + entry.get("user_name"), + entry.get("broker_user_id"), + pending.get("broker"), + pending.get("api_key"), + pending.get("api_secret"), + parse_ts(pending.get("started_at")), + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO user_broker ( + user_id, + broker, + connected, + access_token, + connected_at, + api_key, + user_name, + broker_user_id, + pending_broker, + pending_api_key, + pending_api_secret, + pending_started_at + ) + VALUES %s + ON CONFLICT (user_id) DO UPDATE + SET broker = EXCLUDED.broker, + connected = EXCLUDED.connected, + access_token = EXCLUDED.access_token, + connected_at = EXCLUDED.connected_at, + api_key = EXCLUDED.api_key, + user_name = EXCLUDED.user_name, + broker_user_id = EXCLUDED.broker_user_id, + pending_broker = EXCLUDED.pending_broker, + pending_api_key = EXCLUDED.pending_api_key, + pending_api_secret = EXCLUDED.pending_api_secret, + pending_started_at = EXCLUDED.pending_started_at + """, + rows, + ) + + +def migrate_zerodha(cur): + sessions = load_json(BACKEND_STORAGE / "zerodha_sessions.json", []) + if isinstance(sessions, list): + for item in sessions: + if not isinstance(item, dict): + continue + user_id = item.get("user_id") + linked_at = parse_ts(item.get("linked_at")) + if not user_id or linked_at is None: + continue + api_key = item.get("api_key") + access_token = item.get("access_token") + request_token = item.get("request_token") + user_name = item.get("user_name") + broker_user_id = item.get("broker_user_id") + cur.execute( + """ + INSERT INTO zerodha_session ( + user_id, linked_at, api_key, access_token, request_token, user_name, broker_user_id + ) + SELECT %s, %s, %s, %s, %s, %s, %s + WHERE NOT EXISTS ( + SELECT 1 FROM zerodha_session + WHERE user_id = %s + AND linked_at = %s + AND api_key IS NOT DISTINCT FROM %s + AND access_token IS NOT DISTINCT FROM %s + AND request_token IS NOT DISTINCT FROM %s + AND user_name IS NOT DISTINCT FROM %s + AND broker_user_id IS NOT DISTINCT FROM %s + ) + """, + ( + user_id, + linked_at, + api_key, + access_token, + request_token, + user_name, + broker_user_id, + user_id, + linked_at, + api_key, + access_token, + request_token, + user_name, + broker_user_id, + ), + ) + + tokens = load_json(BACKEND_STORAGE / "zerodha_request_tokens.json", []) + if isinstance(tokens, list) and tokens: + rows = [] + for item in tokens: + if not isinstance(item, dict): + continue + user_id = item.get("user_id") + request_token = item.get("request_token") + if not user_id or request_token is None: + continue + rows.append((user_id, request_token)) + if rows: + execute_values( + cur, + """ + INSERT INTO zerodha_request_token (user_id, request_token) + VALUES %s + ON CONFLICT (user_id) DO UPDATE + SET request_token = EXCLUDED.request_token + """, + rows, + ) + + +def migrate_strategy_config(cur): + cfg = load_json(BACKEND_STORAGE / "strategy_config.json", {}) + if not isinstance(cfg, dict) or not cfg: + return + + sip_frequency = cfg.get("sip_frequency") + sip_frequency_value = None + sip_frequency_unit = None + if isinstance(sip_frequency, dict): + sip_frequency_value = to_int(sip_frequency.get("value")) + sip_frequency_unit = sip_frequency.get("unit") + + frequency = cfg.get("frequency") + frequency_text = normalize_text(frequency) + frequency_days = to_int(cfg.get("frequency_days")) + unit = cfg.get("unit") + next_run = parse_ts(cfg.get("next_run")) + + cur.execute( + """ + INSERT INTO strategy_config ( + id, + strategy, + sip_amount, + sip_frequency_value, + sip_frequency_unit, + mode, + broker, + active, + frequency, + frequency_days, + unit, + next_run + ) + VALUES ( + 1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (id) DO UPDATE + SET strategy = EXCLUDED.strategy, + sip_amount = EXCLUDED.sip_amount, + sip_frequency_value = EXCLUDED.sip_frequency_value, + sip_frequency_unit = EXCLUDED.sip_frequency_unit, + mode = EXCLUDED.mode, + broker = EXCLUDED.broker, + active = EXCLUDED.active, + frequency = EXCLUDED.frequency, + frequency_days = EXCLUDED.frequency_days, + unit = EXCLUDED.unit, + next_run = EXCLUDED.next_run + """, + ( + cfg.get("strategy"), + to_number(cfg.get("sip_amount")), + sip_frequency_value, + sip_frequency_unit, + cfg.get("mode"), + cfg.get("broker"), + cfg.get("active"), + frequency_text, + frequency_days, + unit, + next_run, + ), + ) + + +def migrate_engine_status(cur): + status = load_json(BACKEND_STORAGE / "engine_status.json", {}) + if not isinstance(status, dict) or not status: + return + cur.execute( + """ + INSERT INTO engine_status (id, status, last_updated) + VALUES (1, %s, %s) + ON CONFLICT (id) DO UPDATE + SET status = EXCLUDED.status, + last_updated = EXCLUDED.last_updated + """, + ( + status.get("status"), + parse_ts(status.get("last_updated")), + ), + ) + + +def migrate_strategy_logs(cur): + logs = load_json(BACKEND_STORAGE / "strategy_logs.json", []) + if not isinstance(logs, list) or not logs: + return + rows = [] + for entry in logs: + if not isinstance(entry, dict): + continue + seq = entry.get("seq") + ts = parse_ts(entry.get("ts")) + if seq is None or ts is None: + continue + rows.append( + ( + int(seq), + ts, + entry.get("level"), + entry.get("category"), + entry.get("event"), + entry.get("message"), + entry.get("run_id"), + Json(entry.get("meta")) if isinstance(entry.get("meta"), dict) else None, + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO strategy_log ( + seq, ts, level, category, event, message, run_id, meta + ) + VALUES %s + ON CONFLICT (seq) DO NOTHING + """, + rows, + ) + + +def migrate_engine_log(cur): + log_paths = [ + BACKEND_STORAGE / "engine.log", + BACKEND_LOGS / "engine.log", + ] + for path in log_paths: + if not path.exists(): + continue + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + try: + payload = json.loads(line) + except Exception: + continue + if not isinstance(payload, dict): + continue + ts = parse_ts(payload.get("ts")) + if ts is None: + continue + event = payload.get("event") + data = payload.get("data") + message = payload.get("message") + meta = payload.get("meta") + cur.execute( + """ + INSERT INTO engine_event (ts, event, data, message, meta) + SELECT %s, %s, %s, %s, %s + WHERE NOT EXISTS ( + SELECT 1 FROM engine_event + WHERE ts = %s + AND event IS NOT DISTINCT FROM %s + AND data IS NOT DISTINCT FROM %s + AND message IS NOT DISTINCT FROM %s + AND meta IS NOT DISTINCT FROM %s + ) + """, + ( + ts, + event, + Json(data) if isinstance(data, dict) else Json(data) if data is not None else None, + message, + Json(meta) if isinstance(meta, dict) else Json(meta) if meta is not None else None, + ts, + event, + Json(data) if isinstance(data, dict) else Json(data) if data is not None else None, + message, + Json(meta) if isinstance(meta, dict) else Json(meta) if meta is not None else None, + ), + ) + + +def migrate_engine_state(cur): + state_paper = load_json(ENGINE_STORAGE / "state_paper.json", {}) + if isinstance(state_paper, dict) and state_paper: + sip_freq = state_paper.get("sip_frequency") + sip_value = None + sip_unit = None + if isinstance(sip_freq, dict): + sip_value = to_int(sip_freq.get("value")) + sip_unit = sip_freq.get("unit") + cur.execute( + """ + INSERT INTO engine_state_paper ( + id, + initial_cash, + cash, + total_invested, + nifty_units, + gold_units, + last_sip_ts, + last_run, + sip_frequency_value, + sip_frequency_unit + ) + VALUES (1, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (id) DO UPDATE + SET initial_cash = EXCLUDED.initial_cash, + cash = EXCLUDED.cash, + total_invested = EXCLUDED.total_invested, + nifty_units = EXCLUDED.nifty_units, + gold_units = EXCLUDED.gold_units, + last_sip_ts = EXCLUDED.last_sip_ts, + last_run = EXCLUDED.last_run, + sip_frequency_value = EXCLUDED.sip_frequency_value, + sip_frequency_unit = EXCLUDED.sip_frequency_unit + """, + ( + to_number(state_paper.get("initial_cash")), + to_number(state_paper.get("cash")), + to_number(state_paper.get("total_invested")), + to_number(state_paper.get("nifty_units")), + to_number(state_paper.get("gold_units")), + parse_ts(state_paper.get("last_sip_ts")), + parse_ts(state_paper.get("last_run")), + sip_value, + sip_unit, + ), + ) + + state = load_json(ENGINE_STORAGE / "state.json", {}) + if isinstance(state, dict) and state: + cur.execute( + """ + INSERT INTO engine_state ( + id, + total_invested, + nifty_units, + gold_units, + last_sip_ts, + last_run + ) + VALUES (1, %s, %s, %s, %s, %s) + ON CONFLICT (id) DO UPDATE + SET total_invested = EXCLUDED.total_invested, + nifty_units = EXCLUDED.nifty_units, + gold_units = EXCLUDED.gold_units, + last_sip_ts = EXCLUDED.last_sip_ts, + last_run = EXCLUDED.last_run + """, + ( + to_number(state.get("total_invested")), + to_number(state.get("nifty_units")), + to_number(state.get("gold_units")), + parse_ts(state.get("last_sip_ts")), + parse_ts(state.get("last_run")), + ), + ) + + +def migrate_paper_broker(cur): + broker = load_json(ENGINE_STORAGE / "paper_broker.json", {}) + if not isinstance(broker, dict) or not broker: + return + + cash = to_number(broker.get("cash")) + if cash is not None: + cur.execute( + """ + INSERT INTO paper_broker_account (id, cash) + VALUES (1, %s) + ON CONFLICT (id) DO UPDATE + SET cash = EXCLUDED.cash + """, + (cash,), + ) + + now_utc = datetime.now(timezone.utc) + positions = broker.get("positions") + if isinstance(positions, dict) and positions: + rows = [] + for symbol, data in positions.items(): + if not isinstance(data, dict): + continue + rows.append( + ( + symbol, + to_number(data.get("qty")), + to_number(data.get("avg_price")), + to_number(data.get("last_price")), + now_utc, + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO paper_position ( + symbol, qty, avg_price, last_price, updated_at + ) + VALUES %s + ON CONFLICT (symbol) DO UPDATE + SET qty = EXCLUDED.qty, + avg_price = EXCLUDED.avg_price, + last_price = EXCLUDED.last_price, + updated_at = EXCLUDED.updated_at + """, + rows, + ) + + orders = broker.get("orders") + if isinstance(orders, list) and orders: + rows = [] + for order in orders: + if not isinstance(order, dict): + continue + order_id = order.get("id") + if not order_id: + continue + rows.append( + ( + order_id, + order.get("symbol"), + order.get("side"), + to_number(order.get("qty")), + to_number(order.get("price")), + order.get("status"), + parse_ts(order.get("timestamp")), + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO paper_order ( + id, symbol, side, qty, price, status, timestamp + ) + VALUES %s + ON CONFLICT (id) DO UPDATE + SET symbol = EXCLUDED.symbol, + side = EXCLUDED.side, + qty = EXCLUDED.qty, + price = EXCLUDED.price, + status = EXCLUDED.status, + timestamp = EXCLUDED.timestamp + """, + rows, + ) + + trades = broker.get("trades") + if isinstance(trades, list) and trades: + rows = [] + for trade in trades: + if not isinstance(trade, dict): + continue + trade_id = trade.get("id") + if not trade_id: + continue + rows.append( + ( + trade_id, + trade.get("order_id"), + trade.get("symbol"), + trade.get("side"), + to_number(trade.get("qty")), + to_number(trade.get("price")), + parse_ts(trade.get("timestamp")), + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO paper_trade ( + id, order_id, symbol, side, qty, price, timestamp + ) + VALUES %s + ON CONFLICT (id) DO UPDATE + SET order_id = EXCLUDED.order_id, + symbol = EXCLUDED.symbol, + side = EXCLUDED.side, + qty = EXCLUDED.qty, + price = EXCLUDED.price, + timestamp = EXCLUDED.timestamp + """, + rows, + ) + + equity_curve = broker.get("equity_curve") + if isinstance(equity_curve, list) and equity_curve: + rows = [] + for point in equity_curve: + if not isinstance(point, dict): + continue + ts = parse_ts(point.get("timestamp")) + if ts is None: + continue + rows.append( + ( + ts, + to_number(point.get("equity")), + to_number(point.get("pnl")), + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO paper_equity_curve (timestamp, equity, pnl) + VALUES %s + ON CONFLICT (timestamp) DO UPDATE + SET equity = EXCLUDED.equity, + pnl = EXCLUDED.pnl + """, + rows, + ) + + +def migrate_mtm_ledger(cur): + path = ENGINE_STORAGE / "mtm_ledger.csv" + if not path.exists(): + return + df = pd.read_csv(path) + if df.empty: + return + rows = [] + for _, row in df.iterrows(): + ts = parse_ts(row.get("timestamp")) + if ts is None: + continue + rows.append( + ( + ts, + to_number(row.get("nifty_units")), + to_number(row.get("gold_units")), + to_number(row.get("nifty_price")), + to_number(row.get("gold_price")), + to_number(row.get("nifty_value")), + to_number(row.get("gold_value")), + to_number(row.get("portfolio_value")), + to_number(row.get("total_invested")), + to_number(row.get("pnl")), + ) + ) + if rows: + execute_values( + cur, + """ + INSERT INTO mtm_ledger ( + timestamp, + nifty_units, + gold_units, + nifty_price, + gold_price, + nifty_value, + gold_value, + portfolio_value, + total_invested, + pnl + ) + VALUES %s + ON CONFLICT (timestamp) DO NOTHING + """, + rows, + ) + + +def migrate_event_ledger(cur): + path = ENGINE_STORAGE / "ledger.csv" + if not path.exists(): + return + df = pd.read_csv(path) + if df.empty: + return + for _, row in df.iterrows(): + ts = parse_ts(row.get("timestamp")) + if ts is None: + continue + event = row.get("event") + cur.execute( + """ + INSERT INTO event_ledger ( + timestamp, event, nifty_units, gold_units, nifty_price, gold_price, amount + ) + SELECT %s, %s, %s, %s, %s, %s, %s + WHERE NOT EXISTS ( + SELECT 1 FROM event_ledger + WHERE timestamp = %s + AND event IS NOT DISTINCT FROM %s + AND nifty_units IS NOT DISTINCT FROM %s + AND gold_units IS NOT DISTINCT FROM %s + AND nifty_price IS NOT DISTINCT FROM %s + AND gold_price IS NOT DISTINCT FROM %s + AND amount IS NOT DISTINCT FROM %s + ) + """, + ( + ts, + event, + to_number(row.get("nifty_units")), + to_number(row.get("gold_units")), + to_number(row.get("nifty_price")), + to_number(row.get("gold_price")), + to_number(row.get("amount")), + ts, + event, + to_number(row.get("nifty_units")), + to_number(row.get("gold_units")), + to_number(row.get("nifty_price")), + to_number(row.get("gold_price")), + to_number(row.get("amount")), + ), + ) + + +def migrate_market_history(cur): + history_paths = set() + for base in (ENGINE_HISTORY, ENGINE_STORAGE / "history", ROOT_STORAGE / "history"): + if base.exists(): + history_paths.update(base.glob("*.csv")) + if not history_paths: + return + + rows = [] + for path in sorted(history_paths): + symbol = path.stem + try: + df = pd.read_csv(path) + except Exception: + continue + if df.empty or "Date" not in df.columns or "Close" not in df.columns: + continue + for _, row in df.iterrows(): + dt = parse_date(row.get("Date")) + close = to_number(row.get("Close")) + if dt is None or close is None: + continue + rows.append((symbol, dt, close)) + + if rows: + execute_values( + cur, + """ + INSERT INTO market_close (symbol, date, close) + VALUES %s + ON CONFLICT (symbol, date) DO NOTHING + """, + rows, + ) + + +def print_counts(cur): + tables = [ + "app_user", + "app_session", + "user_broker", + "zerodha_session", + "zerodha_request_token", + "strategy_config", + "engine_status", + "strategy_log", + "engine_event", + "engine_state_paper", + "engine_state", + "paper_broker_account", + "paper_position", + "paper_order", + "paper_trade", + "paper_equity_curve", + "mtm_ledger", + "event_ledger", + "market_close", + ] + for table in tables: + cur.execute(f"SELECT COUNT(*) FROM {table}") + count = cur.fetchone()[0] + print(f"{table}: {count}") + + +def main(): + conn = psycopg2.connect(**DB_CONFIG) + try: + run_domain(conn, "users_sessions", migrate_users_sessions) + run_domain(conn, "user_brokers", migrate_user_brokers) + run_domain(conn, "zerodha", migrate_zerodha) + run_domain(conn, "strategy_config", migrate_strategy_config) + run_domain(conn, "engine_status", migrate_engine_status) + run_domain(conn, "strategy_logs", migrate_strategy_logs) + run_domain(conn, "engine_log", migrate_engine_log) + run_domain(conn, "engine_state", migrate_engine_state) + run_domain(conn, "paper_broker", migrate_paper_broker) + run_domain(conn, "mtm_ledger", migrate_mtm_ledger) + run_domain(conn, "event_ledger", migrate_event_ledger) + run_domain(conn, "market_history", migrate_market_history) + + with conn: + with conn.cursor() as cur: + print_counts(cur) + finally: + conn.close() + + +if __name__ == "__main__": + main() diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..01f0a5c --- /dev/null +++ b/schema.sql @@ -0,0 +1,290 @@ +-- ========================================= +-- Extensions (optional but useful) +-- ========================================= +CREATE EXTENSION IF NOT EXISTS pgcrypto; -- for gen_random_uuid() if you want it later + +-- ========================================= +-- 1) Identity & Sessions +-- ========================================= +CREATE TABLE IF NOT EXISTS app_user ( + id TEXT PRIMARY KEY, + username TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS app_session ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL, + last_seen_at TIMESTAMPTZ, + expires_at TIMESTAMPTZ NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_app_session_user_id ON app_session(user_id); +CREATE INDEX IF NOT EXISTS idx_app_session_expires_at ON app_session(expires_at); + +-- ========================================= +-- 2) Broker links + Zerodha auth artifacts +-- Mirrors: user_brokers.json, zerodha_sessions.json, zerodha_request_tokens.json +-- ========================================= + +-- Current connected broker record per user (plus "pending" fields) +CREATE TABLE IF NOT EXISTS user_broker ( + user_id TEXT PRIMARY KEY REFERENCES app_user(id) ON DELETE CASCADE, + broker TEXT, + connected BOOLEAN NOT NULL DEFAULT FALSE, + + access_token TEXT, + connected_at TIMESTAMPTZ, + + api_key TEXT, + user_name TEXT, + broker_user_id TEXT, + + -- pending fields (as you described) + pending_broker TEXT, + pending_api_key TEXT, + pending_api_secret TEXT, + pending_started_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_user_broker_broker ON user_broker(broker); +CREATE INDEX IF NOT EXISTS idx_user_broker_connected ON user_broker(connected); + +-- Zerodha session history (can be multiple per user) +CREATE TABLE IF NOT EXISTS zerodha_session ( + id BIGSERIAL PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES app_user(id) ON DELETE CASCADE, + linked_at TIMESTAMPTZ NOT NULL, + + api_key TEXT, + access_token TEXT, + request_token TEXT, + + user_name TEXT, + broker_user_id TEXT +); + +CREATE INDEX IF NOT EXISTS idx_zerodha_session_user_id ON zerodha_session(user_id); +CREATE INDEX IF NOT EXISTS idx_zerodha_session_linked_at ON zerodha_session(linked_at); + +-- Latest/active request token per user (mirrors zerodha_request_tokens.json) +CREATE TABLE IF NOT EXISTS zerodha_request_token ( + user_id TEXT PRIMARY KEY REFERENCES app_user(id) ON DELETE CASCADE, + request_token TEXT NOT NULL +); + +-- ========================================= +-- 3) Strategy config + engine status +-- Mirrors: strategy_config.json, engine_status.json +-- ========================================= + +-- Keep it simple: single-row table (config "current") +CREATE TABLE IF NOT EXISTS strategy_config ( + id SMALLINT PRIMARY KEY DEFAULT 1, + strategy TEXT, + sip_amount NUMERIC, + + sip_frequency_value INTEGER, + sip_frequency_unit TEXT, + + mode TEXT, -- paper/live + broker TEXT, + + -- legacy/optional fields + active BOOLEAN, + frequency TEXT, + frequency_days INTEGER, + unit TEXT, + next_run TIMESTAMPTZ +); + +-- single-row engine status +CREATE TABLE IF NOT EXISTS engine_status ( + id SMALLINT PRIMARY KEY DEFAULT 1, + status TEXT NOT NULL, + last_updated TIMESTAMPTZ NOT NULL +); + +-- ========================================= +-- 4) Logs / event streams +-- Mirrors: strategy_logs.json, engine.log (JSONL) +-- ========================================= + +-- strategy_logs.json +CREATE TABLE IF NOT EXISTS strategy_log ( + seq BIGINT PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL, + level TEXT, + category TEXT, + event TEXT, + message TEXT, + run_id TEXT, + meta JSONB +); + +CREATE INDEX IF NOT EXISTS idx_strategy_log_ts ON strategy_log(ts); +CREATE INDEX IF NOT EXISTS idx_strategy_log_run_id ON strategy_log(run_id); +CREATE INDEX IF NOT EXISTS idx_strategy_log_event ON strategy_log(event); + +-- engine.log JSONL +CREATE TABLE IF NOT EXISTS engine_event ( + id BIGSERIAL PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL, + event TEXT, + data JSONB, + message TEXT, + meta JSONB +); + +CREATE INDEX IF NOT EXISTS idx_engine_event_ts ON engine_event(ts); +CREATE INDEX IF NOT EXISTS idx_engine_event_event ON engine_event(event); + +-- ========================================= +-- 5) Engine state (paper + derived) +-- Mirrors: state_paper.json, state.json +-- ========================================= + +-- state_paper.json +CREATE TABLE IF NOT EXISTS engine_state_paper ( + id SMALLINT PRIMARY KEY DEFAULT 1, + + initial_cash NUMERIC, + cash NUMERIC, + total_invested NUMERIC, + + nifty_units NUMERIC, + gold_units NUMERIC, + + last_sip_ts TIMESTAMPTZ, + last_run TIMESTAMPTZ, + + sip_frequency_value INTEGER, + sip_frequency_unit TEXT +); + +-- state.json (lighter) +CREATE TABLE IF NOT EXISTS engine_state ( + id SMALLINT PRIMARY KEY DEFAULT 1, + + total_invested NUMERIC, + nifty_units NUMERIC, + gold_units NUMERIC, + + last_sip_ts TIMESTAMPTZ, + last_run TIMESTAMPTZ +); + +-- ========================================= +-- 6) Paper broker (positions, orders, trades, equity curve) +-- Mirrors: paper_broker.json +-- ========================================= + +-- overall cash snapshot (paper_broker.json top-level cash) +CREATE TABLE IF NOT EXISTS paper_broker_account ( + id SMALLINT PRIMARY KEY DEFAULT 1, + cash NUMERIC NOT NULL +); + +-- positions map: symbol -> qty, avg_price, last_price +CREATE TABLE IF NOT EXISTS paper_position ( + symbol TEXT PRIMARY KEY, + qty NUMERIC NOT NULL, + avg_price NUMERIC, + last_price NUMERIC, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- orders list +CREATE TABLE IF NOT EXISTS paper_order ( + id TEXT PRIMARY KEY, + symbol TEXT NOT NULL, + side TEXT NOT NULL, -- buy/sell + qty NUMERIC NOT NULL, + price NUMERIC, + status TEXT NOT NULL, -- new/filled/cancelled/rejected etc + timestamp TIMESTAMPTZ NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_paper_order_ts ON paper_order(timestamp); +CREATE INDEX IF NOT EXISTS idx_paper_order_symbol ON paper_order(symbol); +CREATE INDEX IF NOT EXISTS idx_paper_order_status ON paper_order(status); + +-- trades list +CREATE TABLE IF NOT EXISTS paper_trade ( + id TEXT PRIMARY KEY, + order_id TEXT REFERENCES paper_order(id) ON DELETE SET NULL, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + qty NUMERIC NOT NULL, + price NUMERIC NOT NULL, + timestamp TIMESTAMPTZ NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_paper_trade_ts ON paper_trade(timestamp); +CREATE INDEX IF NOT EXISTS idx_paper_trade_symbol ON paper_trade(symbol); + +-- equity_curve list +CREATE TABLE IF NOT EXISTS paper_equity_curve ( + timestamp TIMESTAMPTZ PRIMARY KEY, + equity NUMERIC NOT NULL, + pnl NUMERIC +); + +CREATE INDEX IF NOT EXISTS idx_paper_equity_curve_ts ON paper_equity_curve(timestamp); + +-- ========================================= +-- 7) MTM ledger + event ledger +-- Mirrors: mtm_ledger.csv, ledger.csv +-- ========================================= + +CREATE TABLE IF NOT EXISTS mtm_ledger ( + timestamp TIMESTAMPTZ PRIMARY KEY, + + nifty_units NUMERIC, + gold_units NUMERIC, + + nifty_price NUMERIC, + gold_price NUMERIC, + + nifty_value NUMERIC, + gold_value NUMERIC, + + portfolio_value NUMERIC, + total_invested NUMERIC, + pnl NUMERIC +); + +CREATE INDEX IF NOT EXISTS idx_mtm_ledger_ts ON mtm_ledger(timestamp); + +CREATE TABLE IF NOT EXISTS event_ledger ( + id BIGSERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL, + event TEXT NOT NULL, + + nifty_units NUMERIC, + gold_units NUMERIC, + + nifty_price NUMERIC, + gold_price NUMERIC, + + amount NUMERIC +); + +CREATE INDEX IF NOT EXISTS idx_event_ledger_ts ON event_ledger(timestamp); +CREATE INDEX IF NOT EXISTS idx_event_ledger_event ON event_ledger(event); + +-- ========================================= +-- 8) Market data cache (History Cache CSVs) +-- Mirrors: SYMBOL.csv {Date, Close} +-- ========================================= + +CREATE TABLE IF NOT EXISTS market_close ( + symbol TEXT NOT NULL, + date DATE NOT NULL, + close NUMERIC NOT NULL, + PRIMARY KEY (symbol, date) +); + +CREATE INDEX IF NOT EXISTS idx_market_close_symbol ON market_close(symbol); +CREATE INDEX IF NOT EXISTS idx_market_close_date ON market_close(date);