2026-04-13 16:42:01 +00:00

332 lines
9.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
FastAPI-based Nginx reverse-proxy generator.

POST /api/nginx/app
Body:
    {
        "domain": "ebayapp.data4autos.com",
        "port": 3011,
        "ip": "147.93.40.215",
        "dryRun": false,
        "skipReload": false
    }

Auth (required unless DISABLE_AUTH=true):
    send header: X-API-Key: <API_KEY>
"""
import os
import re
import time
import pathlib
import subprocess
from typing import Optional, Dict, Any
from fastapi import FastAPI, Body, HTTPException, Header, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel, field_validator
# ---------------------------- Config ----------------------------
# All settings are read from the process environment once, at import time.
# Each one falls back to the default shown when the variable is unset.

# Address used in the generated `listen` directives when a request omits "ip".
STATIC_IP = os.environ.get("STATIC_IP", "147.93.40.215")
# Directory that holds one sub-directory of generated config per domain.
PARENT_PATH = os.environ.get("PARENT_PATH", "/home/user/conf/web")
# Root of the per-domain public web directories referenced by the SSL template.
PUBLIC_WEB_ROOT = os.environ.get("PUBLIC_WEB_ROOT", "/home/user/web")
# Directory for the per-domain nginx error logs named in the SSL template.
ERROR_LOG_ROOT = os.environ.get("ERROR_LOG_ROOT", "/var/log/apache2/domains")

# Absolute paths of the executables invoked through the shell helper.
SUDO_BIN = os.environ.get("SUDO_BIN", "/usr/bin/sudo")
NGINX_BIN = os.environ.get("NGINX_BIN", "/usr/sbin/nginx")
SYSTEMCTL_BIN = os.environ.get("SYSTEMCTL_BIN", "/bin/systemctl")

# Shared secret expected in the X-API-Key header; empty means "not configured".
API_KEY = os.environ.get("API_KEY", "")
# Truthy spellings ("1", "true", "yes", case-insensitive) switch auth off.
DISABLE_AUTH = os.environ.get("DISABLE_AUTH", "false").lower() in {"1", "true", "yes"}

# Bind address and port; not referenced elsewhere in the visible part of the file.
BIND_HOST = os.environ.get("BIND_HOST", "127.0.0.1")
BIND_PORT = int(os.environ.get("BIND_PORT", "5055"))
# ------------------------- App bootstrap ------------------------
# Module-level FastAPI application; the @app.get / @app.post decorators
# further down register their routes on this instance.
app = FastAPI(title="Nginx Reverse Proxy Generator (FastAPI)")
# ------------------------- Validators ---------------------------
# NOTE: in Python, `$` also matches just before a trailing "\n", so these
# patterns must be applied with `fullmatch` (never `match`) to reject input
# such as "example.com\n".
_ipv4_re = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$")
_domain_re = re.compile(r"^(?!-)(?:[a-zA-Z0-9-]{1,63}\.)+[a-zA-Z]{2,63}$")


def is_valid_ipv4(ip: str) -> bool:
    """Return True if *ip* is a canonical dotted-quad IPv4 address.

    Each of the four octets must be in 0..255 and written without leading
    zeros or any non-ASCII digit form (``str(int(p)) == p``). Non-string
    input yields False instead of raising.
    """
    if not isinstance(ip, str) or not _ipv4_re.fullmatch(ip):
        return False
    try:
        return all(0 <= int(p) <= 255 and str(int(p)) == p for p in ip.split("."))
    except ValueError:
        return False


def is_valid_domain(d: str) -> bool:
    """Return True if *d* matches the domain pattern over its entire length.

    The whole string must match, so a trailing newline (which a bare `$`
    anchor would tolerate) is rejected; this keeps control characters out of
    the rendered nginx config and out of filesystem paths. Non-string input
    yields False instead of raising.
    """
    return isinstance(d, str) and _domain_re.fullmatch(d) is not None
def is_valid_port(p: int) -> bool:
    """Return True if *p* is an integer TCP port number in 1..65535.

    ``bool`` is a subclass of ``int`` in Python, so ``True``/``False`` are
    rejected explicitly; otherwise ``True`` would be accepted as port 1.
    """
    if isinstance(p, bool) or not isinstance(p, int):
        return False
    return 1 <= p <= 65535
def safe_domain(d: str) -> str:
    """Return *d* lower-cased after checking it with ``is_valid_domain``.

    Raises:
        ValueError: with the message "Invalid domain" when the check fails.
    """
    if is_valid_domain(d):
        return d.lower()
    raise ValueError("Invalid domain")
# --------------------------- Models -----------------------------
class GenRequest(BaseModel):
    """Request body accepted by POST /api/nginx/app."""

    # Host name the vhost is generated for.
    domain: str
    # Local port the generated config proxies to.
    port: int
    # Optional listen address; the route falls back to STATIC_IP when unset.
    ip: Optional[str] = None
    # When true, the route returns the rendered files without writing them.
    dryRun: Optional[bool] = False
    # When true, the route skips the nginx reload step.
    skipReload: Optional[bool] = False

    @field_validator("domain")
    @classmethod
    def _dom(cls, v: str) -> str:
        """Accept the value unchanged if it passes ``is_valid_domain``."""
        if is_valid_domain(v):
            return v
        raise ValueError("Invalid domain")

    @field_validator("port")
    @classmethod
    def _prt(cls, v: int) -> int:
        """Accept the value unchanged if it passes ``is_valid_port``."""
        if is_valid_port(v):
            return v
        raise ValueError("Invalid port")

    @field_validator("ip")
    @classmethod
    def _ip(cls, v: Optional[str]) -> Optional[str]:
        """Map None / blank to None; otherwise require a valid IPv4 string."""
        is_blank = v is None or not str(v).strip()
        if is_blank:
            return None
        if is_valid_ipv4(v):
            return v
        raise ValueError("Invalid ip")
# ------------------------- Auth dependency ----------------------
def require_api_key(x_api_key: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency that enforces the ``X-API-Key`` header.

    Authentication is skipped only when DISABLE_AUTH is set. Otherwise the
    header value must equal API_KEY.

    Raises:
        HTTPException(500): auth is enabled but API_KEY is empty.
        HTTPException(401): the header is missing or does not match.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if DISABLE_AUTH:
        return
    if not API_KEY:
        raise HTTPException(status_code=500, detail="Server misconfigured: API_KEY not set")

    # Compare as bytes in constant time: compare_digest only accepts ASCII
    # str, and a plain `!=` leaks how many leading characters matched.
    supplied = (x_api_key or "").encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        # Never log either key here; the configured secret must not reach stdout.
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
# ----------------------- Templating funcs -----------------------
def build_ssl(ip: str, domain: str, port: int) -> str:
    """Render the HTTPS (port 443) nginx ``server`` block for *domain*.

    Args:
        ip: Address placed in the ``listen <ip>:443 ssl`` directive.
        domain: Host name used for ``server_name`` and for every per-domain
            path (certificate, key, error log, include globs, error pages).
        port: Local port; requests are proxied to ``http://127.0.0.1:<port>``.

    Returns:
        The config text, ending in exactly one newline.

    The values are interpolated verbatim, so callers must pass validated
    input. NOTE(review): the template references ``$anti_replay``, which is
    not defined here -- presumably it is provided by the surrounding nginx
    configuration; confirm.
    """
    # Per-domain config directory (certificates and optional include files).
    base = f"{PARENT_PATH}/{domain}"
    # Per-domain public web directory (error pages, public_html).
    public = f"{PUBLIC_WEB_ROOT}/{domain}"
    # Inside the f-string, doubled braces are literal nginx braces and the
    # doubled backslashes render as single backslashes in the location regex.
    # rstrip() + "\n" normalises the result to a single trailing newline.
    return f"""server {{
listen {ip}:443 ssl;
server_name {domain};
error_log {ERROR_LOG_ROOT}/{domain}.error.log error;
ssl_certificate {base}/ssl/{domain}.pem;
ssl_certificate_key {base}/ssl/{domain}.key;
ssl_stapling on;
ssl_stapling_verify on;
# TLS 1.3 0-RTT protection
if ($anti_replay = 307) {{ return 307 https://$host$request_uri; }}
if ($anti_replay = 425) {{ return 425; }}
include {base}/nginx.hsts.conf*;
#################################################################
# FORCE BROWSER TO UPGRADE HTTP REQUESTS TO HTTPS
#################################################################
add_header Content-Security-Policy "upgrade-insecure-requests";
#################################################################
# SECURITY
#################################################################
location ~ /\\.(?!well-known\\/|file) {{
deny all;
return 404;
}}
#################################################################
# MAIN REVERSE PROXY
#################################################################
location / {{
proxy_pass http://127.0.0.1:{port};
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-NginX-Proxy true;
proxy_set_header X-Forwarded-Proto https;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port 443;
# convert upstream http redirects into https
proxy_redirect http:// https://;
proxy_cache_bypass $http_upgrade;
proxy_request_buffering off;
proxy_connect_timeout 36000s;
proxy_read_timeout 36000s;
proxy_send_timeout 36000s;
send_timeout 36000s;
client_max_body_size 10240m;
}}
#################################################################
location /error/ {{
alias {public}/document_errors/;
}}
disable_symlinks if_not_owner from={public}/public_html;
proxy_hide_header Upgrade;
include {base}/nginx.ssl.conf_*;
}}
""".rstrip() + "\n"
def build_nonssl(ip: str, domain: str, port: int) -> str:
    """Render the plain-HTTP (port 80) nginx ``server`` block for *domain*.

    The block only issues a 301 redirect to the HTTPS URL of the same host
    and path. *port* is accepted for signature parity with ``build_ssl`` and
    is not used. The returned text ends in exactly one newline.
    """
    directives = (
        "server {",
        f"listen {ip}:80;",
        f"server_name {domain};",
        "return 301 https://$host$request_uri;",
        "}",
    )
    return "\n".join(directives) + "\n"
# ----------------------- Shell helpers --------------------------
def run(cmd: list[str], timeout: int = 30) -> dict:
    """Run *cmd* without a shell and report the outcome as a plain dict.

    Args:
        cmd: Argument vector; passed to ``subprocess.run`` as a list.
        timeout: Seconds to wait before the child is killed.

    Returns:
        ``{"ok", "rc", "stdout", "stderr"}``. ``ok`` is True only for exit
        status 0. ``rc`` is the exit status, -1 on timeout, or -2 when the
        command could not be started (or any other error). ``stdout`` and
        ``stderr`` are always ``str``. This function does not raise.
    """
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
            check=False,
            text=True,
        )
    except subprocess.TimeoutExpired as te:
        # Output captured before a timeout may be bytes even with text=True;
        # decode it so the dict stays JSON-serialisable.
        partial = te.stdout or ""
        if isinstance(partial, bytes):
            partial = partial.decode("utf-8", errors="replace")
        return {"ok": False, "rc": -1, "stdout": partial, "stderr": f"Timeout: {te}"}
    except Exception as e:
        # Deliberate catch-all: callers rely on a result dict, never an exception.
        return {"ok": False, "rc": -2, "stdout": "", "stderr": str(e)}
    return {
        "ok": proc.returncode == 0,
        "rc": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
    }
# --------------------------- Routes -----------------------------
@app.get("/healthz")
def healthz():
    """Liveness endpoint: report ok plus the configured STATIC_IP and PARENT_PATH."""
    status = {"ok": True}
    status["ip"] = STATIC_IP
    status["parent"] = PARENT_PATH
    return status
def _backup_if_exists(path: pathlib.Path) -> Optional[pathlib.Path]:
    """Copy *path* to a ``.bak-<epoch>`` sibling; return it, or None if *path* is not a file.

    I/O errors propagate so the caller can refuse to overwrite a file it
    could not back up.
    """
    if not path.is_file():
        return None
    backup = path.with_suffix(path.suffix + f".bak-{int(time.time())}")
    # Byte copy: the existing file is preserved exactly, whatever its encoding.
    backup.write_bytes(path.read_bytes())
    return backup


def _restore_or_remove(target: pathlib.Path, backup: Optional[pathlib.Path]) -> None:
    """Put *target* back to its pre-request state: restore *backup*, or delete if there was none."""
    if backup is None:
        target.unlink(missing_ok=True)
    else:
        target.write_bytes(backup.read_bytes())


def _roll_back(backups: Dict[pathlib.Path, Optional[pathlib.Path]]) -> str:
    """Roll back every target; return "" on success or a suffix describing what failed."""
    failures = []
    for target, backup in backups.items():
        try:
            _restore_or_remove(target, backup)
        except OSError as exc:
            failures.append(f"{target}: {exc}")
    if not failures:
        return ""
    return " (rollback incomplete: " + "; ".join(failures) + ")"


@app.post("/api/nginx/app", dependencies=[Depends(require_api_key)])
def create_or_update_vhost(payload: GenRequest = Body(...)) -> JSONResponse:
    """Render, write, test and (optionally) reload the nginx vhost for a domain.

    Steps:
      1. Render ``nginx.conf`` (HTTP redirect) and ``nginx.ssl.conf`` (HTTPS
         reverse proxy) for the requested domain.
      2. With ``dryRun``, return the rendered text without touching disk.
      3. Back up any existing files, write the new ones, and run ``nginx -t``.
      4. If writing or the config test fails, return both files to their
         previous state (restoring backups, or deleting files that did not
         exist before) so a broken config is never left behind.
      5. Unless ``skipReload`` is set, reload nginx via systemctl.

    Raises:
        HTTPException(422): the domain fails validation.
        HTTPException(403): the per-domain directory is not writable.
        HTTPException(500): directory creation, backup, write, ``nginx -t``
            or reload failed.
    """
    ip = payload.ip or STATIC_IP
    try:
        domain = safe_domain(payload.domain)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    port = int(payload.port)

    ssl_txt = build_ssl(ip, domain, port)
    nonssl_txt = build_nonssl(ip, domain, port)

    domain_dir = pathlib.Path(PARENT_PATH) / domain
    ssl_file = domain_dir / "nginx.ssl.conf"
    nonssl_file = domain_dir / "nginx.conf"

    if payload.dryRun:
        return JSONResponse(
            {
                "ok": True,
                "dryRun": True,
                "files": {
                    str(nonssl_file): nonssl_txt,
                    str(ssl_file): ssl_txt,
                },
            }
        )

    try:
        domain_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Failed to create {domain_dir}: {e}")

    if not os.access(domain_dir, os.W_OK | os.X_OK):
        raise HTTPException(
            status_code=403,
            detail=f"No write access to {domain_dir}. Run as root or fix permissions.",
        )

    # Same order as before: the non-SSL file first, then the SSL file.
    targets = ((nonssl_file, nonssl_txt), (ssl_file, ssl_txt))

    # Back up before writing anything. A failed backup aborts the request:
    # overwriting a file that cannot be restored would make rollback impossible.
    backups: Dict[pathlib.Path, Optional[pathlib.Path]] = {}
    try:
        for path, _text in targets:
            backups[path] = _backup_if_exists(path)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Failed to back up existing config: {e}")

    try:
        for path, text in targets:
            path.write_text(text, encoding="utf-8")
    except Exception as e:
        # One file may already be on disk; undo it so the pair stays consistent.
        rollback_note = _roll_back(backups)
        raise HTTPException(status_code=500, detail=f"Failed writing files: {e}{rollback_note}")

    test = run([SUDO_BIN, NGINX_BIN, "-t"])
    if not test["ok"]:
        # Restore previous versions, or remove files that are new in this
        # request, so later reloads are not broken by the rejected config.
        rollback_note = _roll_back(backups)
        raise HTTPException(
            status_code=500,
            detail=(
                f"nginx -t failed: rc={test['rc']} "
                f"stderr={test['stderr'] or test['stdout']}{rollback_note}"
            ),
        )

    reload_out: Dict[str, Any] = {"skipped": True}
    if not payload.skipReload:
        reload_out = run([SUDO_BIN, SYSTEMCTL_BIN, "reload", "nginx"])
        if not reload_out["ok"]:
            # The files stay in place: they already passed `nginx -t`.
            raise HTTPException(
                status_code=500,
                detail=f"Failed to reload nginx: rc={reload_out['rc']} stderr={reload_out['stderr'] or reload_out['stdout']}",
            )

    return JSONResponse(
        {
            "ok": True,
            "written": [str(nonssl_file), str(ssl_file)],
            "test": test,
            "reload": reload_out,
            "render": {"nonssl": nonssl_txt, "ssl": ssl_txt},
        }
    )