import hashlib import secrets from datetime import datetime, timedelta, timezone from uuid import uuid4 from app.services.db import db_connection CALLBACK_STATE_TTL_SECONDS = 15 * 60 def _now_utc() -> datetime: return datetime.now(timezone.utc) def _state_hash(state: str) -> str: return hashlib.sha256(state.encode("utf-8")).hexdigest() def create_broker_callback_state( *, user_id: str, session_id: str, broker: str, flow: str, ttl_seconds: int = CALLBACK_STATE_TTL_SECONDS, ) -> str: state = secrets.token_urlsafe(32) now = _now_utc() expires_at = now + timedelta(seconds=ttl_seconds) state_hash = _state_hash(state) with db_connection() as conn: with conn: with conn.cursor() as cur: cur.execute( """ DELETE FROM broker_callback_state WHERE expires_at <= %s OR consumed_at IS NOT NULL """, (now,), ) cur.execute( """ INSERT INTO broker_callback_state ( id, state_hash, user_id, session_id, broker, flow, created_at, expires_at, consumed_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NULL) """, ( str(uuid4()), state_hash, user_id, session_id, broker.strip().upper(), flow.strip().lower(), now, expires_at, ), ) return state def consume_broker_callback_state( *, state: str, user_id: str, session_id: str, broker: str, flow: str, ): if not state: return None now = _now_utc() state_hash = _state_hash(state) with db_connection() as conn: with conn: with conn.cursor() as cur: cur.execute( """ UPDATE broker_callback_state SET consumed_at = %s WHERE state_hash = %s AND user_id = %s AND session_id = %s AND broker = %s AND flow = %s AND consumed_at IS NULL AND expires_at > %s RETURNING id, expires_at """, ( now, state_hash, user_id, session_id, broker.strip().upper(), flow.strip().lower(), now, ), ) row = cur.fetchone() if not row: return None return {"id": row[0], "expires_at": row[1].isoformat() if row[1] else None}