#!/usr/bin/env python3 """ FastAPI-based Nginx reverse-proxy generator. POST /api/nginx/app Body: { "domain": "ebayapp.data4autos.com", "port": 3011, "ip": "147.93.40.215", # optional (defaults to STATIC_IP) "dryRun": false, # optional "skipReload": false # optional } Auth (required unless DISABLE_AUTH=true): send header: X-API-Key: Run as root (recommended) or configure sudoers for nginx/systemctl. """ import os import re import time import pathlib import subprocess from typing import Optional, Dict, Any from fastapi import FastAPI, Body, HTTPException, Header, Depends from fastapi.responses import JSONResponse from pydantic import BaseModel, field_validator # ---------------------------- Config ---------------------------- STATIC_IP = os.getenv("STATIC_IP", "147.93.40.215") PARENT_PATH = os.getenv("PARENT_PATH", "/home/user/conf/web") # // PUBLIC_WEB_ROOT = os.getenv("PUBLIC_WEB_ROOT", "/home/user/web") # for /error alias ERROR_LOG_ROOT = os.getenv("ERROR_LOG_ROOT", "/var/log/apache2/domains") # Binaries (absolute paths are safer under systemd) SUDO_BIN = os.getenv("SUDO_BIN", "/usr/bin/sudo") NGINX_BIN = os.getenv("NGINX_BIN", "/usr/sbin/nginx") SYSTEMCTL_BIN = os.getenv("SYSTEMCTL_BIN", "/bin/systemctl") # Simple header auth API_KEY = os.getenv("API_KEY", "") DISABLE_AUTH = os.getenv("DISABLE_AUTH", "false").lower() in ("1", "true", "yes") # Safety: bound interface/port for uvicorn (when you use the run command below) BIND_HOST = os.getenv("BIND_HOST", "127.0.0.1") # expose via reverse proxy if you like BIND_PORT = int(os.getenv("BIND_PORT", "5055")) # ------------------------- App bootstrap ------------------------ app = FastAPI(title="Nginx Reverse Proxy Generator (FastAPI)") # ------------------------- Validators --------------------------- _ipv4_re = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$") _domain_re = re.compile(r"^(?!-)(?:[a-zA-Z0-9-]{1,63}\.)+[a-zA-Z]{2,63}$") def is_valid_ipv4(ip: str) -> bool: if not _ipv4_re.match(ip): return False parts = ip.split(".") try: return all(0 <= int(p) <= 255 and str(int(p)) == p for p in parts) except Exception: return False def is_valid_domain(d: str) -> bool: return bool(_domain_re.match(d)) def is_valid_port(p: int) -> bool: return isinstance(p, int) and 1 <= p <= 65535 def safe_domain(d: str) -> str: """Reject path traversal and uppercases; ensure it's a normal FQDN.""" if not is_valid_domain(d): raise ValueError("Invalid domain") return d.lower() # --------------------------- Models ----------------------------- class GenRequest(BaseModel): domain: str port: int ip: Optional[str] = None dryRun: Optional[bool] = False skipReload: Optional[bool] = False @field_validator("domain") @classmethod def _dom(cls, v: str) -> str: if not is_valid_domain(v): raise ValueError("Invalid domain") return v @field_validator("port") @classmethod def _prt(cls, v: int) -> int: if not is_valid_port(v): raise ValueError("Invalid port") return v @field_validator("ip") @classmethod def _ip(cls, v: Optional[str]) -> Optional[str]: if v is None or str(v).strip() == "": return None if not is_valid_ipv4(v): raise ValueError("Invalid ip") return v # ------------------------- Auth dependency ---------------------- def require_api_key(x_api_key: Optional[str] = Header(default=None)) -> None: return if DISABLE_AUTH: return if not API_KEY: # If you forgot to set API_KEY but disabled auth is False raise HTTPException(status_code=500, detail="Server misconfigured: API_KEY not set") if x_api_key != API_KEY: print(x_api_key,"!=",API_KEY) raise HTTPException(status_code=401, detail="Invalid or missing API key") # ----------------------- Templating funcs ----------------------- # def build_ssl(ip: str, domain: str, port: int) -> str: # base = f"{PARENT_PATH}/{domain}" # public = f"{PUBLIC_WEB_ROOT}/{domain}" # return f"""server {{ # listen {ip}:443 ssl; # server_name {domain}; # error_log {ERROR_LOG_ROOT}/{domain}.error.log error; # ssl_certificate {base}/ssl/{domain}.pem; # ssl_certificate_key {base}/ssl/{domain}.key; # location / {{ # proxy_pass http://127.0.0.1:{port}; # proxy_http_version 1.1; # proxy_set_header Upgrade $http_upgrade; # proxy_set_header Connection 'upgrade'; # proxy_set_header Host $host; # proxy_cache_bypass $http_upgrade; # proxy_set_header X-Real-IP $remote_addr; # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # proxy_set_header X-Forwarded-Proto $scheme; # }} # location ~ /\\.(?!well-known\\/|file) {{ # deny all; # return 404; # }} # location /error/ {{ # alias {public}/document_errors/; # }} # disable_symlinks if_not_owner from={public}/public_html; # include {base}/nginx.ssl.conf_*; # }} # """.rstrip() + "\n" # def build_nonssl(ip: str, domain: str, port: int) -> str: # base = f"{PARENT_PATH}/{domain}" # public = f"{PUBLIC_WEB_ROOT}/{domain}" # return f"""server {{ # listen {ip}:80; # server_name {domain}; # error_log {ERROR_LOG_ROOT}/{domain}.error.log error; # include {base}/nginx.forcessl.conf*; # location / {{ # proxy_pass http://127.0.0.1:{port}; # proxy_http_version 1.1; # proxy_set_header Upgrade $http_upgrade; # proxy_set_header Connection 'upgrade'; # proxy_set_header Host $host; # proxy_cache_bypass $http_upgrade; # proxy_set_header X-Real-IP $remote_addr; # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # proxy_set_header X-Forwarded-Proto $scheme; # }} # location ~ /\\.(?!well-known\\/|file) {{ # deny all; # return 404; # }} # location /error/ {{ # alias {public}/document_errors/; # }} # disable_symlinks if_not_owner from={public}/public_html; # include {base}/nginx.conf_*; # }} # """.rstrip() + "\n" def build_ssl(ip: str, domain: str, port: int) -> str: base = f"{PARENT_PATH}/{domain}" public = f"{PUBLIC_WEB_ROOT}/{domain}" return f"""server {{ listen {ip}:443 ssl; server_name {domain}; error_log {ERROR_LOG_ROOT}/{domain}.error.log error; ssl_certificate {base}/ssl/{domain}.pem; ssl_certificate_key {base}/ssl/{domain}.key; ssl_stapling on; ssl_stapling_verify on; # TLS 1.3 0-RTT protection if ($anti_replay = 307) {{ return 307 https://$host$request_uri; }} if ($anti_replay = 425) {{ return 425; }} include {base}/nginx.hsts.conf*; ################################################################# # FORCE BROWSER TO UPGRADE HTTP REQUESTS TO HTTPS ################################################################# add_header Content-Security-Policy "upgrade-insecure-requests"; ################################################################# # SECURITY ################################################################# location ~ /\\.(?!well-known\\/|file) {{ deny all; return 404; }} ################################################################# # MAIN REVERSE PROXY ################################################################# location / {{ proxy_pass http://127.0.0.1:{port}; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-NginX-Proxy true; proxy_set_header X-Forwarded-Proto https; proxy_set_header X-Forwarded-Host $host; proxy_set_header X-Forwarded-Port 443; # convert upstream http redirects into https proxy_redirect http:// https://; proxy_cache_bypass $http_upgrade; proxy_request_buffering off; proxy_connect_timeout 36000s; proxy_read_timeout 36000s; proxy_send_timeout 36000s; send_timeout 36000s; client_max_body_size 10240m; }} ################################################################# location /error/ {{ alias {public}/document_errors/; }} disable_symlinks if_not_owner from={public}/public_html; proxy_hide_header Upgrade; include {base}/nginx.ssl.conf_*; }} """.rstrip() + "\n" def build_nonssl(ip: str, domain: str, port: int) -> str: return f"""server {{ listen {ip}:80; server_name {domain}; return 301 https://$host$request_uri; }} """.rstrip() + "\n" # ----------------------- Shell helpers -------------------------- def run(cmd: list[str], timeout: int = 30) -> dict: """Run a command and return {ok, rc, stdout, stderr} without raising.""" try: p = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, check=False, text=True, ) return {"ok": p.returncode == 0, "rc": p.returncode, "stdout": p.stdout, "stderr": p.stderr} except subprocess.TimeoutExpired as te: return {"ok": False, "rc": -1, "stdout": te.stdout or "", "stderr": f"Timeout: {te}"} except Exception as e: return {"ok": False, "rc": -2, "stdout": "", "stderr": str(e)} # --------------------------- Routes ----------------------------- @app.get("/healthz") def healthz(): return {"ok": True, "ip": STATIC_IP, "parent": PARENT_PATH} @app.post("/api/nginx/app", dependencies=[Depends(require_api_key)]) def create_or_update_vhost(payload: GenRequest = Body(...)) -> JSONResponse: # Resolve values ip = payload.ip or STATIC_IP domain = safe_domain(payload.domain) port = int(payload.port) # Render templates ssl_txt = build_ssl(ip, domain, port) nonssl_txt = build_nonssl(ip, domain, port) # Target paths domain_dir = pathlib.Path(PARENT_PATH) / domain ssl_file = domain_dir / "nginx.ssl.conf" nonssl_file = domain_dir / "nginx.conf" if payload.dryRun: return JSONResponse( { "ok": True, "dryRun": True, "files": { str(nonssl_file): nonssl_txt, str(ssl_file): ssl_txt, }, } ) # Ensure directory exists try: domain_dir.mkdir(parents=True, exist_ok=True) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to create {domain_dir}: {e}") # Check write access if not os.access(domain_dir, os.W_OK | os.X_OK): raise HTTPException( status_code=403, detail=f"No write access to {domain_dir}. Run as root or fix permissions.", ) # Backup old files, if present def backup_if_exists(p: pathlib.Path) -> Optional[pathlib.Path]: try: if p.is_file(): bkp = p.with_suffix(p.suffix + f".bak-{int(time.time())}") bkp.write_text(p.read_text(encoding="utf-8"), encoding="utf-8") return bkp except Exception: pass return None nonssl_bkp = backup_if_exists(nonssl_file) ssl_bkp = backup_if_exists(ssl_file) # Write new files try: nonssl_file.write_text(nonssl_txt, encoding="utf-8") ssl_file.write_text(ssl_txt, encoding="utf-8") except Exception as e: raise HTTPException(status_code=500, detail=f"Failed writing files: {e}") # Validate Nginx config (nginx -t) test = run([SUDO_BIN, NGINX_BIN, "-t"]) if not test["ok"]: # Restore backups try: if nonssl_bkp and nonssl_bkp.exists(): nonssl_file.write_text(nonssl_bkp.read_text(encoding="utf-8"), encoding="utf-8") if ssl_bkp and ssl_bkp.exists(): ssl_file.write_text(ssl_bkp.read_text(encoding="utf-8"), encoding="utf-8") except Exception: pass raise HTTPException( status_code=500, detail=f"nginx -t failed: rc={test['rc']} stderr={test['stderr'] or test['stdout']}", ) # Reload Nginx (unless skipped) reload_out: Dict[str, Any] = {"skipped": True} if not payload.skipReload: reload_out = run([SUDO_BIN, SYSTEMCTL_BIN, "reload", "nginx"]) if not reload_out["ok"]: # Config is valid, but reload failed — return error for visibility raise HTTPException( status_code=500, detail=f"Failed to reload nginx: rc={reload_out['rc']} stderr={reload_out['stderr'] or reload_out['stdout']}", ) return JSONResponse( { "ok": True, "written": [str(nonssl_file), str(ssl_file)], "test": test, "reload": reload_out, "render": {"nonssl": nonssl_txt, "ssl": ssl_txt}, } )