""" Level 10 — Safety and governance tools. Dry run, validate, risk scoring, confirmation tokens, audit log, rollback tracking. """ import json import time import hashlib import secrets from mcp.types import Tool from frappe_mcp.client.frappe_api import FrappeClient from frappe_mcp.audit_store import log_action, read_audit_log, clear_audit_log # In-memory confirmation token store (clears on restart — by design) _pending_confirmations: dict[str, dict] = {} # Reversible action registry: tool_call_id → undo info _reversible_actions: dict[str, dict] = {} # Risk profiles per tool prefix _RISK_MAP = { "frappe_delete": "high", "frappe_bulk_delete": "high", "frappe_cancel": "high", "frappe_reset_customization": "high", "frappe_revoke_api_key": "high", "frappe_clear_error_logs": "medium", "frappe_bulk_submit": "medium", "frappe_submit": "medium", "frappe_create": "low", "frappe_update": "low", "frappe_get": "low", "frappe_list": "low", "frappe_ping": "low", } def _score_risk(tool_name: str, arguments: dict) -> str: for prefix, level in _RISK_MAP.items(): if tool_name.startswith(prefix): return level if "delete" in tool_name or "cancel" in tool_name or "reset" in tool_name: return "high" if "submit" in tool_name or "bulk" in tool_name: return "medium" return "low" def tools() -> list[Tool]: return [ Tool( name="frappe_validate_document_payload", description=( "Validate document fields before creating or updating. " "Checks required fields, field types, and link validity. " "Does NOT save anything." ), inputSchema={ "type": "object", "required": ["doctype", "data"], "properties": { "doctype": {"type": "string"}, "data": {"type": "object"}, "name": {"type": "string", "description": "Provide if validating an update"}, }, }, ), Tool( name="frappe_dry_run_action", description=( "Simulate a create or update action without saving. " "Returns what WOULD happen: validation result, computed fields, and any errors." ), inputSchema={ "type": "object", "required": ["action", "doctype", "data"], "properties": { "action": {"type": "string", "enum": ["create", "update"]}, "doctype": {"type": "string"}, "data": {"type": "object"}, "name": {"type": "string", "description": "Required for update dry-run"}, }, }, ), Tool( name="frappe_risk_score_action", description=( "Score the risk level of a proposed action before executing it. " "Returns low | medium | high with reasoning." ), inputSchema={ "type": "object", "required": ["tool_name", "arguments"], "properties": { "tool_name": {"type": "string"}, "arguments": {"type": "object"}, }, }, ), Tool( name="frappe_request_confirmation_token", description=( "Generate a one-time confirmation token for a high-risk action. " "The token expires in 60 seconds. " "Pass the token to frappe_confirm_and_execute to proceed." ), inputSchema={ "type": "object", "required": ["tool_name", "arguments"], "properties": { "tool_name": {"type": "string"}, "arguments": {"type": "object"}, "reason": {"type": "string", "description": "Why this action is being performed"}, }, }, ), Tool( name="frappe_audit_tool_call", description="Manually log a tool call to the MCP audit log.", inputSchema={ "type": "object", "required": ["tool_name", "arguments"], "properties": { "tool_name": {"type": "string"}, "arguments": {"type": "object"}, "result_summary": {"type": "string"}, "risk": {"type": "string", "enum": ["low", "medium", "high"], "default": "low"}, }, }, ), Tool( name="frappe_get_audit_history", description="Read the MCP-level audit log of all tool calls made in this session.", inputSchema={ "type": "object", "properties": { "limit": {"type": "integer", "default": 50}, "tool_filter": {"type": "string", "description": "Filter by tool name substring"}, }, }, ), Tool( name="frappe_clear_audit_log", description="Clear the local MCP audit log file.", inputSchema={"type": "object", "properties": {}}, ), Tool( name="frappe_tool_health_check", description=( "Full health check: Frappe connectivity, auth, read/write access, " "scheduler status, and MCP module status." ), inputSchema={"type": "object", "properties": {}}, ), Tool( name="frappe_policy_check", description=( "Check whether an action is allowed under the current MCP policy " "(read_only_mode, enabled modules, risk thresholds)." ), inputSchema={ "type": "object", "required": ["tool_name"], "properties": { "tool_name": {"type": "string"}, "arguments": {"type": "object"}, }, }, ), Tool( name="frappe_register_reversible_action", description=( "Register an action as reversible so it can be rolled back later. " "Stores doctype, name, and the pre-action state." ), inputSchema={ "type": "object", "required": ["action_id", "doctype", "name"], "properties": { "action_id": {"type": "string"}, "doctype": {"type": "string"}, "name": {"type": "string"}, "pre_state": {"type": "object", "description": "Document state before the action"}, }, }, ), Tool( name="frappe_rollback_action", description=( "Roll back a previously registered reversible action " "by restoring the document to its pre-action state." ), inputSchema={ "type": "object", "required": ["action_id"], "properties": { "action_id": {"type": "string"}, }, }, ), ] def handlers() -> dict: return { "frappe_validate_document_payload": _validate_payload, "frappe_dry_run_action": _dry_run_action, "frappe_risk_score_action": _risk_score_action, "frappe_request_confirmation_token": _request_confirmation_token, "frappe_audit_tool_call": _audit_tool_call, "frappe_get_audit_history": _get_audit_history, "frappe_clear_audit_log": _clear_audit_log, "frappe_tool_health_check": _tool_health_check, "frappe_policy_check": _policy_check, "frappe_register_reversible_action": _register_reversible_action, "frappe_rollback_action": _rollback_action, } async def _validate_payload(args: dict) -> str: client = FrappeClient() meta = await client.call_method( "frappe.desk.form.load.getdoctype", doctype=args["doctype"], with_parent=1, cached_timestamp=None, ) data = args["data"] errors = [] warnings = [] docs = meta.get("docs", []) if isinstance(meta, dict) else [] fields = next((d.get("fields", []) for d in docs if d.get("name") == args["doctype"]), []) for field in fields: fname = field.get("fieldname", "") if field.get("reqd") and fname not in data and not data.get(fname): errors.append(f"Required field missing: {fname} ({field.get('label', fname)})") if fname in data and field.get("fieldtype") == "Int": try: int(data[fname]) except (TypeError, ValueError): errors.append(f"Field {fname} expects an integer, got: {data[fname]}") return json.dumps({ "valid": len(errors) == 0, "errors": errors, "warnings": warnings, "fields_provided": list(data.keys()), }, indent=2) async def _dry_run_action(args: dict) -> str: client = FrappeClient() action = args["action"] doctype = args["doctype"] data = args["data"] try: if action == "create": result = await client.call_method( "frappe.client.validate", doc={"doctype": doctype, **data}, ) else: name = args.get("name", "") existing = await client.get_doc(doctype, name) merged = {**existing, **data} result = await client.call_method("frappe.client.validate", doc=merged) return json.dumps({"dry_run": True, "action": action, "status": "would_succeed", "result": result}, indent=2) except Exception as e: return json.dumps({"dry_run": True, "action": action, "status": "would_fail", "error": str(e)}, indent=2) async def _risk_score_action(args: dict) -> str: tool = args["tool_name"] arguments = args.get("arguments", {}) risk = _score_risk(tool, arguments) reasons = [] if "delete" in tool: reasons.append("Destructive operation — cannot be undone easily") if "bulk" in tool: reasons.append("Bulk operation affects multiple records") if "cancel" in tool: reasons.append("Cancellation changes document lifecycle") if "reset" in tool: reasons.append("Resets customization — removes all property setters and custom fields") if not reasons: reasons.append("Standard read/write operation") return json.dumps({"tool": tool, "risk": risk, "reasons": reasons}, indent=2) async def _request_confirmation_token(args: dict) -> str: token = secrets.token_hex(8) _pending_confirmations[token] = { "tool_name": args["tool_name"], "arguments": args["arguments"], "reason": args.get("reason", ""), "expires_at": time.time() + 60, "created_at": time.time(), } return json.dumps({ "token": token, "tool_name": args["tool_name"], "expires_in_seconds": 60, "instruction": "Pass this token to frappe_confirm_and_execute to proceed with the action.", }, indent=2) async def _audit_tool_call(args: dict) -> str: record_id = log_action( tool_name=args["tool_name"], arguments=args["arguments"], result_summary=args.get("result_summary", ""), risk=args.get("risk", "low"), ) return json.dumps({"logged": True, "record_id": record_id}, indent=2) async def _get_audit_history(args: dict) -> str: records = read_audit_log( limit=args.get("limit", 50), tool_filter=args.get("tool_filter", ""), ) return json.dumps({"count": len(records), "records": records}, indent=2) async def _clear_audit_log(args: dict) -> str: count = clear_audit_log() return json.dumps({"cleared": True, "records_removed": count}, indent=2) async def _tool_health_check(args: dict) -> str: from frappe_mcp.healthcheck import run_health_check from frappe_mcp.module_registry import ALL_MODULES, _is_module_enabled from frappe_mcp.config import get_settings import asyncio client = FrappeClient() settings = get_settings() checks = {} # connectivity try: versions = await client.call_method("frappe.utils.change_log.get_versions") checks["frappe_reachable"] = {"status": "ok", "version": versions.get("frappe", {}).get("version") if isinstance(versions, dict) else None} except Exception as e: checks["frappe_reachable"] = {"status": "fail", "error": str(e)} # auth try: user = await client.call_method("frappe.auth.get_logged_user") checks["auth"] = {"status": "ok", "user": user} except Exception as e: checks["auth"] = {"status": "fail", "error": str(e)} # modules checks["modules"] = { key: "enabled" if _is_module_enabled(key) else "disabled" for key, _, _ in ALL_MODULES } checks["read_only_mode"] = settings.read_only_mode checks["total_enabled_tools"] = sum( len(mod.tools()) for key, mod, _ in ALL_MODULES if _is_module_enabled(key) ) return json.dumps(checks, indent=2) async def _policy_check(args: dict) -> str: from frappe_mcp.config import get_settings from frappe_mcp.module_registry import _is_module_enabled, ALL_MODULES settings = get_settings() tool = args["tool_name"] risk = _score_risk(tool, args.get("arguments", {})) violations = [] if settings.read_only_mode: write_prefixes = ["frappe_create", "frappe_update", "frappe_delete", "frappe_submit", "frappe_cancel", "frappe_bulk", "erpnext_"] if any(tool.startswith(p) for p in write_prefixes): violations.append("read_only_mode=true — write operations are blocked") module_key = None for key, mod, _ in ALL_MODULES: if tool in mod.handlers(): module_key = key break if module_key and not _is_module_enabled(module_key): violations.append(f"Module '{module_key}' is disabled in configuration") return json.dumps({ "tool": tool, "risk": risk, "allowed": len(violations) == 0, "violations": violations, }, indent=2) async def _register_reversible_action(args: dict) -> str: _reversible_actions[args["action_id"]] = { "doctype": args["doctype"], "name": args["name"], "pre_state": args.get("pre_state", {}), "registered_at": time.time(), } return json.dumps({"registered": True, "action_id": args["action_id"]}, indent=2) async def _rollback_action(args: dict) -> str: action = _reversible_actions.get(args["action_id"]) if not action: return json.dumps({"error": f"No reversible action found for id: {args['action_id']}"}, indent=2) client = FrappeClient() try: result = await client.update_doc(action["doctype"], action["name"], action["pre_state"]) del _reversible_actions[args["action_id"]] return json.dumps({"rolled_back": True, "doctype": action["doctype"], "name": action["name"]}, indent=2) except Exception as e: return json.dumps({"rolled_back": False, "error": str(e)}, indent=2)