# 2026-02-01 13:57:30 +00:00
#
# 297 lines
# 9.6 KiB
# Python

from datetime import datetime, timezone
from app.services.crypto_service import decrypt_value, encrypt_value
from app.services.db import db_transaction
def _row_to_entry(row) -> dict:
    """Convert one ``user_broker`` row into an entry dict.

    ``row`` must be a 14-item sequence in this column order: user_id,
    broker, connected, access_token, connected_at, api_key, api_secret,
    user_name, broker_user_id, auth_state, pending_broker,
    pending_api_key, pending_api_secret, pending_started_at.

    The result always carries ``broker``, ``connected`` (coerced to bool),
    ``connected_at``, ``api_key``, ``auth_state``, ``user_name`` and
    ``broker_user_id``.  The ``user_id``, ``access_token`` and
    ``api_secret`` columns are read but deliberately left out of the result.

    A ``"pending"`` sub-dict is attached only when at least one pending
    column is truthy.  Its ``api_secret`` is the stored value passed
    through ``decrypt_value`` (or ``None`` when the column is empty).
    """
    (
        _user_id,  # not part of the entry; callers key by it separately
        broker,
        connected,
        _access_token,  # never exposed through the entry
        connected_at,
        api_key,
        _api_secret,  # never exposed through the entry
        user_name,
        broker_user_id,
        auth_state,
        pending_broker,
        pending_api_key,
        pending_api_secret,
        pending_started_at,
    ) = row

    entry = {
        "broker": broker,
        "connected": bool(connected),
        "connected_at": connected_at,
        "api_key": api_key,
        "auth_state": auth_state,
        "user_name": user_name,
        "broker_user_id": broker_user_id,
    }

    has_pending = any(
        (pending_broker, pending_api_key, pending_api_secret, pending_started_at)
    )
    if has_pending:
        if pending_api_secret:
            pending_secret = decrypt_value(pending_api_secret)
        else:
            pending_secret = None
        entry["pending"] = {
            "broker": pending_broker,
            "api_key": pending_api_key,
            "api_secret": pending_secret,
            "started_at": pending_started_at,
        }
    return entry
def load_user_brokers() -> dict:
    """Return every ``user_broker`` row as ``{user_id: entry}``.

    Each entry is built by ``_row_to_entry``.  The dict is keyed by the
    first selected column (``user_id``).
    """
    with db_transaction() as cur:
        cur.execute(
            """
SELECT user_id, broker, connected, access_token, connected_at,
api_key, api_secret, user_name, broker_user_id, auth_state,
pending_broker, pending_api_key, pending_api_secret, pending_started_at
FROM user_broker
"""
        )
        rows = cur.fetchall()
    return {row[0]: _row_to_entry(row) for row in rows}
def save_user_brokers(data) -> None:
    """Upsert every ``{user_id: entry}`` pair of ``data`` into ``user_broker``.

    All rows are written through a single ``db_transaction()`` context.
    For each entry, missing keys are stored as NULL, ``connected`` is
    coerced to bool, and truthy ``access_token`` / ``api_secret`` /
    ``pending.api_secret`` values are passed through ``encrypt_value``
    before being stored (falsy values are stored as NULL).

    NOTE(review): entries produced by ``_row_to_entry`` carry no
    ``access_token`` or ``api_secret`` key, so saving such an entry back
    unchanged writes NULL into those two columns.  Confirm that callers
    re-populate them before calling this function.
    """
    with db_transaction() as cur:
        for user_id, entry in data.items():
            # Look each value up once rather than once for the truthiness
            # test and again for the value itself.
            pending = entry.get("pending") or {}
            access_token = entry.get("access_token")
            api_secret = entry.get("api_secret")
            pending_secret = pending.get("api_secret")

            cur.execute(
                """
INSERT INTO user_broker (
user_id, broker, connected, access_token, connected_at,
api_key, api_secret, user_name, broker_user_id, auth_state,
pending_broker, pending_api_key, pending_api_secret, pending_started_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id)
DO UPDATE SET
broker = EXCLUDED.broker,
connected = EXCLUDED.connected,
access_token = EXCLUDED.access_token,
connected_at = EXCLUDED.connected_at,
api_key = EXCLUDED.api_key,
api_secret = EXCLUDED.api_secret,
user_name = EXCLUDED.user_name,
broker_user_id = EXCLUDED.broker_user_id,
auth_state = EXCLUDED.auth_state,
pending_broker = EXCLUDED.pending_broker,
pending_api_key = EXCLUDED.pending_api_key,
pending_api_secret = EXCLUDED.pending_api_secret,
pending_started_at = EXCLUDED.pending_started_at
""",
                (
                    user_id,
                    entry.get("broker"),
                    bool(entry.get("connected")),
                    encrypt_value(access_token) if access_token else None,
                    entry.get("connected_at"),
                    entry.get("api_key"),
                    encrypt_value(api_secret) if api_secret else None,
                    entry.get("user_name"),
                    entry.get("broker_user_id"),
                    entry.get("auth_state"),
                    pending.get("broker"),
                    pending.get("api_key"),
                    encrypt_value(pending_secret) if pending_secret else None,
                    pending.get("started_at"),
                ),
            )
def now_utc() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
def get_user_broker(user_id: str) -> dict | None:
    """Return the entry dict for ``user_id``, or ``None`` if no row exists.

    The entry is built by ``_row_to_entry`` from the single matching
    ``user_broker`` row.
    """
    with db_transaction() as cur:
        cur.execute(
            """
SELECT user_id, broker, connected, access_token, connected_at,
api_key, api_secret, user_name, broker_user_id, auth_state,
pending_broker, pending_api_key, pending_api_secret, pending_started_at
FROM user_broker
WHERE user_id = %s
""",
            (user_id,),
        )
        row = cur.fetchone()
    return _row_to_entry(row) if row else None
def clear_user_broker(user_id: str) -> None:
    """Delete the ``user_broker`` row belonging to ``user_id``."""
    with db_transaction() as cur:
        cur.execute("DELETE FROM user_broker WHERE user_id = %s", (user_id,))
def set_pending_broker(user_id: str, broker: str, api_key: str, api_secret: str) -> dict:
    """Record a pending broker connection for ``user_id``.

    Upserts the ``user_broker`` row so that the pending columns hold
    ``broker``, ``api_key``, the encrypted ``api_secret`` and the current
    UTC time.  The same key and encrypted secret are also written to the
    active ``api_key`` / ``api_secret`` columns, and ``auth_state`` is set
    to ``"PENDING"``.

    Returns a dict with ``broker``, ``api_key``, the plaintext
    ``api_secret`` as passed in, and ``started_at``.
    """
    started_at = now_utc()
    with db_transaction() as cur:
        # Encrypt once; the same ciphertext fills both secret columns.
        encrypted_secret = encrypt_value(api_secret)
        cur.execute(
            """
INSERT INTO user_broker (
user_id, pending_broker, pending_api_key, pending_api_secret, pending_started_at,
api_key, api_secret, auth_state
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id)
DO UPDATE SET
pending_broker = EXCLUDED.pending_broker,
pending_api_key = EXCLUDED.pending_api_key,
pending_api_secret = EXCLUDED.pending_api_secret,
pending_started_at = EXCLUDED.pending_started_at,
api_key = EXCLUDED.api_key,
api_secret = EXCLUDED.api_secret,
auth_state = EXCLUDED.auth_state
""",
            (
                user_id,
                broker,
                api_key,
                encrypted_secret,
                started_at,
                api_key,
                encrypted_secret,
                "PENDING",
            ),
        )
    return {
        "broker": broker,
        "api_key": api_key,
        "api_secret": api_secret,
        "started_at": started_at,
    }
def get_pending_broker(user_id: str) -> dict | None:
    """Return the pending broker details for ``user_id``.

    Returns ``None`` when the user has no row, or when any of the pending
    broker, pending API key or pending API secret columns is empty.
    Otherwise returns a dict with ``broker``, ``api_key``, ``api_secret``
    (the stored value passed through ``decrypt_value``) and ``started_at``.
    """
    with db_transaction() as cur:
        cur.execute(
            """
SELECT pending_broker, pending_api_key, pending_api_secret, pending_started_at
FROM user_broker
WHERE user_id = %s
""",
            (user_id,),
        )
        row = cur.fetchone()
    if not row:
        return None

    broker, api_key, encrypted_secret, started_at = row
    # A pending record is usable only when all three credentials are present.
    if not (broker and api_key and encrypted_secret):
        return None
    return {
        "broker": broker,
        "api_key": api_key,
        "api_secret": decrypt_value(encrypted_secret),
        "started_at": started_at,
    }
def get_broker_credentials(user_id: str) -> dict | None:
    """Return the API key and decrypted API secret stored for ``user_id``.

    The active ``api_key`` / ``api_secret`` columns are preferred; each
    falls back to its pending counterpart when empty.  Returns ``None``
    when the user has no row or when either value is still missing after
    the fallback.

    NOTE(review): key and secret fall back independently, so a row with
    only one active column set would pair an active value with a pending
    one.  Confirm that this cannot happen with real data.
    """
    with db_transaction() as cur:
        cur.execute(
            """
SELECT api_key, api_secret, pending_api_key, pending_api_secret
FROM user_broker
WHERE user_id = %s
""",
            (user_id,),
        )
        row = cur.fetchone()
    if not row:
        return None

    api_key, api_secret, pending_key, pending_secret = row
    key = api_key or pending_key
    secret = api_secret or pending_secret
    if not (key and secret):
        return None
    return {
        "api_key": key,
        "api_secret": decrypt_value(secret),
    }
def set_broker_auth_state(user_id: str, auth_state: str) -> None:
    """Set ``auth_state`` on the existing ``user_broker`` row of ``user_id``.

    Only an UPDATE is issued, so no row is created for a user that does
    not already have one.
    """
    with db_transaction() as cur:
        cur.execute(
            """
UPDATE user_broker
SET auth_state = %s
WHERE user_id = %s
""",
            (auth_state, user_id),
        )
def set_connected_broker(
    user_id: str,
    broker: str,
    access_token: str,
    api_key: str | None = None,
    api_secret: str | None = None,
    user_name: str | None = None,
    broker_user_id: str | None = None,
    auth_state: str | None = None,
) -> dict:
    """Mark ``user_id`` as connected to ``broker`` and clear pending state.

    Upserts the ``user_broker`` row with ``connected`` set to true, the
    encrypted ``access_token``, the current UTC time as ``connected_at``
    and the supplied profile fields.  ``api_secret`` is encrypted when
    truthy and stored as NULL otherwise.  All four pending columns are
    reset to NULL.

    Note that optional arguments left as ``None`` overwrite any existing
    column value with NULL on conflict.

    Returns a dict with the plaintext values that were passed in, plus
    ``connected=True`` and the generated ``connected_at`` timestamp.
    """
    connected_at = now_utc()
    with db_transaction() as cur:
        encrypted_token = encrypt_value(access_token)
        encrypted_secret = encrypt_value(api_secret) if api_secret else None
        cur.execute(
            """
INSERT INTO user_broker (
user_id, broker, connected, access_token, connected_at,
api_key, api_secret, user_name, broker_user_id, auth_state,
pending_broker, pending_api_key, pending_api_secret, pending_started_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NULL, NULL, NULL, NULL)
ON CONFLICT (user_id)
DO UPDATE SET
broker = EXCLUDED.broker,
connected = EXCLUDED.connected,
access_token = EXCLUDED.access_token,
connected_at = EXCLUDED.connected_at,
api_key = EXCLUDED.api_key,
api_secret = EXCLUDED.api_secret,
user_name = EXCLUDED.user_name,
broker_user_id = EXCLUDED.broker_user_id,
auth_state = EXCLUDED.auth_state,
pending_broker = NULL,
pending_api_key = NULL,
pending_api_secret = NULL,
pending_started_at = NULL
""",
            (
                user_id,
                broker,
                True,
                encrypted_token,
                connected_at,
                api_key,
                encrypted_secret,
                user_name,
                broker_user_id,
                auth_state,
            ),
        )
    return {
        "broker": broker,
        "connected": True,
        "access_token": access_token,
        "connected_at": connected_at,
        "api_key": api_key,
        "api_secret": api_secret,
        "user_name": user_name,
        "broker_user_id": broker_user_id,
        "auth_state": auth_state,
    }