""" Local audit store — records every MCP tool call to a JSONL file on the MCP server. This is MCP-level auditing (not Frappe-level). Complements Frappe's Version DocType. """ import json import os import time from pathlib import Path from datetime import datetime, timezone _AUDIT_FILE = Path(os.environ.get("MCP_AUDIT_LOG", "frappe_mcp_audit.jsonl")) def log_action(tool_name: str, arguments: dict, result_summary: str = "", risk: str = "low") -> str: """Append one audit record. Returns the record ID.""" record = { "id": f"mcp-{int(time.time() * 1000)}", "timestamp": datetime.now(timezone.utc).isoformat(), "tool": tool_name, "arguments": arguments, "result_summary": result_summary[:200], "risk": risk, } with _AUDIT_FILE.open("a", encoding="utf-8") as f: f.write(json.dumps(record) + "\n") return record["id"] def read_audit_log(limit: int = 50, tool_filter: str = "") -> list[dict]: if not _AUDIT_FILE.exists(): return [] records = [] with _AUDIT_FILE.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: r = json.loads(line) if tool_filter and tool_filter not in r.get("tool", ""): continue records.append(r) except json.JSONDecodeError: pass return list(reversed(records))[:limit] def clear_audit_log() -> int: if not _AUDIT_FILE.exists(): return 0 with _AUDIT_FILE.open("r", encoding="utf-8") as f: count = sum(1 for line in f if line.strip()) _AUDIT_FILE.unlink() return count