#!/usr/bin/env python3
"""
FastAPI-based Nginx reverse-proxy generator.

POST /api/nginx/app
Body:
    {
        "domain": "ebayapp.data4autos.com",
        "port": 3011,
        "ip": "147.93.40.215",
        "dryRun": false,
        "skipReload": false
    }

Auth (required unless DISABLE_AUTH=true):
    send header: X-API-Key: <value of the API_KEY environment variable>
"""

import hmac
import logging
import os
import pathlib
import re
import subprocess
import time
from typing import Any, Dict, Optional

from fastapi import Body, Depends, FastAPI, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, field_validator

logger = logging.getLogger(__name__)

# ---------------------------- Config ----------------------------
STATIC_IP = os.getenv("STATIC_IP", "147.93.40.215")
PARENT_PATH = os.getenv("PARENT_PATH", "/home/user/conf/web")
PUBLIC_WEB_ROOT = os.getenv("PUBLIC_WEB_ROOT", "/home/user/web")
ERROR_LOG_ROOT = os.getenv("ERROR_LOG_ROOT", "/var/log/apache2/domains")
SUDO_BIN = os.getenv("SUDO_BIN", "/usr/bin/sudo")
NGINX_BIN = os.getenv("NGINX_BIN", "/usr/sbin/nginx")
SYSTEMCTL_BIN = os.getenv("SYSTEMCTL_BIN", "/bin/systemctl")
API_KEY = os.getenv("API_KEY", "")
DISABLE_AUTH = os.getenv("DISABLE_AUTH", "false").lower() in ("1", "true", "yes")
BIND_HOST = os.getenv("BIND_HOST", "127.0.0.1")
BIND_PORT = int(os.getenv("BIND_PORT", "5055"))

# ------------------------- App bootstrap ------------------------
app = FastAPI(title="Nginx Reverse Proxy Generator (FastAPI)")

# ------------------------- Validators ---------------------------
# Both patterns are applied with ``fullmatch``: a ``$`` anchor combined with
# ``match`` would also accept a value that ends in a newline, and the domain
# is later used in filesystem paths and in the generated nginx config.
_ipv4_re = re.compile(r"(\d{1,3}\.){3}\d{1,3}")
_domain_re = re.compile(
    r"(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,63}"
)
_MAX_DOMAIN_LEN = 253


def is_valid_ipv4(ip: str) -> bool:
    """Return True if *ip* is a dotted-quad IPv4 address in canonical form.

    Each octet must be in 0..255 and must round-trip through ``int`` unchanged,
    which rejects leading zeros (``"01.2.3.4"``) and non-ASCII digits.
    """
    if not isinstance(ip, str) or not _ipv4_re.fullmatch(ip):
        return False
    try:
        return all(0 <= int(p) <= 255 and str(int(p)) == p for p in ip.split("."))
    except ValueError:
        return False


def is_valid_domain(d: str) -> bool:
    """Return True if *d* is a dotted hostname made of letters, digits and ``-``.

    Labels are 1-63 characters and may not start or end with ``-``; the last
    label must be 2-63 letters; the whole name is at most 253 characters.
    """
    if not isinstance(d, str) or len(d) > _MAX_DOMAIN_LEN:
        return False
    return bool(_domain_re.fullmatch(d))


def is_valid_port(p: int) -> bool:
    """Return True if *p* is an int in the range 1..65535."""
    return isinstance(p, int) and 1 <= p <= 65535


def safe_domain(d: str) -> str:
    """Validate *d* and return it lower-cased.

    Raises:
        ValueError: if *d* is not a valid domain.
    """
    if not is_valid_domain(d):
        raise ValueError("Invalid domain")
    return d.lower()


# --------------------------- Models -----------------------------
class GenRequest(BaseModel):
    """Request body for ``POST /api/nginx/app``."""

    domain: str
    port: int
    ip: Optional[str] = None
    dryRun: Optional[bool] = False
    skipReload: Optional[bool] = False

    @field_validator("domain")
    @classmethod
    def _dom(cls, v: str) -> str:
        # Normalise here so the model always carries the lower-cased domain.
        return safe_domain(v)

    @field_validator("port")
    @classmethod
    def _prt(cls, v: int) -> int:
        if not is_valid_port(v):
            raise ValueError("Invalid port")
        return v

    @field_validator("ip")
    @classmethod
    def _ip(cls, v: Optional[str]) -> Optional[str]:
        # A missing or blank ip means "use the server default" (STATIC_IP).
        if v is None or str(v).strip() == "":
            return None
        if not is_valid_ipv4(v):
            raise ValueError("Invalid ip")
        return v


# ------------------------- Auth dependency ----------------------
def require_api_key(x_api_key: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency that enforces the ``X-API-Key`` header.

    Authentication is skipped only when ``DISABLE_AUTH`` is truthy.

    Raises:
        HTTPException: 500 if auth is enabled but ``API_KEY`` is empty;
            401 if the header is missing or does not equal ``API_KEY``.
    """
    if DISABLE_AUTH:
        return
    if not API_KEY:
        raise HTTPException(status_code=500, detail="Server misconfigured: API_KEY not set")
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    # Neither the supplied nor the expected key is ever logged.
    supplied = (x_api_key or "").encode("utf-8")
    if not hmac.compare_digest(supplied, API_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid or missing API key")


# ----------------------- Templating funcs -----------------------
def build_ssl(ip: str, domain: str, port: int) -> str:
    """Render the HTTPS (port 443) server block proxying to ``127.0.0.1:port``.

    The returned text ends with exactly one newline.
    """
    base = f"{PARENT_PATH}/{domain}"
    public = f"{PUBLIC_WEB_ROOT}/{domain}"
    return f"""server {{
    listen {ip}:443 ssl;
    server_name {domain};
    error_log {ERROR_LOG_ROOT}/{domain}.error.log error;

    ssl_certificate {base}/ssl/{domain}.pem;
    ssl_certificate_key {base}/ssl/{domain}.key;
    ssl_stapling on;
    ssl_stapling_verify on;

    # TLS 1.3 0-RTT protection
    if ($anti_replay = 307) {{ return 307 https://$host$request_uri; }}
    if ($anti_replay = 425) {{ return 425; }}

    include {base}/nginx.hsts.conf*;

    #################################################################
    # FORCE BROWSER TO UPGRADE HTTP REQUESTS TO HTTPS
    #################################################################
    add_header Content-Security-Policy "upgrade-insecure-requests";

    #################################################################
    # SECURITY
    #################################################################
    location ~ /\\.(?!well-known\\/|file) {{
        deny all;
        return 404;
    }}

    #################################################################
    # MAIN REVERSE PROXY
    #################################################################
    location / {{
        proxy_pass http://127.0.0.1:{port};
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-NginX-Proxy true;
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Forwarded-Host $host;
        proxy_set_header X-Forwarded-Port 443;

        # convert upstream http redirects into https
        proxy_redirect http:// https://;

        proxy_cache_bypass $http_upgrade;
        proxy_request_buffering off;
        proxy_connect_timeout 36000s;
        proxy_read_timeout 36000s;
        proxy_send_timeout 36000s;
        send_timeout 36000s;
        client_max_body_size 10240m;
    }}

    #################################################################
    location /error/ {{
        alias {public}/document_errors/;
    }}

    disable_symlinks if_not_owner from={public}/public_html;
    proxy_hide_header Upgrade;

    include {base}/nginx.ssl.conf_*;
}}
""".rstrip() + "\n"


def build_nonssl(ip: str, domain: str, port: int) -> str:
    """Render the plain-HTTP (port 80) server block that redirects to HTTPS.

    *port* is unused; it is accepted so the signature mirrors ``build_ssl``.
    """
    return f"""server {{
    listen {ip}:80;
    server_name {domain};
    return 301 https://$host$request_uri;
}}
""".rstrip() + "\n"


# ----------------------- Shell helpers --------------------------
def _as_text(data: Any) -> str:
    """Coerce captured process output (str, bytes or None) to str."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="replace")
    return str(data)


def run(cmd: list[str], timeout: int = 30) -> dict:
    """Run *cmd* without a shell and report the outcome as a dict.

    Returns:
        ``{"ok", "rc", "stdout", "stderr"}``. ``rc`` is the process return
        code, ``-1`` on timeout, or ``-2`` if the process could not be started.
        This function does not raise for those failures.
    """
    try:
        p = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
            check=False,
            text=True,
        )
        return {"ok": p.returncode == 0, "rc": p.returncode, "stdout": p.stdout, "stderr": p.stderr}
    except subprocess.TimeoutExpired as te:
        # Partial output attached to TimeoutExpired may be bytes even with text=True.
        return {"ok": False, "rc": -1, "stdout": _as_text(te.stdout), "stderr": f"Timeout: {te}"}
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        return {"ok": False, "rc": -2, "stdout": "", "stderr": str(e)}


# ----------------------- File helpers ---------------------------
def _backup_if_exists(p: pathlib.Path) -> Optional[pathlib.Path]:
    """Copy *p* to ``<name>.bak-<unix time>`` next to it (best effort).

    Returns the backup path, or None if *p* is not a file or the copy failed.
    """
    if not p.is_file():
        return None
    bkp = p.with_suffix(p.suffix + f".bak-{int(time.time())}")
    try:
        bkp.write_text(p.read_text(encoding="utf-8"), encoding="utf-8")
    except (OSError, UnicodeError) as e:
        logger.warning("Could not back up %s to %s: %s", p, bkp, e)
        return None
    return bkp


def _restore_or_remove(
    target: pathlib.Path,
    backup: Optional[pathlib.Path],
    existed_before: bool,
) -> None:
    """Undo a write to *target* (best effort; failures are logged).

    Restores from *backup* when one exists. If the file did not exist before
    this request, it is deleted so a rejected config is not left on disk.
    """
    try:
        if backup is not None and backup.exists():
            target.write_text(backup.read_text(encoding="utf-8"), encoding="utf-8")
        elif not existed_before:
            target.unlink(missing_ok=True)
        else:
            logger.warning("No backup available to restore %s", target)
    except (OSError, UnicodeError) as e:
        logger.error("Rollback of %s failed: %s", target, e)


# --------------------------- Routes -----------------------------
@app.get("/healthz")
def healthz():
    """Liveness probe; also reports the configured default IP and parent path."""
    return {"ok": True, "ip": STATIC_IP, "parent": PARENT_PATH}


@app.post("/api/nginx/app", dependencies=[Depends(require_api_key)])
def create_or_update_vhost(payload: GenRequest = Body(...)) -> JSONResponse:
    """Render, write, test and (optionally) reload an nginx vhost for a domain.

    Steps: render both server blocks; on ``dryRun`` return them without touching
    disk; otherwise back up existing files, write the new ones, run ``nginx -t``
    and roll back if it fails; then reload nginx unless ``skipReload`` is set.

    Raises:
        HTTPException: 403 if the domain directory is not writable; 500 if the
            directory cannot be created, the files cannot be written,
            ``nginx -t`` fails, or the reload fails.
    """
    ip = payload.ip or STATIC_IP
    domain = safe_domain(payload.domain)
    port = int(payload.port)

    ssl_txt = build_ssl(ip, domain, port)
    nonssl_txt = build_nonssl(ip, domain, port)

    domain_dir = pathlib.Path(PARENT_PATH) / domain
    ssl_file = domain_dir / "nginx.ssl.conf"
    nonssl_file = domain_dir / "nginx.conf"

    if payload.dryRun:
        return JSONResponse(
            {
                "ok": True,
                "dryRun": True,
                "files": {
                    str(nonssl_file): nonssl_txt,
                    str(ssl_file): ssl_txt,
                },
            }
        )

    try:
        domain_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Failed to create {domain_dir}: {e}") from e

    if not os.access(domain_dir, os.W_OK | os.X_OK):
        raise HTTPException(
            status_code=403,
            detail=f"No write access to {domain_dir}. Run as root or fix permissions.",
        )

    rendered = {nonssl_file: nonssl_txt, ssl_file: ssl_txt}
    # Remember which files pre-existed so rollback can tell "restore" from "delete".
    existed_before = {path: path.is_file() for path in rendered}
    backups = {path: _backup_if_exists(path) for path in rendered}

    def rollback() -> None:
        for path in rendered:
            _restore_or_remove(path, backups[path], existed_before[path])

    try:
        for path, text in rendered.items():
            path.write_text(text, encoding="utf-8")
    except OSError as e:
        # One file may already have been replaced; do not leave a half-applied vhost.
        rollback()
        raise HTTPException(status_code=500, detail=f"Failed writing files: {e}") from e

    test = run([SUDO_BIN, NGINX_BIN, "-t"])
    if not test["ok"]:
        rollback()
        raise HTTPException(
            status_code=500,
            detail=f"nginx -t failed: rc={test['rc']} stderr={test['stderr'] or test['stdout']}",
        )

    reload_out: Dict[str, Any] = {"skipped": True}
    if not payload.skipReload:
        reload_out = run([SUDO_BIN, SYSTEMCTL_BIN, "reload", "nginx"])
        if not reload_out["ok"]:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to reload nginx: rc={reload_out['rc']} stderr={reload_out['stderr'] or reload_out['stdout']}",
            )

    return JSONResponse(
        {
            "ok": True,
            "written": [str(nonssl_file), str(ssl_file)],
            "test": test,
            "reload": reload_out,
            "render": {"nonssl": nonssl_txt, "ssl": ssl_txt},
        }
    )