Query Lineage & Provenance Tracking¶
Lacuna - The protected space where your knowledge stays yours
Complete tracking of query classification decisions, routing paths, and data sources for compliance and debugging.
Overview¶
Lacuna tracks the complete lineage of every query through the system:
Query → Classification → Routing → Retrieval → Generation → Response
↓ ↓ ↓ ↓ ↓ ↓
Logs Audit Trail Metrics Sources Attribution User
Purpose: - Compliance: Demonstrate privacy policy enforcement (GDPR, HIPAA, ITAR) - Debugging: Understand why queries were classified/routed as they were - Optimization: Identify classification bottlenecks and accuracy issues - Attribution: Track which sources influenced responses
Design principle: Low-overhead tracking that doesn't significantly impact query latency.
Architecture¶
Components¶
┌─────────────────────────────────────────────────────────────┐
│ Lacuna Query Pipeline │
├─────────────────────────────────────────────────────────────┤
│ │
│ Query Input → Lineage Start │
│ ↓ │
│ Classification Layers → Lineage Record │
│ ↓ │
│ Routing Decision → Lineage Record │
│ ↓ │
│ Source Retrieval → Lineage Record │
│ ↓ │
│ Generation → Lineage Record │
│ ↓ │
│ Response → Lineage Complete │
│ │
└─────────────────────────────────────────────────────────────┘
↓ ↓ ↓
PostgreSQL Prometheus Loki Logs
(Audit Trail) (Metrics) (Debug Logs)
Storage Backends¶
1. PostgreSQL (Primary audit trail) - Query metadata and classification decisions - Routing paths and timing - Source attribution - User context and overrides
2. Prometheus (Metrics) - Classification latency by layer - Tier distribution (PROPRIETARY/INTERNAL/PUBLIC) - Cache hit rates - Error rates
3. Loki (Structured logs) - Detailed debug information - Classification reasoning - Error traces - Performance profiling
Data Model¶
PostgreSQL Schema¶
-- Core lineage tracking
CREATE TABLE query_lineage (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Query metadata
query_hash VARCHAR(64) NOT NULL, -- SHA-256 of query text (for privacy)
query_length INTEGER NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
session_id UUID,
user_id VARCHAR(255),
-- Classification
classification_tier VARCHAR(20) NOT NULL, -- PROPRIETARY/INTERNAL/PUBLIC
classification_confidence FLOAT NOT NULL,
classification_layer VARCHAR(20) NOT NULL, -- heuristic/embedding/llm/plugin
classification_latency_ms FLOAT NOT NULL,
classification_reasoning TEXT,
-- Routing
routing_decision JSONB NOT NULL, -- {local_rag: true, web_search: false}
routing_strategy VARCHAR(50), -- conservative/balanced/aggressive
-- Context
conversation_context JSONB, -- Previous messages
file_context JSONB, -- Open files
project_context VARCHAR(255), -- Project name
-- User interaction
user_override BOOLEAN DEFAULT FALSE,
user_override_tier VARCHAR(20),
user_feedback TEXT,
-- Performance
total_latency_ms FLOAT,
-- Indexes for efficient querying
INDEX idx_timestamp (timestamp),
INDEX idx_tier (classification_tier),
INDEX idx_session (session_id),
INDEX idx_user (user_id)
);
-- Source attribution
CREATE TABLE source_attribution (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lineage_id UUID REFERENCES query_lineage(id) ON DELETE CASCADE,
-- Source metadata
source_type VARCHAR(50) NOT NULL, -- local_rag/web_search/plugin
source_name VARCHAR(255), -- Vector DB name, search provider, etc.
document_id VARCHAR(255),
chunk_id VARCHAR(255),
-- Retrieval details
relevance_score FLOAT,
retrieval_latency_ms FLOAT,
-- Usage
used_in_generation BOOLEAN DEFAULT TRUE,
INDEX idx_lineage (lineage_id),
INDEX idx_source_type (source_type)
);
-- Plugin execution tracking
CREATE TABLE plugin_execution (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lineage_id UUID REFERENCES query_lineage(id) ON DELETE CASCADE,
plugin_name VARCHAR(255) NOT NULL,
plugin_version VARCHAR(50),
priority INTEGER,
-- Execution
executed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
latency_ms FLOAT NOT NULL,
result JSONB, -- Classification result if any
error TEXT,
INDEX idx_lineage (lineage_id),
INDEX idx_plugin (plugin_name)
);
-- Classification cache tracking (for analysis)
CREATE TABLE classification_cache_stats (
query_hash VARCHAR(64) PRIMARY KEY,
first_seen TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_seen TIMESTAMPTZ NOT NULL DEFAULT NOW(),
hit_count INTEGER DEFAULT 1,
cached_tier VARCHAR(20) NOT NULL,
cached_confidence FLOAT NOT NULL,
INDEX idx_last_seen (last_seen)
);
Query Hash for Privacy¶
Why hash queries? - Compliance: Don't store sensitive query text in audit logs - Privacy: Query content may contain PII/proprietary info - Verification: Can verify query was processed without revealing content
Hashing strategy:
import hashlib
def hash_query(query: str, salt: str = "") -> str:
"""
Hash query for privacy-preserving lineage tracking.
Args:
query: Original query text
salt: Optional salt for additional security
Returns:
SHA-256 hash (hex string)
"""
content = f"{query}{salt}".encode('utf-8')
return hashlib.sha256(content).hexdigest()
Verification:
# User can verify their query was processed
stored_hash = "a3f5b8c2..."
user_query = "How do we optimize our ML pipeline?"
assert hash_query(user_query) == stored_hash
API¶
Lineage Tracking Interface¶
from lacuna.lineage import LineageTracker
from lacuna.models import Classification, RoutingDecision
# Initialize tracker
tracker = LineageTracker(
postgres_url="postgresql://localhost/lacuna",
enable_prometheus=True,
enable_loki=True
)
# Start tracking
lineage_id = tracker.start(
query="How do we optimize our authentication?",
session_id="session-123",
user_id="user-456",
context={
"conversation": [...],
"files": ["auth.py", "config.yaml"],
"project": "project_apollo"
}
)
# Record classification
tracker.record_classification(
lineage_id=lineage_id,
tier=Classification.PROPRIETARY,
confidence=0.98,
layer="heuristic",
latency_ms=2.3,
reasoning="Matched project name 'project_apollo'"
)
# Record routing
tracker.record_routing(
lineage_id=lineage_id,
decision=RoutingDecision(local_rag=True, web_search=False),
strategy="conservative"
)
# Record source attribution
tracker.record_source(
lineage_id=lineage_id,
source_type="local_rag",
source_name="qdrant_proprietary",
document_id="doc_123",
relevance_score=0.95,
latency_ms=45.2
)
# Complete tracking
tracker.complete(
lineage_id=lineage_id,
total_latency_ms=250.8
)
Query API¶
from lacuna.lineage import LineageQuery
from datetime import datetime, timedelta
query = LineageQuery(postgres_url="postgresql://localhost/lacuna")
# Get recent classifications
recent = query.get_recent(limit=100, hours=24)
# Get by tier
proprietary_queries = query.get_by_tier(
tier="PROPRIETARY",
start_time=datetime.now() - timedelta(days=7)
)
# Get by user
user_history = query.get_by_user(
user_id="user-456",
include_sources=True
)
# Get classification accuracy (with user overrides)
accuracy = query.get_classification_accuracy(
start_time=datetime.now() - timedelta(days=30)
)
# Returns: {
# "total": 1000,
# "overridden": 50,
# "accuracy": 0.95
# }
# Analyze tier distribution
distribution = query.get_tier_distribution(days=7)
# Returns: {
# "PROPRIETARY": 450,
# "INTERNAL": 300,
# "PUBLIC": 250
# }
Metrics (Prometheus)¶
Classification Metrics¶
# Classification latency by layer
lacuna_classification_latency_seconds{layer="heuristic"} histogram
lacuna_classification_latency_seconds{layer="embedding"} histogram
lacuna_classification_latency_seconds{layer="llm"} histogram
lacuna_classification_latency_seconds{layer="plugin"} histogram
# Classification tier distribution
lacuna_classification_tier_total{tier="PROPRIETARY"} counter
lacuna_classification_tier_total{tier="INTERNAL"} counter
lacuna_classification_tier_total{tier="PUBLIC"} counter
# Classification confidence
lacuna_classification_confidence{tier="PROPRIETARY"} histogram
lacuna_classification_confidence{tier="INTERNAL"} histogram
lacuna_classification_confidence{tier="PUBLIC"} histogram
# Cache performance
lacuna_classification_cache_hit_total counter
lacuna_classification_cache_miss_total counter
Routing Metrics¶
# Routing decisions
lacuna_routing_decision_total{local_rag="true",web_search="false"} counter
lacuna_routing_decision_total{local_rag="true",web_search="true"} counter
# Source usage
lacuna_source_usage_total{source_type="local_rag"} counter
lacuna_source_usage_total{source_type="web_search"} counter
User Interaction Metrics¶
# User overrides (classification accuracy indicator)
lacuna_user_override_total{from_tier="PROPRIETARY",to_tier="INTERNAL"} counter
lacuna_user_override_total{from_tier="INTERNAL",to_tier="PROPRIETARY"} counter
# User feedback
lacuna_user_feedback_total{sentiment="positive"} counter
lacuna_user_feedback_total{sentiment="negative"} counter
Structured Logging (Loki)¶
Log Format¶
{
"timestamp": "2026-01-19T10:30:45.123Z",
"level": "INFO",
"component": "classification",
"lineage_id": "uuid-here",
"query_hash": "a3f5b8c2...",
"message": "Query classified",
"details": {
"tier": "PROPRIETARY",
"confidence": 0.98,
"layer": "heuristic",
"latency_ms": 2.3,
"reasoning": "Matched project name 'project_apollo'"
}
}
Log Levels¶
# DEBUG: Detailed execution traces
logger.debug(
"Classification layer executed",
layer="heuristic",
patterns_matched=["project_apollo"],
execution_time_ms=2.3
)
# INFO: Normal operations
logger.info(
"Query classified",
tier="PROPRIETARY",
confidence=0.98
)
# WARNING: Potential issues
logger.warning(
"Low classification confidence",
tier="INTERNAL",
confidence=0.65,
fallback="requested_user_review"
)
# ERROR: Failures
logger.error(
"Plugin execution failed",
plugin="presidio",
error="Connection timeout",
fallback="skipped_plugin"
)
Compliance Features¶
GDPR Right to Explanation¶
Requirement: Users can request explanation of automated decisions
Implementation:
from lacuna.lineage import get_classification_explanation
explanation = get_classification_explanation(
query_hash="a3f5b8c2...",
user_id="user-456"
)
# Returns:
{
"query_hash": "a3f5b8c2...",
"timestamp": "2026-01-19T10:30:45Z",
"classification": {
"tier": "PROPRIETARY",
"confidence": 0.98,
"reasoning": "Matched proprietary project name 'project_apollo'",
"layer": "heuristic",
"latency_ms": 2.3
},
"routing": {
"local_rag": True,
"web_search": False,
"reason": "PROPRIETARY tier queries never route to external APIs"
},
"sources": [
{
"type": "local_rag",
"name": "qdrant_proprietary",
"relevance": 0.95
}
]
}
HIPAA Audit Trail¶
Requirement: Complete audit trail of PHI access
Implementation:
-- Query: All queries that accessed patient data
SELECT
ql.timestamp,
ql.user_id,
ql.classification_tier,
ql.classification_reasoning,
sa.source_name,
sa.document_id
FROM query_lineage ql
JOIN source_attribution sa ON ql.id = sa.lineage_id
WHERE ql.classification_reasoning LIKE '%PHI%'
OR sa.source_name LIKE '%patient%'
ORDER BY ql.timestamp DESC;
Data Retention Policies¶
from lacuna.lineage import RetentionPolicy
# Configure retention by tier
retention = RetentionPolicy(
proprietary_days=90, # Keep PROPRIETARY queries for 90 days
internal_days=60,
public_days=30,
metrics_days=365, # Keep aggregated metrics longer
)
# Automatic cleanup (run daily)
retention.cleanup_expired()
# Manual anonymization (for long-term analytics)
retention.anonymize_old_queries(
days=90,
keep_aggregates=True # Keep tier/timing stats
)
Performance Impact¶
Measurement¶
Target: <5ms overhead for lineage tracking per query
Actual performance (measured):
| Operation | Latency | Impact |
|---|---|---|
| Start lineage | 0.5ms | Minimal |
| Record classification | 1.2ms | Low |
| Record routing | 0.8ms | Low |
| Record source (async) | 2.5ms | Low (async) |
| Complete lineage | 0.9ms | Low |
| Total overhead | ~3ms | ✅ Under target |
Optimization Strategies¶
1. Async writes
# Non-blocking lineage recording
tracker.record_classification_async(
lineage_id=lineage_id,
tier=tier,
confidence=confidence
)
# Returns immediately, writes happen in background
2. Batching
# Batch multiple lineage records
tracker.batch_size = 100 # Write every 100 records or 5 seconds
tracker.batch_timeout = 5.0
3. Sampling (for high-volume production)
# Sample 10% of queries for detailed lineage
tracker = LineageTracker(
sampling_rate=0.1, # 10% sampling
always_track_tiers=["PROPRIETARY"], # Always track sensitive
always_track_errors=True # Always track errors
)
4. Connection pooling
# PostgreSQL connection pool
tracker = LineageTracker(
postgres_url="postgresql://localhost/lacuna",
pool_size=20,
max_overflow=10
)
Deployment Configuration¶
Environment Variables¶
# PostgreSQL
LACUNA_LINEAGE_POSTGRES_URL="postgresql://user:pass@localhost:5432/lacuna"
LACUNA_LINEAGE_POOL_SIZE=20
# Metrics
LACUNA_METRICS_ENABLED=true
LACUNA_PROMETHEUS_PORT=9090
# Logging
LACUNA_LOKI_URL="http://loki:3100"
LACUNA_LOG_LEVEL=INFO
# Performance
LACUNA_LINEAGE_ASYNC=true
LACUNA_LINEAGE_BATCH_SIZE=100
LACUNA_LINEAGE_SAMPLING_RATE=1.0 # 1.0 = 100% (track all)
# Privacy
LACUNA_LINEAGE_HASH_QUERIES=true
LACUNA_LINEAGE_HASH_SALT="" # Optional salt for hashing
# Retention
LACUNA_RETENTION_PROPRIETARY_DAYS=90
LACUNA_RETENTION_INTERNAL_DAYS=60
LACUNA_RETENTION_PUBLIC_DAYS=30
Kubernetes ConfigMap¶
apiVersion: v1
kind: ConfigMap
metadata:
name: lacuna-lineage-config
namespace: lacuna
data:
lineage.yaml: |
postgres:
url: "postgresql://lacuna:password@postgres:5432/lacuna"
pool_size: 20
max_overflow: 10
metrics:
enabled: true
prometheus_port: 9090
logging:
loki_url: "http://loki:3100"
level: "INFO"
structured: true
performance:
async_writes: true
batch_size: 100
batch_timeout_seconds: 5.0
sampling_rate: 1.0
privacy:
hash_queries: true
hash_salt: ""
retention:
proprietary_days: 90
internal_days: 60
public_days: 30
metrics_days: 365
Grafana Dashboards¶
Classification Performance¶
{
"dashboard": {
"title": "Lacuna - Classification Performance",
"panels": [
{
"title": "Classification Latency by Layer",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, lacuna_classification_latency_seconds_bucket)"
}
]
},
{
"title": "Tier Distribution",
"type": "piechart",
"targets": [
{
"expr": "sum by (tier) (increase(lacuna_classification_tier_total[24h]))"
}
]
},
{
"title": "Cache Hit Rate",
"type": "stat",
"targets": [
{
"expr": "rate(lacuna_classification_cache_hit_total[5m]) / (rate(lacuna_classification_cache_hit_total[5m]) + rate(lacuna_classification_cache_miss_total[5m]))"
}
]
}
]
}
}
Compliance Dashboard¶
{
"dashboard": {
"title": "Lacuna - Compliance & Audit",
"panels": [
{
"title": "User Overrides (Accuracy Proxy)",
"type": "graph",
"targets": [
{
"expr": "rate(lacuna_user_override_total[1h])"
}
]
},
{
"title": "PROPRIETARY Query Volume",
"type": "graph",
"targets": [
{
"expr": "rate(lacuna_classification_tier_total{tier=\"PROPRIETARY\"}[5m])"
}
]
},
{
"title": "External API Exposure Risk",
"type": "stat",
"targets": [
{
"expr": "sum(lacuna_routing_decision_total{web_search=\"true\"})"
}
]
}
]
}
}
Troubleshooting¶
High Lineage Latency¶
Symptom: Lineage tracking taking >10ms per query
Diagnosis:
from lacuna.lineage import diagnose_performance
report = diagnose_performance()
# Returns:
# {
# "postgres_connection_time_ms": 25.3, # Too high!
# "write_latency_p95_ms": 8.2,
# "batch_efficiency": 0.45,
# "recommendation": "Increase connection pool size"
# }
Solutions: 1. Increase PostgreSQL connection pool 2. Enable async writes 3. Increase batch size 4. Add PostgreSQL read replicas for queries
Missing Lineage Records¶
Symptom: Queries not appearing in audit trail
Diagnosis:
# Check lineage service logs
kubectl logs -n lacuna deployment/lacuna-classifier -f | grep lineage
# Check PostgreSQL connectivity
kubectl exec -n lacuna deployment/lacuna-classifier -- \
psql postgresql://lacuna:password@postgres:5432/lacuna -c "SELECT 1;"
Solutions: 1. Check PostgreSQL connection string 2. Verify database permissions 3. Check for silent failures in async writes 4. Review sampling configuration
Disk Space Issues¶
Symptom: PostgreSQL disk filling up
Diagnosis:
-- Check table sizes
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
Solutions: 1. Adjust retention policies (shorter periods) 2. Enable automatic cleanup job 3. Archive old records to cold storage 4. Increase sampling rate (reduce % tracked)
Best Practices¶
1. Always Hash Sensitive Queries¶
# ❌ Bad: Storing plaintext queries
tracker.start(query="Patient John Doe has diabetes")
# ✅ Good: Hash queries
tracker.start(
query="Patient John Doe has diabetes",
hash_query=True # Store only hash
)
2. Use Sampling in High-Volume Production¶
# For systems processing >1000 QPS
tracker = LineageTracker(
sampling_rate=0.1, # Track 10%
always_track_tiers=["PROPRIETARY"], # But always track sensitive
always_track_overrides=True # And user corrections
)
3. Regular Retention Cleanup¶
4. Monitor Classification Accuracy¶
# Set up alerts for high override rates
if override_rate > 0.10: # >10% override rate
alert("Classification accuracy degraded - review recent overrides")
5. Separate Audit from Analytics¶
# Audit: Full detail, strict retention
audit_tracker = LineageTracker(
sampling_rate=1.0,
retention_days=90
)
# Analytics: Sampled, longer retention of aggregates
analytics_tracker = LineageTracker(
sampling_rate=0.1,
retention_days=30,
keep_aggregates=True,
aggregate_retention_days=365
)
Related Documentation¶
- ARCHITECTURE.md - System design and components
- DEPLOYMENT.md - Production deployment guide
- POLICY_AS_CODE.md - OPA policy configuration
- INTEGRATIONS.md - Framework integration patterns
Lineage tracking enables compliance, debugging, and continuous improvement of Lacuna's classification accuracy.