import os import smtplib import ssl import threading from email.message import EmailMessage SMTP_TIMEOUT_SECONDS = float((os.getenv("SMTP_TIMEOUT_SECONDS") or "5").strip()) def send_email(to_email: str, subject: str, body_text: str) -> bool: smtp_user = (os.getenv("SMTP_USER") or "").strip() smtp_pass = (os.getenv("SMTP_PASS") or "").replace(" ", "").strip() smtp_host = (os.getenv("SMTP_HOST") or "smtp.gmail.com").strip() smtp_port = int((os.getenv("SMTP_PORT") or "587").strip()) from_name = (os.getenv("SMTP_FROM_NAME") or "Quantfortune Support").strip() if not smtp_user or not smtp_pass: return False msg = EmailMessage() msg["From"] = f"{from_name} <{smtp_user}>" msg["To"] = to_email msg["Subject"] = subject msg.set_content(body_text) context = ssl.create_default_context() with smtplib.SMTP(smtp_host, smtp_port, timeout=SMTP_TIMEOUT_SECONDS) as server: server.starttls(context=context) server.login(smtp_user, smtp_pass) server.send_message(msg) return True def send_email_async(to_email: str, subject: str, body_text: str) -> bool: def _worker(): try: send_email(to_email, subject, body_text) except Exception: pass thread = threading.Thread(target=_worker, daemon=True) thread.start() return True