"""
Async HTTP client for Frappe REST API.
Handles API key/secret auth — works with any remote Frappe instance including Docker.
"""

import json
import httpx
from typing import Any

from frappe_mcp.config import get_settings


class FrappeAPIError(Exception):
    """Error reported by (or while talking to) a Frappe instance.

    Attributes:
        status_code: HTTP status of the failing response, or 0 when unknown.
        exc_type: Value of the ``exc_type`` field from the response body,
            or "" when the body did not carry one.
    """

    def __init__(self, message: str, status_code: int = 0, exc_type: str = ""):
        super().__init__(message)
        self.status_code = status_code
        self.exc_type = exc_type


class FrappeClient:
    """Thin async wrapper around the Frappe REST endpoints.

    Settings are read once from ``get_settings()`` at construction time.
    A new ``httpx.AsyncClient`` is opened for every request.
    """

    def __init__(self):
        """Load settings and pre-build the token auth header.

        Raises:
            FrappeAPIError: If the API key or secret is missing.
        """
        self.settings = get_settings()
        self.base_url = self.settings.frappe_url
        self._auth_header = self._build_auth_header()

    def _build_auth_header(self) -> dict[str, str]:
        """Return the ``Authorization: token <key>:<secret>`` header."""
        key = self.settings.frappe_api_key
        secret = self.settings.frappe_api_secret
        if not key or not secret:
            raise FrappeAPIError("FRAPPE_API_KEY and FRAPPE_API_SECRET must be set in .env")
        return {"Authorization": f"token {key}:{secret}"}

    def _headers(self, extra: dict | None = None) -> dict[str, str]:
        """Build request headers: auth, JSON content negotiation, optional site name.

        Args:
            extra: Additional headers; these override the defaults on key clash.
        """
        headers = {
            **self._auth_header,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        if self.settings.frappe_site_name:
            headers["X-Frappe-Site-Name"] = self.settings.frappe_site_name
        if extra:
            headers.update(extra)
        return headers

    def _url(self, path: str) -> str:
        """Join ``base_url`` and ``path`` with exactly one slash between them."""
        # str() so a URL object in settings works the same as a plain string;
        # stripping the trailing slash avoids "host//api/..." when the
        # configured URL ends with "/".
        base = str(self.base_url).rstrip("/")
        if path and not path.startswith("/"):
            path = f"/{path}"
        return f"{base}{path}"

    def _raise_for_frappe_error(self, data: dict) -> None:
        """Raise if a decoded response body carries an ``exc_type`` field.

        Non-dict bodies (e.g. a bare JSON list) cannot carry that field and
        are accepted as-is.

        Raises:
            FrappeAPIError: With the body's ``message`` and ``exc_type``.
        """
        if not isinstance(data, dict):
            return
        if "exc_type" in data:
            raise FrappeAPIError(
                data.get("message", "Unknown Frappe error"),
                exc_type=data.get("exc_type", ""),
            )

    @staticmethod
    def _server_messages_text(raw: Any) -> str:
        """Flatten a ``_server_messages`` value into one " | "-joined string.

        The value is expected to be a JSON-encoded list whose entries are
        themselves JSON-encoded objects with a ``message`` key. Entries that
        do not match that shape are kept verbatim rather than discarding the
        whole list. Returns "" when nothing usable is found.
        """
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except ValueError:
                return ""
        if not isinstance(raw, list):
            return ""

        texts: list[str] = []
        for entry in raw:
            text: Any = entry
            if isinstance(entry, str):
                try:
                    decoded = json.loads(entry)
                except ValueError:
                    decoded = None
                if isinstance(decoded, dict):
                    text = decoded.get("message", entry)
            elif isinstance(entry, dict):
                text = entry.get("message", entry)
            if text not in (None, ""):
                texts.append(str(text))
        return " | ".join(texts)

    @staticmethod
    def _last_traceback_line(exc: Any) -> str:
        """Return the last non-blank line of an ``exc`` payload.

        NOTE(review): Frappe appears to send ``exc`` as a JSON-encoded list of
        traceback strings in at least some versions — confirm against the
        target instance. When it decodes to a list the parts are joined
        first; otherwise the raw text is used unchanged.
        """
        if isinstance(exc, str):
            try:
                decoded = json.loads(exc)
            except ValueError:
                decoded = None
            if isinstance(decoded, list):
                exc = decoded
        if isinstance(exc, list):
            exc = "\n".join(str(part) for part in exc)
        text = str(exc)
        lines = [line for line in text.strip().splitlines() if line.strip()]
        return lines[-1] if lines else text

    def _handle_error_response(self, resp: httpx.Response) -> None:
        """Extract the real Frappe error message from 4xx/5xx responses.

        Looks, in order, at ``_server_messages``, ``exc`` and ``message`` in
        the JSON body. If none yields a message (or the body is not a JSON
        object) the generic httpx status error is raised instead.

        Raises:
            FrappeAPIError: When the body carries a usable error message.
            httpx.HTTPStatusError: For any other non-success status.
        """
        if resp.is_error:
            try:
                data = resp.json()
            except ValueError:
                # Non-JSON error page (proxy, HTML 502, ...): nothing to extract.
                data = None

            if isinstance(data, dict):
                message: Any = self._server_messages_text(data.get("_server_messages"))
                if not message and data.get("exc"):
                    message = self._last_traceback_line(data["exc"])
                if not message:
                    message = data.get("message")
                if message:
                    raise FrappeAPIError(
                        message,
                        status_code=resp.status_code,
                        exc_type=data.get("exc_type") or "",
                    )
        resp.raise_for_status()

    async def _request(
        self,
        method: str,
        path: str,
        *,
        params: dict | None = None,
        payload: dict | None = None,
    ) -> Any:
        """Send one request and return the unwrapped response body.

        Shared by all HTTP verbs so that every call gets the same error
        handling.

        Returns:
            The ``data`` field of the JSON body when present, else the whole
            decoded body.
        """
        async with httpx.AsyncClient(timeout=self.settings.request_timeout) as client:
            resp = await client.request(
                method,
                self._url(path),
                headers=self._headers(),
                params=params,
                json=payload,
            )
            self._handle_error_response(resp)
            data = resp.json()
            self._raise_for_frappe_error(data)
            if isinstance(data, dict):
                return data.get("data", data)
            return data

    async def get(self, path: str, params: dict | None = None) -> Any:
        """GET ``path`` with optional query parameters."""
        return await self._request("GET", path, params=params)

    async def post(self, path: str, payload: dict | None = None) -> Any:
        """POST ``payload`` (an empty object when omitted) as JSON to ``path``."""
        return await self._request("POST", path, payload=payload or {})

    async def put(self, path: str, payload: dict | None = None) -> Any:
        """PUT ``payload`` (an empty object when omitted) as JSON to ``path``."""
        return await self._request("PUT", path, payload=payload or {})

    async def delete(self, path: str) -> Any:
        """DELETE ``path``."""
        return await self._request("DELETE", path)

    # --- Frappe-specific convenience methods ---

    async def call_method(self, method: str, **kwargs) -> Any:
        """Call a whitelisted Frappe server-side method."""
        return await self.post(f"/api/method/{method}", payload=kwargs)

    async def get_doc(self, doctype: str, name: str) -> dict:
        """Fetch a single document via ``/api/resource/{doctype}/{name}``."""
        return await self.get(f"/api/resource/{doctype}/{name}")

    async def get_list(
        self,
        doctype: str,
        fields: list[str] | None = None,
        filters: list | None = None,
        limit: int = 20,
        order_by: str = "modified desc",
    ) -> list[dict]:
        """List documents of ``doctype``.

        Args:
            fields: Field names to return; sent JSON-encoded when given.
            filters: Filter conditions; sent JSON-encoded when given.
            limit: Value sent as the ``limit`` query parameter.
            order_by: Value sent as the ``order_by`` query parameter.
        """
        params: dict[str, Any] = {"limit": limit, "order_by": order_by}
        if fields:
            params["fields"] = json.dumps(fields)
        if filters:
            params["filters"] = json.dumps(filters)
        return await self.get(f"/api/resource/{doctype}", params=params)

    async def create_doc(self, doctype: str, data: dict) -> dict:
        """Create a document by POSTing ``data`` to ``/api/resource/{doctype}``."""
        return await self.post(f"/api/resource/{doctype}", payload=data)

    async def update_doc(self, doctype: str, name: str, data: dict) -> dict:
        """Update a document by PUTting ``data`` to ``/api/resource/{doctype}/{name}``."""
        return await self.put(f"/api/resource/{doctype}/{name}", payload=data)

    async def delete_doc(self, doctype: str, name: str) -> dict:
        """Delete the document at ``/api/resource/{doctype}/{name}``."""
        return await self.delete(f"/api/resource/{doctype}/{name}")