Taint Tracking
Taint tracking labels data as it flows through tool calls, flagging secrets, PII, LLM output, and untrusted input. When tainted data reaches a sensitive sink (such as an HTTP POST), MCPKernel blocks the call.
How It Works
Every piece of data gets a taint label that follows it through the pipeline:
| Label | What It Catches |
|---|---|
| secret | API keys, tokens, passwords |
| pii | Names, emails, phone numbers, SSNs |
| user_input | Raw user-provided data |
| llm_output | LLM-generated content |
| untrusted_external | Data from external APIs |
| custom | Your own labels |
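To make "follows it through the pipeline" concrete, here is a minimal sketch of label propagation using only the standard library. The `Labeled` class and the `combine` and `may_reach_sink` helpers are hypothetical stand-ins written for this illustration; they are not part of MCPKernel and may differ from how it tracks labels internally.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Labeled:
    """Hypothetical stand-in for a tainted value (not MCPKernel's TaintedValue)."""
    value: str
    labels: frozenset[str] = field(default_factory=frozenset)


def combine(*parts: Labeled, sep: str = " ") -> Labeled:
    """Join values; the result carries the union of every input's labels."""
    merged = frozenset().union(*(p.labels for p in parts))
    return Labeled(sep.join(p.value for p in parts), merged)


def may_reach_sink(item: Labeled, blocked: frozenset[str]) -> bool:
    """Return True only if the value carries none of the blocked labels."""
    return not (item.labels & blocked)


key = Labeled("sk-abc123secretkey", frozenset({"secret"}))
prompt = Labeled("Summarize this:", frozenset({"user_input"}))
body = combine(prompt, key)

print(sorted(body.labels))                                # ['secret', 'user_input']
print(may_reach_sink(body, frozenset({"secret", "pii"})))  # False
```

The point of the sketch is that a derived value inherits the labels of everything that went into it, so a secret stays flagged even after it is embedded in a larger string.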
Tutorial: Basic Taint Tracking
Step 1: Mark data as tainted
from mcpkernel.taint.tracker import TaintTracker, TaintLabel

tracker = TaintTracker()

# Mark a value as containing a secret
tv = tracker.mark(
    data="sk-abc123secretkey",
    label=TaintLabel.SECRET,
    source_id="openai-key",
)
print(f"Tainted: {tv.is_tainted}")  # Output: Tainted: True
print(f"Labels: {tv.labels}")  # Output: Labels: {<TaintLabel.SECRET: 'secret'>}
print(f"Source: {tv.source_id}")  # Output: Source: openai-key
print(f"Provenance: {tv.provenance}")  # Output: Provenance: ['marked:secret']
Step 2: Track multiple labels
# Mark PII data
pii_data = tracker.mark(
    data="John Doe, john@example.com",
    label=TaintLabel.PII,
    source_id="user-profile",
)
# Mark user input
user_data = tracker.mark(
    data="Please delete all files",
    label=TaintLabel.USER_INPUT,
    source_id="chat-message-42",
)
# Check overall taint state (includes the secret marked in Step 1)
print(tracker.summary())
Output:
{
    'total_tracked': 3,
    'active_tainted': 3,
    'by_label': {'secret': 1, 'pii': 1, 'user_input': 1},
    'sanitizers': []
}
Step 3: Query tainted values
# Get all values with a specific label
secrets = tracker.get_by_label(TaintLabel.SECRET)
print(f"Secrets found: {len(secrets)}")  # Output: Secrets found: 1
for s in secrets:
    print(f" Source: {s.source_id}, Labels: {[l.value for l in s.labels]}")
    # Output: Source: openai-key, Labels: ['secret']

# Get all tainted values
all_tainted = tracker.get_all_tainted()
print(f"Total tainted: {len(all_tainted)}")  # Output: Total tainted: 3
Tutorial: Clearing Taint (Sanitization)
Taint can only be cleared by naming an explicit sanitizer, which creates an audit trail:
from mcpkernel.taint.tracker import TaintTracker, TaintLabel
tracker = TaintTracker()
# Register a known sanitizer
tracker.register_sanitizer("pii_redactor_v2")
print(f"Known sanitizer: {tracker.is_known_sanitizer('pii_redactor_v2')}")
# Output: Known sanitizer: True
# Mark data as PII
tv = tracker.mark("SSN: 123-45-6789", TaintLabel.PII, source_id="form-data")
print(f"Before: {tv.labels}") # Output: Before: {<TaintLabel.PII: 'pii'>}
# Clear the taint with sanitizer justification
tracker.clear("form-data", TaintLabel.PII, sanitizer="pii_redactor_v2")
tv = tracker.get("form-data")
print(f"After: {tv.labels}") # Output: After: set()
print(f"Provenance: {tv.provenance}")
# Output: Provenance: ['marked:pii', 'cleared:pii:by:pii_redactor_v2']
Audit Trail
Every clear() operation records which sanitizer was used and when. This is critical for compliance — you can prove that PII was properly handled.
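As a rough illustration of why a sanitizer-gated clear produces a usable audit trail, the following self-contained sketch models the idea with a hypothetical `MiniTracker` class. It is not MCPKernel's TaintTracker: in particular, rejecting an unregistered sanitizer with `PermissionError` and the shape of the audit record are assumptions made for this example.

```python
from datetime import datetime, timezone


class MiniTracker:
    """Hypothetical stand-in; not MCPKernel's TaintTracker."""

    def __init__(self) -> None:
        self.sanitizers: set[str] = set()
        self.labels: dict[str, set[str]] = {}
        self.audit: list[dict[str, str]] = []

    def register_sanitizer(self, name: str) -> None:
        self.sanitizers.add(name)

    def mark(self, source_id: str, label: str) -> None:
        self.labels.setdefault(source_id, set()).add(label)

    def clear(self, source_id: str, label: str, sanitizer: str) -> None:
        # Assumption: an unregistered sanitizer is rejected outright.
        if sanitizer not in self.sanitizers:
            raise PermissionError(f"unknown sanitizer: {sanitizer}")
        self.labels.get(source_id, set()).discard(label)
        # Record who cleared what, and when (UTC, ISO 8601).
        self.audit.append({
            "source_id": source_id,
            "label": label,
            "sanitizer": sanitizer,
            "at": datetime.now(timezone.utc).isoformat(),
        })


t = MiniTracker()
t.register_sanitizer("pii_redactor_v2")
t.mark("form-data", "pii")

try:
    t.clear("form-data", "pii", sanitizer="homegrown_regex")
except PermissionError as exc:
    print(f"Rejected: {exc}")  # Rejected: unknown sanitizer: homegrown_regex

t.clear("form-data", "pii", sanitizer="pii_redactor_v2")
print(t.labels["form-data"], len(t.audit))  # set() 1
```

Because the rejected attempt never touches the labels, the only way to reach an untainted state is through a named sanitizer, and each successful clear leaves one audit record behind.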
Tutorial: Taint in the Security Pipeline
When you use MCPKernelProxy, taint tracking is automatic:
import asyncio
from mcpkernel import MCPKernelProxy, PolicyViolation

async def demo():
    async with MCPKernelProxy(
        policy="strict",
        taint=True,  # Enable taint tracking
        audit=True,
    ) as proxy:
        # This tool call has its arguments scanned for secrets
        try:
            result = await proxy.call_tool("http_post", {
                "url": "https://api.example.com",
                "body": "API key: sk-abc123secretkey",  # Secret detected!
            })
        except PolicyViolation as e:
            print(f"Blocked: {e}")
            # Output: Blocked: [policy-deny] Taint violation: secret detected

asyncio.run(demo())
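The argument scan in the example above can be pictured as pattern matching over each string argument. The sketch below is an assumption about the general technique, not MCPKernel's detector: the `PATTERNS` table and the `scan_arguments` helper are hypothetical, and the two regular expressions are deliberately narrow.

```python
import re

# Illustrative patterns only; a real detector would use a broader, tuned rule set.
PATTERNS = {
    "secret": re.compile(r"\bsk-[A-Za-z0-9]{8,}\b"),
    "pii": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # US SSN shape
}


def scan_arguments(arguments: dict[str, object]) -> dict[str, set[str]]:
    """Map each string argument to the labels whose pattern matched it."""
    findings: dict[str, set[str]] = {}
    for name, value in arguments.items():
        if not isinstance(value, str):
            continue  # this sketch does not recurse into nested structures
        hits = {label for label, rx in PATTERNS.items() if rx.search(value)}
        if hits:
            findings[name] = hits
    return findings


args = {
    "url": "https://api.example.com",
    "body": "API key: sk-abc123secretkey",
}
print(scan_arguments(args))  # {'body': {'secret'}}
```

A proxy built on this idea would run the scan before forwarding the call and raise a violation when a finding's label is not allowed for the target tool.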
Configuration
# config.yaml
taint:
  mode: full  # full | light | off
  block_on_violation: true
  pii_patterns_enabled: true
  static_analysis_enabled: true
| Mode | Behavior |
|---|---|
| full | Every argument and result scanned; blocks on violation |
| light | Scan arguments only; log but don't block |
| off | Taint tracking disabled |
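One way to read the mode table is as a small decision function. The sketch below encodes the three rows as data and derives an outcome per tool call. The `ModeBehavior` class, the `MODES` mapping, and `decide` are hypothetical names for this illustration, and the sketch ignores how `block_on_violation` interacts with the mode, since that is not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModeBehavior:
    scan_arguments: bool
    scan_results: bool
    block: bool


# Mirrors the table above; illustrative names, not MCPKernel internals.
MODES = {
    "full": ModeBehavior(scan_arguments=True, scan_results=True, block=True),
    "light": ModeBehavior(scan_arguments=True, scan_results=False, block=False),
    "off": ModeBehavior(scan_arguments=False, scan_results=False, block=False),
}


def decide(mode: str, violation_in_arguments: bool, violation_in_result: bool) -> str:
    """Return 'allow', 'log', or 'block' for one tool call."""
    behavior = MODES[mode]
    # A violation only counts if the mode actually scans that part of the call.
    seen = (behavior.scan_arguments and violation_in_arguments) or (
        behavior.scan_results and violation_in_result
    )
    if not seen:
        return "allow"
    return "block" if behavior.block else "log"


print(decide("full", False, True))   # block
print(decide("light", True, False))  # log
print(decide("light", False, True))  # allow (results are not scanned)
print(decide("off", True, True))     # allow
```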
Available Taint Labels
from mcpkernel.taint.tracker import TaintLabel

# All available labels
for label in TaintLabel:
    print(f" {label.value}")
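Output (the six label values from the table under How It Works; the order shown is assumed to follow that table):
 secret
 pii
 user_input
 llm_output
 untrusted_external
 custom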
Advanced: Building a TaintedValue Manually
from mcpkernel.taint.tracker import TaintedValue, TaintLabel

# Create a value with multiple taint labels
tv = TaintedValue(
    value="User said: my SSN is 123-45-6789",
    labels={TaintLabel.USER_INPUT, TaintLabel.PII},
    source_id="chat-msg-99",
    provenance=["marked:user_input", "marked:pii"],
    metadata={"session_id": "abc123", "agent": "assistant"},
)
print(f"Value tainted: {tv.is_tainted}")  # True
print(f"Labels: {sorted(l.value for l in tv.labels)}")  # ['pii', 'user_input']

# Add another label
tv.add_label(TaintLabel.UNTRUSTED_EXTERNAL)
print(f"Labels now: {sorted(l.value for l in tv.labels)}")
# Output: ['pii', 'untrusted_external', 'user_input']
Pattern: Taint + Policy Engine Together
The most powerful pattern is combining taint labels with policy rules:
from mcpkernel.taint.tracker import TaintTracker, TaintLabel
from mcpkernel.policy.engine import PolicyEngine, PolicyRule, PolicyAction

# Set up taint tracking
tracker = TaintTracker()
tracker.mark("user query", TaintLabel.USER_INPUT, source_id="msg-1")

# Set up policy with taint-aware rules
engine = PolicyEngine(default_action=PolicyAction.ALLOW)
engine.add_rule(PolicyRule(
    id="block-user-input-in-exec",
    name="Block user input in code execution",
    action=PolicyAction.DENY,
    priority=10,
    tool_patterns=["execute_.*", "run_.*"],
    taint_labels=["user_input"],
))

# Evaluate with taint context
result = engine.evaluate(
    "execute_code",
    {"code": "print('hello')"},
    taint_labels={"user_input"},
)
print(f"Action: {result.action}") # Output: Action: deny
print(f"OWASP: {result.metadata}")
# Output: OWASP: {'owasp_asi_ids': []}
The same mechanism is how MCPKernel detects data exfiltration (ASI-03): PII-tainted data reaching an HTTP sink triggers a deny.
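To show what a taint-aware rule match might involve, here is a standalone sketch that pairs a tool-name pattern with a label check. The `Rule` class and `evaluate` function are hypothetical and only loosely modeled on the PolicyRule fields used above. Three behaviors are assumptions rather than documented MCPKernel semantics: patterns are matched with `re.fullmatch`, a rule needs at least one overlapping label, and the first matching rule in list order wins (the real engine's `priority` handling is not modeled).

```python
import re
from dataclasses import dataclass, field


@dataclass
class Rule:
    """Hypothetical rule shape, loosely modeled on the PolicyRule fields above."""
    id: str
    action: str
    tool_patterns: list[str]
    taint_labels: set[str] = field(default_factory=set)

    def matches(self, tool: str, labels: set[str]) -> bool:
        # Assumption: a pattern must match the whole tool name.
        name_ok = any(re.fullmatch(p, tool) for p in self.tool_patterns)
        # A rule without taint_labels matches on the tool name alone.
        taint_ok = not self.taint_labels or bool(self.taint_labels & labels)
        return name_ok and taint_ok


def evaluate(rules: list[Rule], tool: str, labels: set[str], default: str = "allow") -> str:
    """Return the action of the first matching rule, else the default."""
    for rule in rules:
        if rule.matches(tool, labels):
            return rule.action
    return default


rule = Rule(
    id="block-user-input-in-exec",
    action="deny",
    tool_patterns=["execute_.*", "run_.*"],
    taint_labels={"user_input"},
)

print(evaluate([rule], "execute_code", {"user_input"}))  # deny
print(evaluate([rule], "execute_code", set()))           # allow
print(evaluate([rule], "read_file", {"user_input"}))     # allow
```

The second and third calls show that both conditions must hold: the same tool is allowed without the taint label, and the same label is allowed on a tool the patterns do not cover.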