"""Role management helpers: change a user's role and bootstrap a SUPER_ADMIN."""

import os

from app.services.auth_service import create_user, get_user_by_username
from app.services.db import db_connection

# Role names accepted by set_user_role; anything else is rejected up front.
VALID_ROLES = {"USER", "ADMIN", "SUPER_ADMIN"}


def _sync_legacy_flags(cur, user_id: str, role: str) -> None:
    """Mirror *role* onto the ``is_admin`` / ``is_super_admin`` columns.

    ``is_admin`` is set for both ADMIN and SUPER_ADMIN; ``is_super_admin``
    only for SUPER_ADMIN.  Runs on the caller's cursor, so it takes part in
    whatever transaction the caller has open.

    Args:
        cur: Open DB cursor to execute the UPDATE on.
        user_id: ``app_user.id`` of the row to update.
        role: The role the row has just been given.
    """
    is_admin = role in ("ADMIN", "SUPER_ADMIN")
    is_super_admin = role == "SUPER_ADMIN"
    cur.execute(
        """
        UPDATE app_user
        SET is_admin = %s,
            is_super_admin = %s
        WHERE id = %s
        """,
        (is_admin, is_super_admin, user_id),
    )


def _promote_to_super_admin(user_id: str) -> None:
    """Set ``role = 'SUPER_ADMIN'`` for *user_id* and sync the legacy flags.

    Shared by both branches of bootstrap_super_admin so the promotion SQL
    lives in exactly one place.  No ``admin_role_audit`` row is written here.
    """
    with db_connection() as conn:
        # NOTE(review): `with conn` presumably commits on clean exit and
        # rolls back on error (psycopg2-style) -- confirm against db_connection.
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE app_user
                    SET role = 'SUPER_ADMIN'
                    WHERE id = %s
                    """,
                    (user_id,),
                )
                _sync_legacy_flags(cur, user_id, "SUPER_ADMIN")


def set_user_role(actor_id: str, target_id: str, new_role: str):
    """Change *target_id*'s role to *new_role* and record an audit row.

    Args:
        actor_id: Id of the user performing the change.
        target_id: Id of the user whose role is being changed.
        new_role: Desired role; must be a member of ``VALID_ROLES``.

    Returns:
        * ``{"error": "invalid_role"}`` if *new_role* is not in ``VALID_ROLES``.
        * ``None`` if no ``app_user`` row has id *target_id*.
        * ``{"error": "cannot_demote_self"}`` if a SUPER_ADMIN tries to give
          themselves any other role.
        * ``{"user_id", "old_role", "new_role"}`` otherwise.  When the role is
          already *new_role* nothing is written (no update, no audit row).
    """
    if new_role not in VALID_ROLES:
        return {"error": "invalid_role"}

    with db_connection() as conn:
        with conn:
            with conn.cursor() as cur:
                # NOTE(review): the row is read without a lock, so a concurrent
                # role change could make the audited old_role stale -- consider
                # SELECT ... FOR UPDATE if the backing database supports it.
                cur.execute(
                    "SELECT role FROM app_user WHERE id = %s",
                    (target_id,),
                )
                row = cur.fetchone()
                if not row:
                    return None

                old_role = row[0]
                demoting_self = (
                    actor_id == target_id
                    and old_role == "SUPER_ADMIN"
                    and new_role != "SUPER_ADMIN"
                )
                if demoting_self:
                    return {"error": "cannot_demote_self"}

                result = {
                    "user_id": target_id,
                    "old_role": old_role,
                    "new_role": new_role,
                }
                if old_role == new_role:
                    # Nothing to change: skip the write and the audit entry.
                    return result

                cur.execute(
                    """
                    UPDATE app_user
                    SET role = %s
                    WHERE id = %s
                    """,
                    (new_role, target_id),
                )
                _sync_legacy_flags(cur, target_id, new_role)
                cur.execute(
                    """
                    INSERT INTO admin_role_audit
                        (actor_user_id, target_user_id, old_role, new_role)
                    VALUES (%s, %s, %s, %s)
                    """,
                    (actor_id, target_id, old_role, new_role),
                )
                return result


def bootstrap_super_admin() -> None:
    """Ensure the account named by ``SUPER_ADMIN_EMAIL`` is a SUPER_ADMIN.

    Does nothing when ``SUPER_ADMIN_EMAIL`` is unset or blank.  If a user with
    that username already exists it is promoted in place; otherwise a new user
    is created with ``SUPER_ADMIN_PASSWORD`` and then promoted.  If
    ``create_user`` returns a falsy value the function returns without
    promoting anyone.

    Raises:
        RuntimeError: A new user must be created but ``SUPER_ADMIN_PASSWORD``
            is unset or blank.
    """
    email = (os.getenv("SUPER_ADMIN_EMAIL") or "").strip()
    if not email:
        return

    existing = get_user_by_username(email)
    if existing:
        _promote_to_super_admin(existing["id"])
        return

    # The password is only required when the account has to be created.
    password = (os.getenv("SUPER_ADMIN_PASSWORD") or "").strip()
    if not password:
        raise RuntimeError("SUPER_ADMIN_PASSWORD must be set to bootstrap SUPER_ADMIN")

    user = create_user(email, password)
    if not user:
        return

    _promote_to_super_admin(user["id"])