Policy Engine¶
The policy engine is the first line of defense in MCPKernel. Every tool call is evaluated against a set of rules before execution. Rules are defined in YAML and support regex matching, taint label conditions, and OWASP ASI 2026 mappings.
How It Works¶
Rules are evaluated by priority (lowest number = highest precedence). The most restrictive matching rule wins.
| Action | What Happens |
|---|---|
deny |
Block the tool call immediately |
sandbox |
Execute in sandboxed environment |
warn |
Allow but log a warning |
audit |
Allow and log for review |
allow |
Allow silently |
Tutorial: Your First Policy¶
Step 1: Create a policy file¶
# policies/my_policy.yaml
rules:
- id: block-shell
name: Block shell execution
description: Prevent any tool from running shell commands
action: deny
priority: 10
tool_patterns:
- "shell_.*"
- "os_command"
- "subprocess_run"
Step 2: Validate it¶
Output:
Step 3: Use it in Python¶
from mcpkernel.policy.engine import PolicyEngine, PolicyRule, PolicyAction
# Create an engine with deny-by-default
engine = PolicyEngine(default_action=PolicyAction.DENY)
# Add a rule that allows file reading
engine.add_rule(PolicyRule(
id="allow-read",
name="Allow file reading",
description="Permit read_file and list_dir tools",
action=PolicyAction.ALLOW,
priority=50,
tool_patterns=["read_file", "list_dir"],
))
# Add a rule that blocks shell access
engine.add_rule(PolicyRule(
id="block-shell",
name="Block shell execution",
action=PolicyAction.DENY,
priority=10,
tool_patterns=["shell_.*", "os_command"],
))
# Evaluate tool calls
result1 = engine.evaluate("read_file", {"path": "/tmp/data.csv"})
print(f"read_file: {result1.action}") # Output: read_file: allow
print(f"Allowed: {result1.allowed}") # Output: Allowed: True
result2 = engine.evaluate("shell_exec", {"command": "ls"})
print(f"shell_exec: {result2.action}") # Output: shell_exec: deny
print(f"Reasons: {result2.reasons}")
# Output: Reasons: ['[block-shell] Block shell execution: ']
result3 = engine.evaluate("unknown_tool", {})
print(f"unknown_tool: {result3.action}") # Output: unknown_tool: deny
# (deny-by-default catches everything not explicitly allowed)
Rule Matching in Detail¶
Tool Name Patterns (regex)¶
Tool patterns use Python regex with re.fullmatch():
rules:
- id: block-http
name: Block outbound HTTP
action: deny
tool_patterns:
- "http_.*" # Matches http_get, http_post, etc.
- "fetch_.*" # Matches fetch_url, fetch_data
- "api_call" # Exact match
# In Python
rule = PolicyRule(
id="block-http",
name="Block outbound HTTP",
action=PolicyAction.DENY,
tool_patterns=["http_.*", "fetch_.*", "api_call"],
)
Argument Patterns (regex on values)¶
Match specific argument values:
rules:
- id: block-traversal
name: Block path traversal
action: deny
priority: 10
tool_patterns:
- "file_read"
- "file_write"
argument_patterns:
path: "\\.\\./|/etc/|/proc/"
engine = PolicyEngine(default_action=PolicyAction.ALLOW)
engine.add_rule(PolicyRule(
id="block-traversal",
name="Block path traversal",
action=PolicyAction.DENY,
priority=10,
tool_patterns=["file_read", "file_write"],
argument_patterns={"path": r"\.\./|/etc/|/proc/"},
))
# Safe path — allowed
result = engine.evaluate("file_read", {"path": "/home/user/data.csv"})
print(f"Safe path: {result.action}") # Output: Safe path: allow
# Traversal attempt — blocked
result = engine.evaluate("file_read", {"path": "../../etc/passwd"})
print(f"Traversal: {result.action}") # Output: Traversal: deny
print(f"Reason: {result.reasons[0]}")
# Output: Reason: [block-traversal] Block path traversal:
Taint Label Conditions¶
Rules can trigger only when data carries specific taint labels:
rules:
- id: block-pii-exfil
name: Block PII in outbound calls
description: Prevent PII-tainted data from reaching HTTP sinks
action: deny
priority: 10
tool_patterns:
- "http_post"
- "send_email"
- "webhook_.*"
taint_labels:
- pii
- secret
owasp_asi_id: ASI-03
engine = PolicyEngine(default_action=PolicyAction.ALLOW)
engine.add_rule(PolicyRule(
id="block-pii-exfil",
name="Block PII exfiltration",
action=PolicyAction.DENY,
tool_patterns=["http_post", "send_email"],
taint_labels=["pii", "secret"],
))
# No taint — allowed
result = engine.evaluate("http_post", {"url": "https://api.example.com"})
print(f"No taint: {result.action}") # Output: No taint: allow
# With PII taint — blocked
result = engine.evaluate(
"http_post",
{"url": "https://api.example.com"},
taint_labels={"pii"},
)
print(f"With PII: {result.action}") # Output: With PII: deny
OWASP ASI 2026 Policy¶
MCPKernel ships with a complete OWASP ASI 2026 policy file:
# policies/owasp_asi_2026_strict.yaml (excerpt)
rules:
# ASI-01: Prompt Injection
- id: ASI-01-001
name: Block untrusted input in eval
action: deny
priority: 10
tool_patterns: [".*"]
taint_labels: [user_input, llm_output, untrusted_external]
owasp_asi_id: ASI-01
# ASI-02: Tool Misuse
- id: ASI-02-001
name: Sandbox all code execution
action: sandbox
priority: 20
tool_patterns: ["execute_.*", "run_.*", "shell_.*"]
owasp_asi_id: ASI-02
# ASI-03: Data Exfiltration
- id: ASI-03-001
name: Block PII in outbound calls
action: deny
priority: 10
tool_patterns: ["http_post", "send_email", "webhook_.*"]
taint_labels: [pii, secret]
owasp_asi_id: ASI-03
Use OWASP policy in code¶
from mcpkernel import MCPKernelProxy
async with MCPKernelProxy(
upstream=["http://localhost:3000/mcp"],
policy="owasp-asi-2026",
) as proxy:
# All calls enforced against OWASP ASI 2026 rules
result = await proxy.call_tool("read_file", {"path": "data.csv"})
Built-in Presets¶
from mcpkernel.api import POLICY_PRESETS
for name, config in POLICY_PRESETS.items():
print(f"{name}: {config['description']}")
Output:
permissive: Audit everything, block nothing. Good for development.
standard: Block known-dangerous patterns, audit the rest.
strict: Deny-by-default. Only explicitly allowed tools pass.
owasp-asi-2026: Full OWASP ASI 2026 compliance rule set.
Advanced: Multiple Rules and Precedence¶
When multiple rules match, the most restrictive action wins:
from mcpkernel.policy.engine import PolicyEngine, PolicyRule, PolicyAction
engine = PolicyEngine(default_action=PolicyAction.ALLOW)
# Lower priority number = higher precedence
engine.add_rule(PolicyRule(
id="audit-all",
name="Audit everything",
action=PolicyAction.AUDIT,
priority=100,
tool_patterns=[".*"],
))
engine.add_rule(PolicyRule(
id="block-shell",
name="Block shell",
action=PolicyAction.DENY,
priority=10,
tool_patterns=["shell_.*"],
))
# shell_exec matches both rules — DENY wins (more restrictive)
result = engine.evaluate("shell_exec", {})
print(f"shell_exec: {result.action}") # Output: shell_exec: deny
print(f"Rules matched: {len(result.matched_rules)}") # Output: Rules matched: 2
# read_file matches only audit rule
result = engine.evaluate("read_file", {})
print(f"read_file: {result.action}") # Output: read_file: audit
Rule precedence order (most to least restrictive):¶
deny— Block completelysandbox— Execute in isolationwarn— Allow with warningaudit— Allow and logallow— Allow silently