- Implemented server and client script management tools in `frappe_mcp/tools/scripts.py` - Added translation and user permission management tools in `frappe_mcp/tools/translations.py` - Created user and role management tools in `frappe_mcp/tools/users.py` - Developed webhook and API key management tools in `frappe_mcp/tools/webhooks.py` - Introduced workflow management tools in `frappe_mcp/tools/workflow_tools.py` - Added `pyproject.toml` for project metadata and dependencies
426 lines
15 KiB
Python
426 lines
15 KiB
Python
"""
|
|
Level 10 — Safety and governance tools.
|
|
Dry run, validate, risk scoring, confirmation tokens, audit log, rollback tracking.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import hashlib
|
|
import secrets
|
|
from mcp.types import Tool
|
|
from frappe_mcp.client.frappe_api import FrappeClient
|
|
from frappe_mcp.audit_store import log_action, read_audit_log, clear_audit_log
|
|
|
|
# In-memory confirmation token store (clears on restart — by design).
# Keyed by token; each value is the pending-action record written by
# _request_confirmation_token (tool_name, arguments, reason, expires_at,
# created_at).
_pending_confirmations: dict[str, dict] = {}

# Reversible action registry: tool_call_id → undo info.
# Entries are written by _register_reversible_action and are read and removed
# by _rollback_action.
_reversible_actions: dict[str, dict] = {}
|
|
|
|
# Risk profiles per tool prefix.
# _score_risk tests each key with str.startswith in insertion order and returns
# the level of the first key that matches; tool names that match no key are
# classified by the keyword checks in _score_risk instead.
_RISK_MAP = {
    "frappe_delete": "high",
    "frappe_bulk_delete": "high",
    "frappe_cancel": "high",
    "frappe_reset_customization": "high",
    "frappe_revoke_api_key": "high",
    "frappe_clear_error_logs": "medium",
    "frappe_bulk_submit": "medium",
    "frappe_submit": "medium",
    "frappe_create": "low",
    "frappe_update": "low",
    "frappe_get": "low",
    "frappe_list": "low",
    "frappe_ping": "low",
}
|
|
|
|
|
|
def _score_risk(tool_name: str, arguments: dict) -> str:
    """Classify a tool call as ``"low"``, ``"medium"`` or ``"high"`` risk.

    The first ``_RISK_MAP`` prefix that ``tool_name`` starts with decides the
    level. Names matching no prefix are classified by keyword: ``delete``,
    ``cancel`` or ``reset`` anywhere in the name means high; ``submit`` or
    ``bulk`` means medium; everything else is low.

    Args:
        tool_name: Name of the tool being scored.
        arguments: Arguments of the proposed call. Accepted for interface
            symmetry; the score currently depends on the tool name only.

    Returns:
        The risk level string.
    """
    mapped_level = next(
        (level for prefix, level in _RISK_MAP.items() if tool_name.startswith(prefix)),
        None,
    )
    if mapped_level is not None:
        return mapped_level

    # Keyword fallback for tools that have no explicit prefix entry.
    if any(word in tool_name for word in ("delete", "cancel", "reset")):
        return "high"
    if any(word in tool_name for word in ("submit", "bulk")):
        return "medium"
    return "low"
|
|
|
|
|
|
def tools() -> list[Tool]:
    """Return the Tool declarations exposed by the safety and governance module.

    Each declaration pairs a tool name with a description and a JSON-Schema
    ``inputSchema``. The names match the keys of ``handlers()``.
    """

    def text(description=None) -> dict:
        """Build a fresh string-property spec, optionally with a description."""
        spec = {"type": "string"}
        if description is not None:
            spec["description"] = description
        return spec

    def mapping(description=None) -> dict:
        """Build a fresh object-property spec, optionally with a description."""
        spec = {"type": "object"}
        if description is not None:
            spec["description"] = description
        return spec

    def schema(properties, required=None) -> dict:
        """Build an object schema; key order is type, required, properties."""
        built = {"type": "object"}
        if required is not None:
            built["required"] = required
        built["properties"] = properties
        return built

    validate_payload = Tool(
        name="frappe_validate_document_payload",
        description=(
            "Validate document fields before creating or updating. "
            "Checks required fields, field types, and link validity. "
            "Does NOT save anything."
        ),
        inputSchema=schema(
            {
                "doctype": text(),
                "data": mapping(),
                "name": text("Provide if validating an update"),
            },
            required=["doctype", "data"],
        ),
    )

    dry_run = Tool(
        name="frappe_dry_run_action",
        description=(
            "Simulate a create or update action without saving. "
            "Returns what WOULD happen: validation result, computed fields, and any errors."
        ),
        inputSchema=schema(
            {
                "action": {"type": "string", "enum": ["create", "update"]},
                "doctype": text(),
                "data": mapping(),
                "name": text("Required for update dry-run"),
            },
            required=["action", "doctype", "data"],
        ),
    )

    risk_score = Tool(
        name="frappe_risk_score_action",
        description=(
            "Score the risk level of a proposed action before executing it. "
            "Returns low | medium | high with reasoning."
        ),
        inputSchema=schema(
            {
                "tool_name": text(),
                "arguments": mapping(),
            },
            required=["tool_name", "arguments"],
        ),
    )

    confirmation_token = Tool(
        name="frappe_request_confirmation_token",
        description=(
            "Generate a one-time confirmation token for a high-risk action. "
            "The token expires in 60 seconds. "
            "Pass the token to frappe_confirm_and_execute to proceed."
        ),
        inputSchema=schema(
            {
                "tool_name": text(),
                "arguments": mapping(),
                "reason": text("Why this action is being performed"),
            },
            required=["tool_name", "arguments"],
        ),
    )

    audit_call = Tool(
        name="frappe_audit_tool_call",
        description="Manually log a tool call to the MCP audit log.",
        inputSchema=schema(
            {
                "tool_name": text(),
                "arguments": mapping(),
                "result_summary": text(),
                "risk": {"type": "string", "enum": ["low", "medium", "high"], "default": "low"},
            },
            required=["tool_name", "arguments"],
        ),
    )

    audit_history = Tool(
        name="frappe_get_audit_history",
        description="Read the MCP-level audit log of all tool calls made in this session.",
        inputSchema=schema(
            {
                "limit": {"type": "integer", "default": 50},
                "tool_filter": text("Filter by tool name substring"),
            },
        ),
    )

    clear_audit = Tool(
        name="frappe_clear_audit_log",
        description="Clear the local MCP audit log file.",
        inputSchema=schema({}),
    )

    health_check = Tool(
        name="frappe_tool_health_check",
        description=(
            "Full health check: Frappe connectivity, auth, read/write access, "
            "scheduler status, and MCP module status."
        ),
        inputSchema=schema({}),
    )

    policy_check = Tool(
        name="frappe_policy_check",
        description=(
            "Check whether an action is allowed under the current MCP policy "
            "(read_only_mode, enabled modules, risk thresholds)."
        ),
        inputSchema=schema(
            {
                "tool_name": text(),
                "arguments": mapping(),
            },
            required=["tool_name"],
        ),
    )

    register_reversible = Tool(
        name="frappe_register_reversible_action",
        description=(
            "Register an action as reversible so it can be rolled back later. "
            "Stores doctype, name, and the pre-action state."
        ),
        inputSchema=schema(
            {
                "action_id": text(),
                "doctype": text(),
                "name": text(),
                "pre_state": mapping("Document state before the action"),
            },
            required=["action_id", "doctype", "name"],
        ),
    )

    rollback = Tool(
        name="frappe_rollback_action",
        description=(
            "Roll back a previously registered reversible action "
            "by restoring the document to its pre-action state."
        ),
        inputSchema=schema(
            {
                "action_id": text(),
            },
            required=["action_id"],
        ),
    )

    # Declaration order is part of the listing returned to clients.
    return [
        validate_payload,
        dry_run,
        risk_score,
        confirmation_token,
        audit_call,
        audit_history,
        clear_audit,
        health_check,
        policy_check,
        register_reversible,
        rollback,
    ]
|
|
|
|
|
|
def handlers() -> dict:
    """Return the dispatch table from tool name to its async handler.

    Keys are the tool names declared in ``tools()``; values are the coroutine
    functions in this module that implement them.
    """
    dispatch = [
        ("frappe_validate_document_payload", _validate_payload),
        ("frappe_dry_run_action", _dry_run_action),
        ("frappe_risk_score_action", _risk_score_action),
        ("frappe_request_confirmation_token", _request_confirmation_token),
        ("frappe_audit_tool_call", _audit_tool_call),
        ("frappe_get_audit_history", _get_audit_history),
        ("frappe_clear_audit_log", _clear_audit_log),
        ("frappe_tool_health_check", _tool_health_check),
        ("frappe_policy_check", _policy_check),
        ("frappe_register_reversible_action", _register_reversible_action),
        ("frappe_rollback_action", _rollback_action),
    ]
    return dict(dispatch)
|
|
|
|
|
|
async def _validate_payload(args: dict) -> str:
    """Check a document payload against its DocType metadata without saving.

    The DocType definition is fetched through
    ``frappe.desk.form.load.getdoctype`` and the payload is checked locally for
    required fields and for ``Int`` fields whose value cannot be converted to an
    integer. Link validity is not checked here.

    Args:
        args: Tool arguments. ``doctype`` and ``data`` are required. A non-empty
            ``name`` marks the payload as a partial update of an existing
            document: required fields that are absent from ``data`` are then
            not reported, because the stored document already carries them.

    Returns:
        JSON string with ``valid``, ``errors``, ``warnings`` and
        ``fields_provided``.
    """
    client = FrappeClient()
    meta = await client.call_method(
        "frappe.desk.form.load.getdoctype",
        doctype=args["doctype"],
        with_parent=1,
        cached_timestamp=None,
    )
    data = args["data"]
    is_update = bool(args.get("name"))
    errors = []
    warnings = []

    docs = meta.get("docs", []) if isinstance(meta, dict) else []
    fields = next((d.get("fields", []) for d in docs if d.get("name") == args["doctype"]), [])
    if not fields:
        # Without metadata nothing below runs; say so instead of implying a clean pass.
        warnings.append(
            f"No field metadata returned for doctype {args['doctype']}; field checks were skipped"
        )

    for field in fields:
        fname = field.get("fieldname", "")
        if field.get("reqd"):
            if fname in data:
                # A key that is present but blank is as unusable as a missing one.
                missing = _is_blank_value(data[fname])
            else:
                # On a partial update an omitted key keeps its stored value.
                missing = not is_update
            if missing:
                errors.append(f"Required field missing: {fname} ({field.get('label', fname)})")
        if fname in data and field.get("fieldtype") == "Int":
            try:
                int(data[fname])
            except (TypeError, ValueError):
                errors.append(f"Field {fname} expects an integer, got: {data[fname]}")

    return json.dumps({
        "valid": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
        "fields_provided": list(data.keys()),
    }, indent=2)


def _is_blank_value(value) -> bool:
    """Return True for None, empty or whitespace-only strings, and empty containers.

    Numeric zero and ``False`` are real values and are not treated as blank.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return not value.strip()
    if isinstance(value, (list, tuple, dict)):
        return len(value) == 0
    return False
|
|
|
|
|
|
async def _dry_run_action(args: dict) -> str:
    """Simulate a create or update by validating the document without saving.

    For ``create`` the payload is sent to ``frappe.client.validate`` as a new
    document of the given doctype. For any other action the existing document
    is fetched, the payload is merged over it, and the merged document is
    validated.

    Args:
        args: Tool arguments. ``action``, ``doctype`` and ``data`` are required;
            ``name`` is required when the action is not ``create``.

    Returns:
        JSON string with ``dry_run``, ``action`` and ``status``
        (``would_succeed`` with ``result``, or ``would_fail`` with ``error``).
    """
    action = args["action"]
    doctype = args["doctype"]
    data = args["data"]

    # An update needs a target document. Fail fast with a clear message rather
    # than asking the server for a document with an empty name.
    if action != "create" and not args.get("name"):
        return json.dumps({
            "dry_run": True,
            "action": action,
            "status": "would_fail",
            "error": "A document name is required for an update dry-run",
        }, indent=2)

    client = FrappeClient()

    try:
        if action == "create":
            result = await client.call_method(
                "frappe.client.validate",
                doc={"doctype": doctype, **data},
            )
        else:
            existing = await client.get_doc(doctype, args["name"])
            # Payload values win over the stored ones, mirroring an update.
            merged = {**existing, **data}
            result = await client.call_method("frappe.client.validate", doc=merged)
        return json.dumps({"dry_run": True, "action": action, "status": "would_succeed", "result": result}, indent=2)
    except Exception as e:
        # Any failure (validation, lookup, transport) is reported as would_fail.
        return json.dumps({"dry_run": True, "action": action, "status": "would_fail", "error": str(e)}, indent=2)
|
|
|
|
|
|
async def _risk_score_action(args: dict) -> str:
    """Score a proposed tool call and explain the score.

    Args:
        args: Tool arguments with ``tool_name`` and optional ``arguments``.

    Returns:
        JSON string with ``tool``, ``risk`` (from ``_score_risk``) and
        ``reasons``, one reason per keyword found in the tool name.
    """
    tool_name = args["tool_name"]
    level = _score_risk(tool_name, args.get("arguments", {}))

    # Keyword → explanation, listed in the order the reasons are reported.
    explanations = (
        ("delete", "Destructive operation — cannot be undone easily"),
        ("bulk", "Bulk operation affects multiple records"),
        ("cancel", "Cancellation changes document lifecycle"),
        ("reset", "Resets customization — removes all property setters and custom fields"),
    )
    reasons = [message for keyword, message in explanations if keyword in tool_name]
    if not reasons:
        reasons = ["Standard read/write operation"]

    return json.dumps({"tool": tool_name, "risk": level, "reasons": reasons}, indent=2)
|
|
|
|
|
|
async def _request_confirmation_token(args: dict) -> str:
    """Issue a short-lived confirmation token for a proposed tool call.

    The token and the pending call are recorded in ``_pending_confirmations``.
    Entries whose ``expires_at`` has already passed are dropped first, so
    tokens that were never used do not pile up in the store.

    Args:
        args: Tool arguments with ``tool_name``, ``arguments`` and an optional
            ``reason``.

    Returns:
        JSON string with ``token``, ``tool_name``, ``expires_in_seconds`` and an
        ``instruction`` for the caller.
    """
    ttl_seconds = 60
    # One clock reading so that expires_at - created_at is exactly the TTL.
    now = time.time()

    # Mutate the shared dict in place (never rebind it): other code may hold a
    # reference to the same object.
    expired = [
        pending_token
        for pending_token, record in _pending_confirmations.items()
        if record.get("expires_at", 0) < now
    ]
    for pending_token in expired:
        del _pending_confirmations[pending_token]

    token = secrets.token_hex(8)
    _pending_confirmations[token] = {
        "tool_name": args["tool_name"],
        "arguments": args["arguments"],
        "reason": args.get("reason", ""),
        "expires_at": now + ttl_seconds,
        "created_at": now,
    }
    return json.dumps({
        "token": token,
        "tool_name": args["tool_name"],
        "expires_in_seconds": ttl_seconds,
        "instruction": "Pass this token to frappe_confirm_and_execute to proceed with the action.",
    }, indent=2)
|
|
|
|
|
|
async def _audit_tool_call(args: dict) -> str:
    """Write one entry to the MCP audit log via ``log_action``.

    Args:
        args: Tool arguments with ``tool_name`` and ``arguments``, plus optional
            ``result_summary`` (default ``""``) and ``risk`` (default ``"low"``).

    Returns:
        JSON string with ``logged`` and the ``record_id`` returned by
        ``log_action``.
    """
    entry = {
        "tool_name": args["tool_name"],
        "arguments": args["arguments"],
        "result_summary": args.get("result_summary", ""),
        "risk": args.get("risk", "low"),
    }
    record_id = log_action(**entry)
    response = {"logged": True, "record_id": record_id}
    return json.dumps(response, indent=2)
|
|
|
|
|
|
async def _get_audit_history(args: dict) -> str:
    """Read entries from the MCP audit log via ``read_audit_log``.

    Args:
        args: Tool arguments with optional ``limit`` (default 50) and
            ``tool_filter`` (default ``""``).

    Returns:
        JSON string with ``count`` and the ``records`` that were read.
    """
    max_records = args.get("limit", 50)
    name_filter = args.get("tool_filter", "")
    records = read_audit_log(limit=max_records, tool_filter=name_filter)
    response = {"count": len(records), "records": records}
    return json.dumps(response, indent=2)
|
|
|
|
|
|
async def _clear_audit_log(args: dict) -> str:
    """Clear the MCP audit log via ``clear_audit_log``.

    Args:
        args: Tool arguments; none are read.

    Returns:
        JSON string with ``cleared`` and ``records_removed``, the value returned
        by ``clear_audit_log``.
    """
    removed = clear_audit_log()
    response = {"cleared": True, "records_removed": removed}
    return json.dumps(response, indent=2)
|
|
|
|
|
|
async def _tool_health_check(args: dict) -> str:
    """Report Frappe connectivity, authentication and MCP module status.

    Each remote probe is isolated, so one failing probe is recorded as
    ``{"status": "fail", "error": ...}`` and the remaining checks still run.

    NOTE(review): the tool description also advertises read/write access and
    scheduler status; neither is probed here — confirm whether they should be.

    Args:
        args: Tool arguments; none are read.

    Returns:
        JSON string with ``frappe_reachable``, ``auth``, ``modules``,
        ``read_only_mode`` and ``total_enabled_tools``.
    """
    # Imported here rather than at module level, as in the rest of this module.
    from frappe_mcp.module_registry import ALL_MODULES, _is_module_enabled
    from frappe_mcp.config import get_settings

    client = FrappeClient()
    settings = get_settings()

    checks = {}

    # connectivity
    try:
        versions = await client.call_method("frappe.utils.change_log.get_versions")
        frappe_info = versions.get("frappe") if isinstance(versions, dict) else None
        # An unexpected payload shape still means the server answered.
        version = frappe_info.get("version") if isinstance(frappe_info, dict) else None
        checks["frappe_reachable"] = {"status": "ok", "version": version}
    except Exception as e:
        checks["frappe_reachable"] = {"status": "fail", "error": str(e)}

    # auth
    try:
        user = await client.call_method("frappe.auth.get_logged_user")
        checks["auth"] = {"status": "ok", "user": user}
    except Exception as e:
        checks["auth"] = {"status": "fail", "error": str(e)}

    # modules — resolve each module's enabled flag once and reuse it below
    enabled_by_key = {key: _is_module_enabled(key) for key, _, _ in ALL_MODULES}
    checks["modules"] = {
        key: "enabled" if is_enabled else "disabled"
        for key, is_enabled in enabled_by_key.items()
    }
    checks["read_only_mode"] = settings.read_only_mode
    checks["total_enabled_tools"] = sum(
        len(mod.tools()) for key, mod, _ in ALL_MODULES if enabled_by_key[key]
    )

    return json.dumps(checks, indent=2)
|
|
|
|
|
|
async def _policy_check(args: dict) -> str:
    """Report whether a tool call is allowed under the current MCP settings.

    Two rules are evaluated: in read-only mode, tools whose name starts with a
    write prefix are blocked; and a tool whose owning module is disabled is
    blocked. The risk level from ``_score_risk`` is reported but does not
    affect the verdict.

    Args:
        args: Tool arguments with ``tool_name`` and optional ``arguments``.

    Returns:
        JSON string with ``tool``, ``risk``, ``allowed`` and ``violations``.
    """
    from frappe_mcp.config import get_settings
    from frappe_mcp.module_registry import _is_module_enabled, ALL_MODULES

    settings = get_settings()
    tool_name = args["tool_name"]
    level = _score_risk(tool_name, args.get("arguments", {}))

    problems = []

    # Name prefixes treated as write operations while read-only mode is on.
    write_prefixes = (
        "frappe_create",
        "frappe_update",
        "frappe_delete",
        "frappe_submit",
        "frappe_cancel",
        "frappe_bulk",
        "erpnext_",
    )
    if settings.read_only_mode and tool_name.startswith(write_prefixes):
        problems.append("read_only_mode=true — write operations are blocked")

    # The owning module is the first one whose handler table contains the tool.
    owner_key = next(
        (key for key, mod, _ in ALL_MODULES if tool_name in mod.handlers()),
        None,
    )
    if owner_key and not _is_module_enabled(owner_key):
        problems.append(f"Module '{owner_key}' is disabled in configuration")

    verdict = {
        "tool": tool_name,
        "risk": level,
        "allowed": len(problems) == 0,
        "violations": problems,
    }
    return json.dumps(verdict, indent=2)
|
|
|
|
|
|
async def _register_reversible_action(args: dict) -> str:
    """Record the information needed to undo an action later.

    The record is stored in ``_reversible_actions`` under ``action_id``; an
    existing record with the same id is replaced.

    Args:
        args: Tool arguments with ``action_id``, ``doctype`` and ``name``, plus
            an optional ``pre_state`` (default ``{}``).

    Returns:
        JSON string with ``registered`` and the ``action_id``.
    """
    undo_record = {
        "doctype": args["doctype"],
        "name": args["name"],
        "pre_state": args.get("pre_state", {}),
        "registered_at": time.time(),
    }
    _reversible_actions[args["action_id"]] = undo_record
    confirmation = {"registered": True, "action_id": args["action_id"]}
    return json.dumps(confirmation, indent=2)
|
|
|
|
|
|
async def _rollback_action(args: dict) -> str:
    """Restore a document to the state recorded for a reversible action.

    The record is looked up in ``_reversible_actions`` and its ``pre_state`` is
    written back with ``FrappeClient.update_doc``. The record is removed only
    after the update succeeds, so a failed rollback can be retried.

    Args:
        args: Tool arguments with ``action_id``.

    Returns:
        JSON string. ``{"error": ...}`` when the id is unknown;
        ``{"rolled_back": False, "error": ...}`` when no pre-action state was
        recorded or the update failed; otherwise ``rolled_back``, ``doctype``
        and ``name``.
    """
    action_id = args["action_id"]
    action = _reversible_actions.get(action_id)
    if not action:
        return json.dumps({"error": f"No reversible action found for id: {action_id}"}, indent=2)

    pre_state = action.get("pre_state")
    if not pre_state:
        # Registration allows pre_state to be omitted. An empty update restores
        # nothing, so do not report it as a successful rollback.
        return json.dumps(
            {"rolled_back": False, "error": f"No pre-action state was recorded for id: {action_id}"},
            indent=2,
        )

    client = FrappeClient()
    try:
        await client.update_doc(action["doctype"], action["name"], pre_state)
    except Exception as e:
        return json.dumps({"rolled_back": False, "error": str(e)}, indent=2)

    # pop() tolerates the entry having been removed while the update was awaited.
    _reversible_actions.pop(action_id, None)
    return json.dumps({"rolled_back": True, "doctype": action["doctype"], "name": action["name"]}, indent=2)
|