162 lines
6.6 KiB
Python
162 lines
6.6 KiB
Python
"""
|
|
Async HTTP client for Frappe REST API.
|
|
Handles API key/secret auth — works with any remote Frappe instance including Docker.
|
|
"""
|
|
|
|
import json
|
|
import httpx
|
|
from typing import Any
|
|
from frappe_mcp.config import get_settings
|
|
from frappe_mcp.session import get_session
|
|
|
|
|
|
class FrappeAPIError(Exception):
    """Error carrying a Frappe failure message plus optional metadata.

    Attributes are set directly on the instance:
      * ``status_code`` - HTTP status associated with the failure (0 when
        the caller did not supply one).
      * ``exc_type``    - exception type name supplied by the caller
        (empty string when not supplied).
    """

    def __init__(self, message: str, status_code: int = 0, exc_type: str = ""):
        """Store ``message`` as the exception text and keep the metadata."""
        super().__init__(message)
        self.status_code, self.exc_type = status_code, exc_type
|
|
|
|
|
|
class FrappeClient:
|
|
def __init__(self):
|
|
session = get_session()
|
|
if session:
|
|
# SSE mode: credentials supplied per-connection via request headers
|
|
self.base_url = session.frappe_url.rstrip("/")
|
|
self._api_key = session.api_key
|
|
self._api_secret = session.api_secret
|
|
self._site_name = session.site_name
|
|
self.request_timeout = session.request_timeout
|
|
else:
|
|
# Stdio/local mode: credentials from .env
|
|
s = get_settings()
|
|
self.base_url = s.frappe_url
|
|
self._api_key = s.frappe_api_key
|
|
self._api_secret = s.frappe_api_secret
|
|
self._site_name = s.frappe_site_name
|
|
self.request_timeout = s.request_timeout
|
|
|
|
if not self._api_key or not self._api_secret:
|
|
raise FrappeAPIError(
|
|
"Frappe credentials not configured. "
|
|
"In SSE mode pass X-Frappe-URL, X-Frappe-API-Key, X-Frappe-API-Secret headers. "
|
|
"In local mode set FRAPPE_API_KEY and FRAPPE_API_SECRET in .env"
|
|
)
|
|
|
|
def _build_auth_header(self) -> dict[str, str]:
|
|
return {"Authorization": f"token {self._api_key}:{self._api_secret}"}
|
|
|
|
def _headers(self, extra: dict | None = None) -> dict[str, str]:
|
|
h = {**self._build_auth_header(), "Content-Type": "application/json", "Accept": "application/json"}
|
|
if self._site_name:
|
|
h["X-Frappe-Site-Name"] = self._site_name
|
|
if extra:
|
|
h.update(extra)
|
|
return h
|
|
|
|
def _url(self, path: str) -> str:
|
|
return f"{self.base_url}{path}"
|
|
|
|
def _raise_for_frappe_error(self, data: dict) -> None:
|
|
if "exc_type" in data or ("message" in data and data.get("exc_type")):
|
|
raise FrappeAPIError(
|
|
data.get("message", "Unknown Frappe error"),
|
|
exc_type=data.get("exc_type", ""),
|
|
)
|
|
|
|
async def get(self, path: str, params: dict | None = None) -> Any:
|
|
async with httpx.AsyncClient(timeout=self.request_timeout) as client:
|
|
resp = await client.get(self._url(path), headers=self._headers(), params=params)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
self._raise_for_frappe_error(data)
|
|
return data.get("data", data)
|
|
|
|
def _handle_error_response(self, resp: httpx.Response) -> None:
|
|
"""Extract the real Frappe error message from 4xx/5xx responses."""
|
|
if resp.is_error:
|
|
try:
|
|
data = resp.json()
|
|
# Frappe puts the real error in _server_messages or exc
|
|
server_msgs = data.get("_server_messages", "")
|
|
exc = data.get("exc", "")
|
|
message = data.get("message", "")
|
|
if server_msgs:
|
|
import json as _json
|
|
try:
|
|
msgs = _json.loads(server_msgs)
|
|
parsed = [_json.loads(m).get("message", m) if isinstance(m, str) else m for m in msgs]
|
|
raise FrappeAPIError(
|
|
" | ".join(str(p) for p in parsed),
|
|
status_code=resp.status_code,
|
|
)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
if exc:
|
|
last_line = [l for l in exc.strip().splitlines() if l.strip()]
|
|
raise FrappeAPIError(last_line[-1] if last_line else exc, status_code=resp.status_code)
|
|
if message:
|
|
raise FrappeAPIError(message, status_code=resp.status_code)
|
|
except FrappeAPIError:
|
|
raise
|
|
except Exception:
|
|
pass
|
|
resp.raise_for_status()
|
|
|
|
async def post(self, path: str, payload: dict | None = None) -> Any:
|
|
async with httpx.AsyncClient(timeout=self.request_timeout) as client:
|
|
resp = await client.post(self._url(path), headers=self._headers(), json=payload or {})
|
|
self._handle_error_response(resp)
|
|
data = resp.json()
|
|
self._raise_for_frappe_error(data)
|
|
return data.get("data", data)
|
|
|
|
async def put(self, path: str, payload: dict | None = None) -> Any:
|
|
async with httpx.AsyncClient(timeout=self.request_timeout) as client:
|
|
resp = await client.put(self._url(path), headers=self._headers(), json=payload or {})
|
|
self._handle_error_response(resp)
|
|
data = resp.json()
|
|
self._raise_for_frappe_error(data)
|
|
return data.get("data", data)
|
|
|
|
async def delete(self, path: str) -> Any:
|
|
async with httpx.AsyncClient(timeout=self.request_timeout) as client:
|
|
resp = await client.delete(self._url(path), headers=self._headers())
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
self._raise_for_frappe_error(data)
|
|
return data.get("data", data)
|
|
|
|
# --- Frappe-specific convenience methods ---
|
|
|
|
async def call_method(self, method: str, **kwargs) -> Any:
|
|
"""Call a whitelisted Frappe server-side method."""
|
|
return await self.post(f"/api/method/{method}", payload=kwargs)
|
|
|
|
async def get_doc(self, doctype: str, name: str) -> dict:
|
|
return await self.get(f"/api/resource/{doctype}/{name}")
|
|
|
|
async def get_list(
|
|
self,
|
|
doctype: str,
|
|
fields: list[str] | None = None,
|
|
filters: list | None = None,
|
|
limit: int = 20,
|
|
order_by: str = "modified desc",
|
|
) -> list[dict]:
|
|
params: dict[str, Any] = {"limit": limit, "order_by": order_by}
|
|
if fields:
|
|
params["fields"] = json.dumps(fields)
|
|
if filters:
|
|
params["filters"] = json.dumps(filters)
|
|
return await self.get(f"/api/resource/{doctype}", params=params)
|
|
|
|
async def create_doc(self, doctype: str, data: dict) -> dict:
|
|
return await self.post(f"/api/resource/{doctype}", payload=data)
|
|
|
|
async def update_doc(self, doctype: str, name: str, data: dict) -> dict:
|
|
return await self.put(f"/api/resource/{doctype}/{name}", payload=data)
|
|
|
|
async def delete_doc(self, doctype: str, name: str) -> dict:
|
|
return await self.delete(f"/api/resource/{doctype}/{name}")
|