""" Async HTTP client for Frappe REST API. Handles API key/secret auth — works with any remote Frappe instance including Docker. """ import json import httpx from typing import Any from frappe_mcp.config import get_settings from frappe_mcp.session import get_session class FrappeAPIError(Exception): def __init__(self, message: str, status_code: int = 0, exc_type: str = ""): super().__init__(message) self.status_code = status_code self.exc_type = exc_type class FrappeClient: def __init__(self): session = get_session() if session: # SSE mode: credentials supplied per-connection via request headers self.base_url = session.frappe_url.rstrip("/") self._api_key = session.api_key self._api_secret = session.api_secret self._site_name = session.site_name self.request_timeout = session.request_timeout else: # Stdio/local mode: credentials from .env s = get_settings() self.base_url = s.frappe_url self._api_key = s.frappe_api_key self._api_secret = s.frappe_api_secret self._site_name = s.frappe_site_name self.request_timeout = s.request_timeout if not self._api_key or not self._api_secret: raise FrappeAPIError( "Frappe credentials not configured. " "In SSE mode pass X-Frappe-URL, X-Frappe-API-Key, X-Frappe-API-Secret headers. " "In local mode set FRAPPE_API_KEY and FRAPPE_API_SECRET in .env" ) def _build_auth_header(self) -> dict[str, str]: return {"Authorization": f"token {self._api_key}:{self._api_secret}"} def _headers(self, extra: dict | None = None) -> dict[str, str]: h = {**self._build_auth_header(), "Content-Type": "application/json", "Accept": "application/json"} if self._site_name: h["X-Frappe-Site-Name"] = self._site_name if extra: h.update(extra) return h def _url(self, path: str) -> str: return f"{self.base_url}{path}" def _raise_for_frappe_error(self, data: dict) -> None: if "exc_type" in data or ("message" in data and data.get("exc_type")): raise FrappeAPIError( data.get("message", "Unknown Frappe error"), exc_type=data.get("exc_type", ""), ) async def get(self, path: str, params: dict | None = None) -> Any: async with httpx.AsyncClient(timeout=self.request_timeout) as client: resp = await client.get(self._url(path), headers=self._headers(), params=params) resp.raise_for_status() data = resp.json() self._raise_for_frappe_error(data) return data.get("data", data) def _handle_error_response(self, resp: httpx.Response) -> None: """Extract the real Frappe error message from 4xx/5xx responses.""" if resp.is_error: try: data = resp.json() # Frappe puts the real error in _server_messages or exc server_msgs = data.get("_server_messages", "") exc = data.get("exc", "") message = data.get("message", "") if server_msgs: import json as _json try: msgs = _json.loads(server_msgs) parsed = [_json.loads(m).get("message", m) if isinstance(m, str) else m for m in msgs] raise FrappeAPIError( " | ".join(str(p) for p in parsed), status_code=resp.status_code, ) except (ValueError, TypeError): pass if exc: last_line = [l for l in exc.strip().splitlines() if l.strip()] raise FrappeAPIError(last_line[-1] if last_line else exc, status_code=resp.status_code) if message: raise FrappeAPIError(message, status_code=resp.status_code) except FrappeAPIError: raise except Exception: pass resp.raise_for_status() async def post(self, path: str, payload: dict | None = None) -> Any: async with httpx.AsyncClient(timeout=self.request_timeout) as client: resp = await client.post(self._url(path), headers=self._headers(), json=payload or {}) self._handle_error_response(resp) data = resp.json() self._raise_for_frappe_error(data) return data.get("data", data) async def put(self, path: str, payload: dict | None = None) -> Any: async with httpx.AsyncClient(timeout=self.request_timeout) as client: resp = await client.put(self._url(path), headers=self._headers(), json=payload or {}) self._handle_error_response(resp) data = resp.json() self._raise_for_frappe_error(data) return data.get("data", data) async def delete(self, path: str) -> Any: async with httpx.AsyncClient(timeout=self.request_timeout) as client: resp = await client.delete(self._url(path), headers=self._headers()) resp.raise_for_status() data = resp.json() self._raise_for_frappe_error(data) return data.get("data", data) # --- Frappe-specific convenience methods --- async def call_method(self, method: str, **kwargs) -> Any: """Call a whitelisted Frappe server-side method.""" return await self.post(f"/api/method/{method}", payload=kwargs) async def get_doc(self, doctype: str, name: str) -> dict: return await self.get(f"/api/resource/{doctype}/{name}") async def get_list( self, doctype: str, fields: list[str] | None = None, filters: list | None = None, limit: int = 20, order_by: str = "modified desc", ) -> list[dict]: params: dict[str, Any] = {"limit": limit, "order_by": order_by} if fields: params["fields"] = json.dumps(fields) if filters: params["filters"] = json.dumps(filters) return await self.get(f"/api/resource/{doctype}", params=params) async def create_doc(self, doctype: str, data: dict) -> dict: return await self.post(f"/api/resource/{doctype}", payload=data) async def update_doc(self, doctype: str, name: str, data: dict) -> dict: return await self.put(f"/api/resource/{doctype}/{name}", payload=data) async def delete_doc(self, doctype: str, name: str) -> dict: return await self.delete(f"/api/resource/{doctype}/{name}")