MCP-Frappe/frappe_mcp/audit_store.py
MOHAN 2ee93048e1 feat: Add tools for managing server scripts, client scripts, translations, assignment rules, user permissions, webhooks, API keys, and workflows
- Implemented server and client script management tools in `frappe_mcp/tools/scripts.py`
- Added translation and user permission management tools in `frappe_mcp/tools/translations.py`
- Created user and role management tools in `frappe_mcp/tools/users.py`
- Developed webhook and API key management tools in `frappe_mcp/tools/webhooks.py`
- Introduced workflow management tools in `frappe_mcp/tools/workflow_tools.py`
- Added `pyproject.toml` for project metadata and dependencies
2026-04-21 20:26:45 +05:30

56 lines
1.7 KiB
Python

"""
Local audit store — records every MCP tool call to a JSONL file on the MCP server.
This is MCP-level auditing (not Frappe-level). Complements Frappe's Version DocType.
"""
import json
import os
import time
from pathlib import Path
from datetime import datetime, timezone
# Location of the JSONL audit trail. Resolved once at import time from the
# MCP_AUDIT_LOG environment variable; when unset it falls back to the relative
# filename "frappe_mcp_audit.jsonl" (i.e. relative to the process's working
# directory).
_AUDIT_FILE = Path(os.environ.get("MCP_AUDIT_LOG", "frappe_mcp_audit.jsonl"))
def log_action(tool_name: str, arguments: dict, result_summary: str = "", risk: str = "low") -> str:
    """Append one audit record to the JSONL audit file and return its ID.

    Args:
        tool_name: Name of the MCP tool that was invoked.
        arguments: Arguments the tool was called with. Values that JSON cannot
            represent natively (datetimes, sets, ...) are stored as ``str(value)``.
        result_summary: Short description of the outcome; only the first 200
            characters are kept. ``None`` or an empty value is stored as ``""``.
        risk: Risk label stored verbatim with the record.

    Returns:
        The record ID, ``"mcp-<unix time in milliseconds>"``. The ID has
        millisecond resolution, so two calls within the same millisecond
        receive the same ID.

    Raises:
        OSError: If the audit file or its parent directory cannot be written.
    """
    record = {
        "id": f"mcp-{int(time.time() * 1000)}",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tool": tool_name,
        "arguments": arguments,
        # Tolerate None / non-string summaries instead of failing on the slice.
        "result_summary": str(result_summary)[:200] if result_summary else "",
        "risk": risk,
    }
    # default=str is deliberate: an audit entry with a stringified value is
    # more useful than a tool call that crashes because one argument was not
    # JSON-serializable. Serialize before opening the file so a failure here
    # cannot leave a partial line behind.
    line = json.dumps(record, default=str)

    # The configured path may point into a directory that does not exist yet.
    _AUDIT_FILE.parent.mkdir(parents=True, exist_ok=True)
    with _AUDIT_FILE.open("a", encoding="utf-8") as f:
        f.write(line + "\n")
    return record["id"]
def read_audit_log(limit: int = 50, tool_filter: str = "") -> list[dict]:
    """Return the most recent audit records, newest first.

    Args:
        limit: Maximum number of records to return. Values <= 0 yield an
            empty list.
        tool_filter: When non-empty, keep only records whose ``"tool"`` value
            contains this substring.

    Returns:
        Up to ``limit`` record dicts in reverse file order (last written
        first). Blank lines, lines that are not valid JSON, and JSON values
        that are not objects are skipped. A missing audit file yields ``[]``.
    """
    from collections import deque  # local import: not among the module's imports

    if limit <= 0:
        return []

    # A bounded deque retains only the last `limit` matching records, so memory
    # stays proportional to `limit` rather than to the size of the log.
    recent: deque = deque(maxlen=limit)
    try:
        with _AUDIT_FILE.open("r", encoding="utf-8") as f:
            for raw in f:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    entry = json.loads(raw)
                except json.JSONDecodeError:
                    # Best-effort reader: a corrupt line must not hide the rest.
                    continue
                if not isinstance(entry, dict):
                    # Valid JSON but not a record (e.g. a bare list or number).
                    continue
                # `or ""` guards against "tool": null; str() against non-strings.
                if tool_filter and tool_filter not in str(entry.get("tool") or ""):
                    continue
                recent.append(entry)
    except FileNotFoundError:
        # Nothing has been logged yet (or the log was just cleared).
        return []
    return list(reversed(recent))
def clear_audit_log() -> int:
    """Delete the audit file and report how many records it held.

    Returns:
        The number of non-blank lines that were in the file, or 0 when the
        file does not exist.
    """
    # EAFP instead of an exists() pre-check: the file can disappear between the
    # check and the open (e.g. a concurrent clear), which would otherwise raise.
    try:
        with _AUDIT_FILE.open("r", encoding="utf-8") as f:
            count = sum(1 for line in f if line.strip())
    except FileNotFoundError:
        return 0
    # missing_ok: tolerate the file having been removed since it was counted.
    _AUDIT_FILE.unlink(missing_ok=True)
    return count