SIP_GoldBees_Backend/backend/app/admin_role_service.py
2026-02-01 13:57:30 +00:00

110 lines
3.4 KiB
Python

import os
from app.services.auth_service import create_user, get_user_by_username
from app.services.db import db_connection
# Role names accepted by set_user_role(); any other value is rejected there
# with an "invalid_role" error.
VALID_ROLES = {
    "USER",
    "ADMIN",
    "SUPER_ADMIN",
}
def _sync_legacy_flags(cur, user_id: str, role: str):
    """Update the ``is_admin`` / ``is_super_admin`` columns to match *role*.

    Args:
        cur: An open DB cursor; the UPDATE is issued through ``cur.execute``
            and is neither committed nor rolled back here.
        user_id: ``app_user.id`` of the row to update.
        role: Role name. ``"ADMIN"`` and ``"SUPER_ADMIN"`` set ``is_admin``;
            only ``"SUPER_ADMIN"`` sets ``is_super_admin``.
    """
    super_flag = role == "SUPER_ADMIN"
    # A super admin is also flagged as an admin.
    admin_flag = super_flag or role == "ADMIN"

    statement = (
        "\nUPDATE app_user\n"
        "SET is_admin = %s, is_super_admin = %s\n"
        "WHERE id = %s\n"
    )
    cur.execute(statement, (admin_flag, super_flag, user_id))
def set_user_role(actor_id: str, target_id: str, new_role: str):
    """Change the role of *target_id* on behalf of *actor_id*.

    When the role actually changes, the ``app_user`` row is updated, the
    ``is_admin`` / ``is_super_admin`` flags are re-synchronised and one row
    is inserted into ``admin_role_audit``.

    Args:
        actor_id: Id of the user performing the change (stored in the audit row).
        target_id: Id of the user whose role is being changed.
        new_role: Requested role; must be a member of ``VALID_ROLES``.

    Returns:
        * ``{"error": "invalid_role"}`` if *new_role* is not in ``VALID_ROLES``
          (no database access happens in this case).
        * ``None`` if no ``app_user`` row has id *target_id*.
        * ``{"error": "cannot_demote_self"}`` if the actor is the target,
          currently holds ``SUPER_ADMIN`` and asks for any other role.
        * Otherwise ``{"user_id", "old_role", "new_role"}``. When the old and
          new roles are equal nothing is written and no audit row is added.
    """
    if new_role not in VALID_ROLES:
        return {"error": "invalid_role"}

    update_role_sql = (
        "\nUPDATE app_user\n"
        "SET role = %s\n"
        "WHERE id = %s\n"
    )
    insert_audit_sql = (
        "\nINSERT INTO admin_role_audit\n"
        "(actor_user_id, target_user_id, old_role, new_role)\n"
        "VALUES (%s, %s, %s, %s)\n"
    )

    # NOTE(review): `conn` is entered as its own context manager, presumably
    # to scope a transaction around all statements below - confirm against
    # app.services.db.
    with db_connection() as conn, conn, conn.cursor() as cur:
        cur.execute("SELECT role FROM app_user WHERE id = %s", (target_id,))
        found = cur.fetchone()
        if not found:
            return None
        old_role = found[0]

        # A super admin may not strip their own SUPER_ADMIN role.
        demoting_self = (
            actor_id == target_id
            and old_role == "SUPER_ADMIN"
            and new_role != "SUPER_ADMIN"
        )
        if demoting_self:
            return {"error": "cannot_demote_self"}

        outcome = {
            "user_id": target_id,
            "old_role": old_role,
            "new_role": new_role,
        }

        # Write and audit only real changes; a no-op request just echoes state.
        if old_role != new_role:
            cur.execute(update_role_sql, (new_role, target_id))
            _sync_legacy_flags(cur, target_id, new_role)
            cur.execute(
                insert_audit_sql,
                (actor_id, target_id, old_role, new_role),
            )

        return outcome
def bootstrap_super_admin():
    """Ensure the account named by ``SUPER_ADMIN_EMAIL`` has role SUPER_ADMIN.

    Steps:
        1. Read ``SUPER_ADMIN_EMAIL``; if unset or blank, do nothing.
        2. Look the account up via ``get_user_by_username``. If it does not
           exist, create it with ``SUPER_ADMIN_PASSWORD`` via ``create_user``;
           if ``create_user`` returns a falsy value, give up silently.
        3. Set ``role = 'SUPER_ADMIN'`` on the account and sync its
           ``is_admin`` / ``is_super_admin`` flags.

    Returns:
        None in every case.

    Raises:
        RuntimeError: The account must be created but ``SUPER_ADMIN_PASSWORD``
            is unset or blank.
    """
    email = os.getenv("SUPER_ADMIN_EMAIL") or ""
    email = email.strip()
    if not email:
        return

    account = get_user_by_username(email)
    if not account:
        # The password is only needed (and only required) on the creation path.
        password = os.getenv("SUPER_ADMIN_PASSWORD") or ""
        password = password.strip()
        if not password:
            raise RuntimeError("SUPER_ADMIN_PASSWORD must be set to bootstrap SUPER_ADMIN")
        account = create_user(email, password)
        if not account:
            return

    account_id = account["id"]
    promote_sql = (
        "\nUPDATE app_user\n"
        "SET role = 'SUPER_ADMIN'\n"
        "WHERE id = %s\n"
    )
    # Existing and freshly created accounts are promoted the same way.
    with db_connection() as conn, conn, conn.cursor() as cur:
        cur.execute(promote_sql, (account_id,))
        _sync_legacy_flags(cur, account_id, "SUPER_ADMIN")