SIP_GoldBees_Backend/app/admin_auth.py
2026-02-01 13:06:44 +00:00

72 lines
2.3 KiB
Python

from fastapi import HTTPException, Request
from app.services.auth_service import get_user_for_session
from app.services.db import db_connection
# Name of the request cookie whose value is passed to get_user_for_session()
# by the admin guards in this module.
SESSION_COOKIE_NAME = "session_id"
def _resolve_role(row) -> str:
role = row[2]
if role:
return role
if row[4]:
return "SUPER_ADMIN"
if row[3]:
return "ADMIN"
return "USER"
def _require_role(request: Request, allowed_roles: tuple, denied_detail: str) -> dict:
    """Authenticate the request's session and enforce a role requirement.

    Shared implementation behind require_admin() and require_super_admin(),
    so the session lookup, user query and response shape live in one place.

    Args:
        request: Incoming request; the session id is read from the cookie
            named by SESSION_COOKIE_NAME.
        allowed_roles: Role names that are granted access.
        denied_detail: ``detail`` text of the 403 raised when access is
            refused.

    Returns:
        A dict with the keys "id", "username" and "role" for the caller.

    Raises:
        HTTPException: 401 when the cookie is missing/empty or
            get_user_for_session() returns a falsy value; 403 when no
            app_user row matches the session's user id or the resolved
            role is not in ``allowed_roles``.
    """
    session_id = request.cookies.get(SESSION_COOKIE_NAME)
    if not session_id:
        raise HTTPException(status_code=401, detail="Not authenticated")

    user = get_user_for_session(session_id)
    if not user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Re-read the role columns from app_user rather than relying on whatever
    # the session lookup returned.
    with db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, username, role, is_admin, is_super_admin FROM app_user WHERE id = %s",
                (user["id"],),
            )
            row = cur.fetchone()

    # Authorization checks run after the cursor and connection context
    # managers have exited, so a denial is not raised through them.
    if not row:
        raise HTTPException(status_code=403, detail=denied_detail)

    role = _resolve_role(row)
    if role not in allowed_roles:
        raise HTTPException(status_code=403, detail=denied_detail)

    return {
        "id": row[0],
        "username": row[1],
        "role": role,
    }


def require_admin(request: Request):
    """Allow the request only for users whose role is ADMIN or SUPER_ADMIN.

    Returns:
        A dict with the keys "id", "username" and "role".

    Raises:
        HTTPException: 401 ("Not authenticated") when there is no valid
            session; 403 ("Admin access required") when the user row is
            missing or the resolved role is neither ADMIN nor SUPER_ADMIN.
    """
    return _require_role(
        request,
        ("ADMIN", "SUPER_ADMIN"),
        "Admin access required",
    )


def require_super_admin(request: Request):
    """Allow the request only for users whose role is SUPER_ADMIN.

    Returns:
        A dict with the keys "id", "username" and "role".

    Raises:
        HTTPException: 401 ("Not authenticated") when there is no valid
            session; 403 ("Super admin access required") when the user row
            is missing or the resolved role is not SUPER_ADMIN.
    """
    return _require_role(
        request,
        ("SUPER_ADMIN",),
        "Super admin access required",
    )