Hestia-Nginx-Reverse-Proxy/ReverseProxy/fastapi_nginx_gen copy.py
2026-04-13 16:42:01 +00:00

436 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
FastAPI-based Nginx reverse-proxy generator.
POST /api/nginx/app
Body:
{
"domain": "ebayapp.data4autos.com",
"port": 3011,
"ip": "147.93.40.215", # optional (defaults to STATIC_IP)
"dryRun": false, # optional
"skipReload": false # optional
}
Auth (required unless DISABLE_AUTH=true):
send header: X-API-Key: <API_KEY>
Run as root (recommended) or configure sudoers for nginx/systemctl.
"""
import os
import re
import time
import pathlib
import subprocess
from typing import Optional, Dict, Any
from fastapi import FastAPI, Body, HTTPException, Header, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel, field_validator
# ---------------------------- Config ----------------------------
STATIC_IP = os.getenv("STATIC_IP", "147.93.40.215")
PARENT_PATH = os.getenv("PARENT_PATH", "/home/user/conf/web") # <parent>/<domain>/
PUBLIC_WEB_ROOT = os.getenv("PUBLIC_WEB_ROOT", "/home/user/web") # for /error alias
ERROR_LOG_ROOT = os.getenv("ERROR_LOG_ROOT", "/var/log/apache2/domains")
# Binaries (absolute paths are safer under systemd)
SUDO_BIN = os.getenv("SUDO_BIN", "/usr/bin/sudo")
NGINX_BIN = os.getenv("NGINX_BIN", "/usr/sbin/nginx")
SYSTEMCTL_BIN = os.getenv("SYSTEMCTL_BIN", "/bin/systemctl")
# Simple header auth
API_KEY = os.getenv("API_KEY", "")
DISABLE_AUTH = os.getenv("DISABLE_AUTH", "false").lower() in ("1", "true", "yes")
# Safety: bound interface/port for uvicorn (when you use the run command below)
BIND_HOST = os.getenv("BIND_HOST", "127.0.0.1") # expose via reverse proxy if you like
BIND_PORT = int(os.getenv("BIND_PORT", "5055"))
# ------------------------- App bootstrap ------------------------
app = FastAPI(title="Nginx Reverse Proxy Generator (FastAPI)")
# ------------------------- Validators ---------------------------
_ipv4_re = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$")
_domain_re = re.compile(r"^(?!-)(?:[a-zA-Z0-9-]{1,63}\.)+[a-zA-Z]{2,63}$")
def is_valid_ipv4(ip: str) -> bool:
if not _ipv4_re.match(ip):
return False
parts = ip.split(".")
try:
return all(0 <= int(p) <= 255 and str(int(p)) == p for p in parts)
except Exception:
return False
def is_valid_domain(d: str) -> bool:
return bool(_domain_re.match(d))
def is_valid_port(p: int) -> bool:
return isinstance(p, int) and 1 <= p <= 65535
def safe_domain(d: str) -> str:
"""Reject path traversal and uppercases; ensure it's a normal FQDN."""
if not is_valid_domain(d):
raise ValueError("Invalid domain")
return d.lower()
# --------------------------- Models -----------------------------
class GenRequest(BaseModel):
domain: str
port: int
ip: Optional[str] = None
dryRun: Optional[bool] = False
skipReload: Optional[bool] = False
@field_validator("domain")
@classmethod
def _dom(cls, v: str) -> str:
if not is_valid_domain(v):
raise ValueError("Invalid domain")
return v
@field_validator("port")
@classmethod
def _prt(cls, v: int) -> int:
if not is_valid_port(v):
raise ValueError("Invalid port")
return v
@field_validator("ip")
@classmethod
def _ip(cls, v: Optional[str]) -> Optional[str]:
if v is None or str(v).strip() == "":
return None
if not is_valid_ipv4(v):
raise ValueError("Invalid ip")
return v
# ------------------------- Auth dependency ----------------------
def require_api_key(x_api_key: Optional[str] = Header(default=None)) -> None:
return
if DISABLE_AUTH:
return
if not API_KEY:
# If you forgot to set API_KEY but disabled auth is False
raise HTTPException(status_code=500, detail="Server misconfigured: API_KEY not set")
if x_api_key != API_KEY:
print(x_api_key,"!=",API_KEY)
raise HTTPException(status_code=401, detail="Invalid or missing API key")
# ----------------------- Templating funcs -----------------------
# def build_ssl(ip: str, domain: str, port: int) -> str:
# base = f"{PARENT_PATH}/{domain}"
# public = f"{PUBLIC_WEB_ROOT}/{domain}"
# return f"""server {{
# listen {ip}:443 ssl;
# server_name {domain};
# error_log {ERROR_LOG_ROOT}/{domain}.error.log error;
# ssl_certificate {base}/ssl/{domain}.pem;
# ssl_certificate_key {base}/ssl/{domain}.key;
# location / {{
# proxy_pass http://127.0.0.1:{port};
# proxy_http_version 1.1;
# proxy_set_header Upgrade $http_upgrade;
# proxy_set_header Connection 'upgrade';
# proxy_set_header Host $host;
# proxy_cache_bypass $http_upgrade;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# proxy_set_header X-Forwarded-Proto $scheme;
# }}
# location ~ /\\.(?!well-known\\/|file) {{
# deny all;
# return 404;
# }}
# location /error/ {{
# alias {public}/document_errors/;
# }}
# disable_symlinks if_not_owner from={public}/public_html;
# include {base}/nginx.ssl.conf_*;
# }}
# """.rstrip() + "\n"
# def build_nonssl(ip: str, domain: str, port: int) -> str:
# base = f"{PARENT_PATH}/{domain}"
# public = f"{PUBLIC_WEB_ROOT}/{domain}"
# return f"""server {{
# listen {ip}:80;
# server_name {domain};
# error_log {ERROR_LOG_ROOT}/{domain}.error.log error;
# include {base}/nginx.forcessl.conf*;
# location / {{
# proxy_pass http://127.0.0.1:{port};
# proxy_http_version 1.1;
# proxy_set_header Upgrade $http_upgrade;
# proxy_set_header Connection 'upgrade';
# proxy_set_header Host $host;
# proxy_cache_bypass $http_upgrade;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# proxy_set_header X-Forwarded-Proto $scheme;
# }}
# location ~ /\\.(?!well-known\\/|file) {{
# deny all;
# return 404;
# }}
# location /error/ {{
# alias {public}/document_errors/;
# }}
# disable_symlinks if_not_owner from={public}/public_html;
# include {base}/nginx.conf_*;
# }}
# """.rstrip() + "\n"
def build_ssl(ip: str, domain: str, port: int) -> str:
base = f"{PARENT_PATH}/{domain}"
public = f"{PUBLIC_WEB_ROOT}/{domain}"
return f"""server {{
listen {ip}:443 ssl;
server_name {domain};
error_log {ERROR_LOG_ROOT}/{domain}.error.log error;
ssl_certificate {base}/ssl/{domain}.pem;
ssl_certificate_key {base}/ssl/{domain}.key;
ssl_stapling on;
ssl_stapling_verify on;
# TLS 1.3 0-RTT protection
if ($anti_replay = 307) {{ return 307 https://$host$request_uri; }}
if ($anti_replay = 425) {{ return 425; }}
include {base}/nginx.hsts.conf*;
#################################################################
# FORCE BROWSER TO UPGRADE HTTP REQUESTS TO HTTPS
#################################################################
add_header Content-Security-Policy "upgrade-insecure-requests";
#################################################################
# SECURITY
#################################################################
location ~ /\\.(?!well-known\\/|file) {{
deny all;
return 404;
}}
#################################################################
# MAIN REVERSE PROXY
#################################################################
location / {{
proxy_pass http://127.0.0.1:{port};
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-NginX-Proxy true;
proxy_set_header X-Forwarded-Proto https;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port 443;
# convert upstream http redirects into https
proxy_redirect http:// https://;
proxy_cache_bypass $http_upgrade;
proxy_request_buffering off;
proxy_connect_timeout 36000s;
proxy_read_timeout 36000s;
proxy_send_timeout 36000s;
send_timeout 36000s;
client_max_body_size 10240m;
}}
#################################################################
location /error/ {{
alias {public}/document_errors/;
}}
disable_symlinks if_not_owner from={public}/public_html;
proxy_hide_header Upgrade;
include {base}/nginx.ssl.conf_*;
}}
""".rstrip() + "\n"
def build_nonssl(ip: str, domain: str, port: int) -> str:
return f"""server {{
listen {ip}:80;
server_name {domain};
return 301 https://$host$request_uri;
}}
""".rstrip() + "\n"
# ----------------------- Shell helpers --------------------------
def run(cmd: list[str], timeout: int = 30) -> dict:
"""Run a command and return {ok, rc, stdout, stderr} without raising."""
try:
p = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=False,
text=True,
)
return {"ok": p.returncode == 0, "rc": p.returncode, "stdout": p.stdout, "stderr": p.stderr}
except subprocess.TimeoutExpired as te:
return {"ok": False, "rc": -1, "stdout": te.stdout or "", "stderr": f"Timeout: {te}"}
except Exception as e:
return {"ok": False, "rc": -2, "stdout": "", "stderr": str(e)}
# --------------------------- Routes -----------------------------
@app.get("/healthz")
def healthz():
return {"ok": True, "ip": STATIC_IP, "parent": PARENT_PATH}
@app.post("/api/nginx/app", dependencies=[Depends(require_api_key)])
def create_or_update_vhost(payload: GenRequest = Body(...)) -> JSONResponse:
# Resolve values
ip = payload.ip or STATIC_IP
domain = safe_domain(payload.domain)
port = int(payload.port)
# Render templates
ssl_txt = build_ssl(ip, domain, port)
nonssl_txt = build_nonssl(ip, domain, port)
# Target paths
domain_dir = pathlib.Path(PARENT_PATH) / domain
ssl_file = domain_dir / "nginx.ssl.conf"
nonssl_file = domain_dir / "nginx.conf"
if payload.dryRun:
return JSONResponse(
{
"ok": True,
"dryRun": True,
"files": {
str(nonssl_file): nonssl_txt,
str(ssl_file): ssl_txt,
},
}
)
# Ensure directory exists
try:
domain_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create {domain_dir}: {e}")
# Check write access
if not os.access(domain_dir, os.W_OK | os.X_OK):
raise HTTPException(
status_code=403,
detail=f"No write access to {domain_dir}. Run as root or fix permissions.",
)
# Backup old files, if present
def backup_if_exists(p: pathlib.Path) -> Optional[pathlib.Path]:
try:
if p.is_file():
bkp = p.with_suffix(p.suffix + f".bak-{int(time.time())}")
bkp.write_text(p.read_text(encoding="utf-8"), encoding="utf-8")
return bkp
except Exception:
pass
return None
nonssl_bkp = backup_if_exists(nonssl_file)
ssl_bkp = backup_if_exists(ssl_file)
# Write new files
try:
nonssl_file.write_text(nonssl_txt, encoding="utf-8")
ssl_file.write_text(ssl_txt, encoding="utf-8")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed writing files: {e}")
# Validate Nginx config (nginx -t)
test = run([SUDO_BIN, NGINX_BIN, "-t"])
if not test["ok"]:
# Restore backups
try:
if nonssl_bkp and nonssl_bkp.exists():
nonssl_file.write_text(nonssl_bkp.read_text(encoding="utf-8"), encoding="utf-8")
if ssl_bkp and ssl_bkp.exists():
ssl_file.write_text(ssl_bkp.read_text(encoding="utf-8"), encoding="utf-8")
except Exception:
pass
raise HTTPException(
status_code=500,
detail=f"nginx -t failed: rc={test['rc']} stderr={test['stderr'] or test['stdout']}",
)
# Reload Nginx (unless skipped)
reload_out: Dict[str, Any] = {"skipped": True}
if not payload.skipReload:
reload_out = run([SUDO_BIN, SYSTEMCTL_BIN, "reload", "nginx"])
if not reload_out["ok"]:
# Config is valid, but reload failed — return error for visibility
raise HTTPException(
status_code=500,
detail=f"Failed to reload nginx: rc={reload_out['rc']} stderr={reload_out['stderr'] or reload_out['stdout']}",
)
return JSONResponse(
{
"ok": True,
"written": [str(nonssl_file), str(ssl_file)],
"test": test,
"reload": reload_out,
"render": {"nonssl": nonssl_txt, "ssl": ssl_txt},
}
)