Lacuna Architecture Overview¶
System Architecture¶
High-Level Components¶
┌─────────────────────────────────────────────────────────────────────┐
│ User Layer │
│ • Jupyter Notebooks • VS Code • CLI • Web Dashboard │
└────────────────┬────────────────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────────────┐
│ Lacuna Core Engine │
│ │
│ ┌─────────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Operation │ │ Classificat. │ │ Policy Engine │ │
│ │ Interceptor │→ │ Pipeline │→ │ (OPA) │ │
│ └─────────────────┘ └──────────────┘ └────────────────────┘ │
│ │ │ │ │
│ ↓ ↓ ↓ │
│ ┌─────────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Lineage │ │ Provenance │ │ Audit Logger │ │
│ │ Tracker │ │ Capture │ │ (PostgreSQL) │ │
│ └─────────────────┘ └──────────────┘ └────────────────────┘ │
└────────────────┬────────────────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────────────┐
│ Integration Layer │
│ • dbt • Databricks Unity Catalog • Snowflake • File Systems │
└─────────────────────────────────────────────────────────────────────┘
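These layers compose into a single evaluation path: an intercepted operation is classified, recorded in the lineage graph, checked against policy, and audit-logged before the underlying action runs. The sketch below shows that orchestration in outline; it is illustrative rather than the exact internal API, and the component names simply mirror the sections that follow.
class LacunaCore:
    """Illustrative wiring of the core engine components."""
    def __init__(self, classifier, lineage, policy_engine, audit_logger):
        self.classifier = classifier        # ClassificationPipeline (section 2)
        self.lineage = lineage              # LineageTracker (section 3)
        self.policy_engine = policy_engine  # OPAPolicyEngine (section 4)
        self.audit_logger = audit_logger    # AuditLogger (section 5)
    def evaluate(self, operation: "DataOperation") -> "PolicyDecision":
        # 1. Classify the data the operation touches
        classification = self.classifier.classify(operation)
        # 2. Record lineage so derived data inherits classification
        self.lineage.track_operation(operation)
        # 3. Ask the policy engine for an allow/deny decision
        decision = self.policy_engine.evaluate(operation, classification)
        # 4. Always audit-log, whether allowed or denied
        event = self._build_audit_event(operation, classification, decision)  # construction elided
        self.audit_logger.log(event)
        return decision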
Detailed Component Architecture¶
1. Operation Interceptor¶
Purpose: Capture all data operations before execution
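Every interceptor reduces what it observes to a common operation record before handing it to the classification pipeline. The exact schema is not spelled out in this overview; the sketch below is an assumption, derived from the fields the interceptors and the policy engine read elsewhere on this page.
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class DataOperation:
    """Normalized description of a data operation (assumed schema)."""
    action: str                                   # "read", "write", "join", "aggregate", ...
    resource_type: str                            # "file", "table", "dataframe", ...
    resource_id: Optional[str] = None             # single resource (file path, table name)
    resource_ids: List[str] = field(default_factory=list)  # multi-resource operations
    user: Optional[object] = None                 # carries id, role, clearance, department
    sources: List[str] = field(default_factory=list)       # lineage inputs
    destination: Optional[str] = None             # lineage output
    purpose: Optional[str] = None                 # declared purpose, if any
    code: Optional[str] = None                    # transformation code, if captured
    # ... plus destination_type, destination_encrypted, lineage_chain, etc.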
Implementations:
File System Operations (FUSE)
class LacunaFUSE:
"""Intercept file read/write operations."""
def open(self, path: str, flags: int) -> int:
# Before opening file
operation = DataOperation(
action="read" if flags & os.O_RDONLY else "write",
resource_type="file",
resource_id=path,
user=get_current_user()
)
# Classify and check policy
result = lacuna.evaluate(operation)
if not result.allowed:
raise PermissionError(result.reasoning)
# Log and proceed
lacuna.audit_log(operation, result)
return os.open(path, flags)
Database Operations (SQLAlchemy Middleware)
class LacunaQueryInterceptor:
"""Intercept SQL queries."""
    def before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
# Parse SQL to extract tables
tables = parse_sql_tables(statement)
# Classify operation
operation = DataOperation(
action=get_sql_action(statement), # SELECT, INSERT, UPDATE
resource_type="table",
resource_ids=tables,
user=get_current_user()
)
# Check policy
result = lacuna.evaluate(operation)
if not result.allowed:
raise PermissionError(f"Query denied: {result.reasoning}")
Jupyter Notebook (IPython Magic)
@register_line_magic
@register_cell_magic
def lacuna(line, cell=None):
"""Intercept notebook operations."""
# Extract operations from code
operations = parse_python_code(cell or line)
# Check each operation
for op in operations:
result = lacuna.evaluate(op)
if not result.allowed:
# Show inline warning
display(HTML(format_warning(result)))
return
# Execute code
get_ipython().run_cell(cell or line)
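In a notebook session this would be used roughly as follows; the extension name and registration mechanism are assumptions, only the `%%lacuna` magic itself comes from the code above.
# Cell 1: register the magics once per kernel (extension name assumed)
%load_ext lacuna
# Cell 2: guard a whole cell; each detected operation is classified and
# policy-checked, and a denial renders an inline warning instead of executing
%%lacuna
customers = pd.read_csv("/data/customers.csv")
customers.to_csv("~/Downloads/export.csv")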
2. Classification Pipeline¶
Three-Layer Design:
class ClassificationPipeline:
"""Three-layer classification with fallback."""
def classify(self, operation: DataOperation) -> Classification:
# Layer 1: Heuristics (<1ms)
result = self.heuristic_classifier.classify(operation)
if result.confidence > 0.9:
return result
# Layer 2: Embeddings (<10ms)
result = self.embedding_classifier.classify(operation)
if result.confidence > 0.8:
return result
# Layer 3: LLM (<200ms)
result = self.llm_classifier.classify(operation)
return result
Layer 1: Heuristic Classifier
class HeuristicClassifier:
"""Fast pattern matching."""
def __init__(self, patterns: List[Pattern]):
# Compile regex patterns
self.patterns = [
(re.compile(p.regex), p.tier, p.tags)
for p in patterns
]
def classify(self, operation: DataOperation) -> Classification:
# Check file path
for regex, tier, tags in self.patterns:
if regex.match(operation.resource_id):
return Classification(
tier=tier,
confidence=1.0,
tags=tags,
reasoning=f"Matched pattern: {regex.pattern}"
)
# Check for known PII columns
if operation.resource_type == "table":
columns = get_table_columns(operation.resource_id)
pii_columns = [c for c in columns if self.is_pii_column(c)]
if pii_columns:
return Classification(
tier=Tier.PROPRIETARY,
confidence=1.0,
tags=["PII"],
reasoning=f"Contains PII columns: {pii_columns}"
)
# No match
return Classification(tier=None, confidence=0.0)
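The patterns passed to `HeuristicClassifier` are declarative rules compiled at startup. A hypothetical configuration, using the attribute names the constructor reads (`regex`, `tier`, `tags`):
from dataclasses import dataclass
from typing import List
@dataclass
class Pattern:
    regex: str
    tier: "Tier"
    tags: List[str]
patterns = [
    Pattern(r".*/finance/.*\.xlsx$",  Tier.PROPRIETARY, ["FINANCIAL"]),
    Pattern(r".*/hr/.*",              Tier.PROPRIETARY, ["PII", "HR"]),
    Pattern(r".*/public-datasets/.*", Tier.PUBLIC,      ["REFERENCE"]),
]
classifier = HeuristicClassifier(patterns)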
Layer 2: Embedding Classifier
class EmbeddingClassifier:
"""Semantic similarity matching."""
def __init__(self, model_name: str, examples: Dict[Tier, List[str]]):
self.model = SentenceTransformer(model_name)
# Pre-compute embeddings for examples
self.example_embeddings = {}
for tier, texts in examples.items():
embeddings = self.model.encode(texts)
self.example_embeddings[tier] = embeddings
def classify(self, operation: DataOperation) -> Classification:
# Create description of operation
description = self.describe_operation(operation)
# Compute embedding
query_embedding = self.model.encode([description])[0]
# Find most similar example
best_tier = None
best_similarity = 0.0
for tier, examples in self.example_embeddings.items():
similarities = cosine_similarity([query_embedding], examples)[0]
max_sim = max(similarities)
if max_sim > best_similarity:
best_similarity = max_sim
best_tier = tier
return Classification(
tier=best_tier,
confidence=best_similarity,
reasoning=f"Semantic similarity: {best_similarity:.2f}"
)
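The per-tier example texts are short natural-language descriptions whose embeddings anchor each class; at classification time the operation's description is compared against them. The seed sentences and the model name below are illustrative choices, not a prescribed configuration.
examples = {
    Tier.PROPRIETARY: [
        "customer table containing emails, phone numbers and home addresses",
        "quarterly revenue forecast broken down by product line",
    ],
    Tier.INTERNAL: [
        "internal dashboard usage metrics",
        "engineering on-call schedule",
    ],
    Tier.PUBLIC: [
        "open government census dataset",
        "public holiday calendar",
    ],
}
# "all-MiniLM-L6-v2" is a small general-purpose sentence-transformers model
classifier = EmbeddingClassifier("all-MiniLM-L6-v2", examples)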
Layer 3: LLM Classifier
class LLMClassifier:
"""LLM-based reasoning for complex cases."""
def classify(self, operation: DataOperation) -> Classification:
# Build context
context = self.build_context(operation)
# Construct prompt
prompt = f"""
Classify this data operation:
Action: {operation.action}
Resource: {operation.resource_id}
User: {operation.user.role}
Context: {context}
Classification tiers:
- PROPRIETARY: Competitive secrets, PII, regulated data
- INTERNAL: Internal use only, not competitive
- PUBLIC: Publicly available or could be
Respond with JSON:
{{
"tier": "PROPRIETARY" | "INTERNAL" | "PUBLIC",
"confidence": 0.0-1.0,
"reasoning": "explanation",
"tags": ["tag1", "tag2"]
}}
"""
# Call LLM
response = self.llm.chat([
{"role": "user", "content": prompt}
])
# Parse response
result = json.loads(response.content)
return Classification(
tier=Tier[result["tier"]],
confidence=result["confidence"],
reasoning=result["reasoning"],
tags=result["tags"]
)
3. Lineage Tracker¶
Purpose: Build complete data lineage graph
class LineageTracker:
"""Track data lineage across operations."""
def __init__(self):
self.graph = nx.DiGraph() # NetworkX directed graph
def track_operation(self, operation: DataOperation):
"""Record operation in lineage graph."""
# Add nodes
for source in operation.sources:
if source not in self.graph:
self.graph.add_node(source, **self.get_metadata(source))
self.graph.add_node(
operation.destination,
**self.get_metadata(operation.destination)
)
# Add edges (sources → destination)
for source in operation.sources:
self.graph.add_edge(
source,
operation.destination,
operation=operation.action,
timestamp=datetime.now(),
user=operation.user.id,
code=operation.code # Transformation code if available
)
def get_upstream(self, resource: str) -> List[str]:
"""Get all upstream dependencies."""
return list(nx.ancestors(self.graph, resource))
def get_downstream(self, resource: str) -> List[str]:
"""Get all downstream dependencies."""
return list(nx.descendants(self.graph, resource))
    def get_lineage_chain(self, resource: str) -> List[List[str]]:
        """Get all lineage paths leading to this resource."""
        # Find all paths from root nodes (sources with no parents) to the resource
roots = [n for n in self.graph.nodes() if self.graph.in_degree(n) == 0]
chains = []
for root in roots:
try:
path = nx.shortest_path(self.graph, root, resource)
chains.append(path)
except nx.NetworkXNoPath:
continue
return chains
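In use, the tracker answers the two questions governance cares about: where did this data come from, and what was derived from it. A short usage sketch, reusing the `DataOperation` fields assumed earlier:
tracker = LineageTracker()
tracker.track_operation(DataOperation(
    action="join",
    resource_type="table",
    sources=["raw.customers", "raw.orders"],
    destination="analytics.customer_orders",
    user=current_user,
))
tracker.get_upstream("analytics.customer_orders")
# ["raw.customers", "raw.orders"]
tracker.get_downstream("raw.customers")
# ["analytics.customer_orders", ...]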
Classification Inheritance
class ClassificationInheritance:
"""Apply inheritance rules to derived data."""
def infer_classification(
self,
operation: DataOperation,
source_classifications: List[Classification]
) -> Classification:
"""Infer classification for operation result."""
if operation.action == "join":
# Maximum classification of sources
max_tier = max(c.tier for c in source_classifications)
union_tags = set()
for c in source_classifications:
union_tags.update(c.tags)
return Classification(
tier=max_tier,
tags=list(union_tags),
reasoning="Inherited maximum classification from join sources"
)
elif operation.action == "aggregate":
# May downgrade if no individual-level data
if self.preserves_individual_data(operation):
# Group-by with low cardinality → individuals still identifiable
return max(source_classifications, key=lambda c: c.tier)
else:
# High cardinality aggregation → safe to downgrade
max_tier = max(c.tier for c in source_classifications)
downgraded = self.downgrade_tier(max_tier)
return Classification(
tier=downgraded,
tags=["DERIVED_FROM_" + max_tier.name],
reasoning="Downgraded due to aggregation without individual data"
)
elif operation.action == "filter":
# Inherit source classification
return source_classifications[0]
elif operation.action == "anonymize":
# Downgrade to INTERNAL if anonymization verified
if self.verify_anonymization(operation):
return Classification(
tier=Tier.INTERNAL,
tags=["ANONYMIZED"],
reasoning="Anonymization verified, downgraded from PROPRIETARY"
)
else:
# Keep original classification if anonymization insufficient
return source_classifications[0]
else:
# Default: inherit maximum classification
return max(source_classifications, key=lambda c: c.tier)
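These rules take `max()` over tiers, which only works if `Tier` is ordered from least to most sensitive; the policy engine below also expects `tier.value` to be the tier name as a string. A definition satisfying both is sketched here as an assumption, since the actual enum is not shown in this overview.
from enum import Enum
from functools import total_ordering
@total_ordering
class Tier(Enum):
    PUBLIC = "PUBLIC"
    INTERNAL = "INTERNAL"
    PROPRIETARY = "PROPRIETARY"
    @property
    def rank(self) -> int:
        # Ordering from least to most sensitive
        return {"PUBLIC": 1, "INTERNAL": 2, "PROPRIETARY": 3}[self.value]
    def __lt__(self, other: "Tier") -> bool:
        return self.rank < other.rank
def downgrade_tier(tier: Tier) -> Tier:
    """Step one tier down, never below PUBLIC."""
    order = [Tier.PUBLIC, Tier.INTERNAL, Tier.PROPRIETARY]
    return order[max(order.index(tier) - 1, 0)]
assert max(Tier.INTERNAL, Tier.PROPRIETARY) is Tier.PROPRIETARY
assert downgrade_tier(Tier.PROPRIETARY) is Tier.INTERNAL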
4. Policy Engine (OPA)¶
Integration with Open Policy Agent:
class OPAPolicyEngine:
"""Evaluate policies using OPA."""
def __init__(self, opa_url: str):
self.opa_url = opa_url
def evaluate(
self,
operation: DataOperation,
classification: Classification
) -> PolicyDecision:
"""Evaluate operation against policies."""
# Build OPA input
input_data = {
"action": operation.action,
"source": {
"classification": classification.tier.value,
"tags": classification.tags,
"lineage": operation.lineage_chain
},
"destination": {
"type": operation.destination_type,
"path": operation.destination,
"encrypted": operation.destination_encrypted
},
"user": {
"id": operation.user.id,
"role": operation.user.role,
"clearance": operation.user.clearance,
"department": operation.user.department
},
"context": {
"purpose": operation.purpose,
"environment": os.getenv("ENVIRONMENT", "production")
}
}
        # Query OPA for the full governance decision document
        # (the package root returns allow, policy metadata, and alternatives together)
        response = requests.post(
            f"{self.opa_url}/v1/data/governance",
            json={"input": input_data},
            timeout=0.1  # 100ms timeout
        )
result = response.json()["result"]
return PolicyDecision(
allowed=result.get("allow", False),
policy_id=result.get("policy_id"),
policy_version=result.get("policy_version"),
reasoning=result.get("reasoning", ""),
alternatives=result.get("alternatives", [])
)
Example OPA Policy:
package governance
# Allow export if any one of the following rules holds
allow {
# Not PROPRIETARY data
input.source.classification != "PROPRIETARY"
}
allow {
# Or destination is approved
approved_destination
}
allow {
# Or exception granted
input.exception.approved == true
}
# Helper: Check approved destinations
approved_destination {
input.destination.type == "governed_storage"
input.destination.encrypted == true
}
approved_destination {
input.destination.type == "database"
startswith(input.destination.path, "/approved/")
}
# Provide alternatives when denied
alternatives[msg] {
not allow
input.source.tags[_] == "PII"
msg := "Anonymize PII: lacuna.anonymize(data, pii_columns)"
}
alternatives[msg] {
not allow
msg := sprintf("Save to governed storage: %v", [approved_paths[0]])
}
# List of approved paths
approved_paths := [
"/governed/workspace/",
"s3://company-governed-data/"
]
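The policy itself is plain Rego text that gets pushed to OPA over its REST API. A minimal loading sketch (the file path and OPA address are assumptions; `PUT /v1/policies/<id>` is OPA's standard policy-update endpoint):
import requests
# Load (or update) the governance policy in a running OPA instance.
# localhost:8181 is OPA's default listen address.
with open("policies/governance.rego") as f:
    rego_source = f.read()
response = requests.put(
    "http://localhost:8181/v1/policies/governance",
    data=rego_source,
    headers={"Content-Type": "text/plain"},
)
response.raise_for_status()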
5. Audit Logger¶
Tamper-Evident Logging with Hash Chains:
class AuditLogger:
"""ISO 27001-compliant audit logging."""
def __init__(self, backend: AuditBackend):
self.backend = backend
self.queue = Queue() # Async logging
# Start background writer
self.writer_thread = threading.Thread(
target=self._writer_loop,
daemon=True
)
self.writer_thread.start()
def log(self, event: AuditEvent):
"""Log audit event (non-blocking)."""
# Create audit record
record = AuditRecord(
event_id=str(uuid.uuid4()),
timestamp=self.get_ntp_time(),
event_type=event.type,
severity=event.severity,
user_id=event.user.id,
resource_id=event.resource_id,
action=event.action,
action_result=event.result,
# ... full record fields
)
# Queue for async write
self.queue.put(record)
def _writer_loop(self):
"""Background thread that writes batches."""
batch = []
while True:
try:
# Collect batch
while len(batch) < 100:
record = self.queue.get(timeout=1.0)
batch.append(record)
except Empty:
pass # Timeout, write what we have
# Write batch
if batch:
self._write_batch(batch)
batch = []
def _write_batch(self, records: List[AuditRecord]):
"""Write batch with hash chain."""
# Get last record hash
last_hash = self.backend.get_last_hash()
# Link records in batch
for i, record in enumerate(records):
if i == 0:
record.previous_record_hash = last_hash
else:
record.previous_record_hash = records[i-1].record_hash
# Compute this record's hash
record.record_hash = self._compute_hash(record)
last_hash = record.record_hash
# Write batch atomically
self.backend.write_batch(records)
Hash Computation:
def _compute_hash(self, record: AuditRecord) -> str:
"""Compute SHA-256 hash of record."""
# Serialize record deterministically
data = {
"event_id": record.event_id,
"timestamp": record.timestamp.isoformat(),
"event_type": record.event_type.value,
"user_id": record.user_id,
"resource_id": record.resource_id,
"action": record.action,
"action_result": record.action_result,
"previous_record_hash": record.previous_record_hash,
# Include all relevant fields...
}
serialized = json.dumps(data, sort_keys=True)
return hashlib.sha256(serialized.encode()).hexdigest()
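The value of the chain is that it can be re-derived at any time: alter, drop, or reorder a record and the recomputed hashes stop matching. A verification sketch, assuming records are read back in write order and hashed with the same serialization as `_compute_hash` above:
def verify_chain(records: List[AuditRecord], compute_hash) -> bool:
    """Return False if any record was altered, removed, or reordered."""
    previous_hash = records[0].previous_record_hash if records else None
    for record in records:
        # Each record must point at the hash of the record written before it
        if record.previous_record_hash != previous_hash:
            return False
        # Its stored hash must still match its contents
        if record.record_hash != compute_hash(record):
            return False
        previous_hash = record.record_hash
    return True
# Usage (hypothetical backend accessor): verify_chain(backend.read_all_records(), logger._compute_hash)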
Data Flow Example¶
Scenario: User exports customer data
1. User Code:
customers.to_csv("~/Downloads/export.csv")
2. Operation Interceptor (IPython magic):
├─ Detects: file write operation
├─ Creates DataOperation object
└─ Passes to Classification Pipeline
3. Classification Pipeline:
├─ Layer 1 (Heuristics):
│ ├─ Checks file path patterns → no match
│ └─ Checks DataFrame columns → finds "email", "phone"
│ └─ Returns: PROPRIETARY (confidence: 1.0)
│
└─ [Layers 2, 3 skipped due to high confidence]
4. Lineage Tracker:
├─ Traces: customers.csv → customers DataFrame → export.csv
├─ Inherits classification: export.csv = PROPRIETARY
└─ Tags: [PII, CUSTOMER_DATA, EMAIL, PHONE]
5. Policy Engine (OPA):
├─ Query: Can user export PROPRIETARY to ~/Downloads?
├─ Policy evaluation:
│ ├─ Check: classification != PROPRIETARY → FALSE
│ ├─ Check: destination approved → FALSE
│ └─ Check: exception granted → FALSE
│
└─ Decision: DENY
└─ Alternatives:
• "Anonymize: lacuna.anonymize(customers, ['email', 'phone'])"
• "Save to: /governed/workspace/export.csv"
6. Audit Logger:
├─ Create audit record:
│ ├─ event_type: DATA_EXPORT
│ ├─ action_result: denied
│ ├─ reasoning: "Cannot export PROPRIETARY to unmanaged location"
│ └─ previous_record_hash: <hash of last record>
│
├─ Compute hash chain:
│ └─ record_hash: SHA256(record + previous_hash)
│
└─ Write to PostgreSQL (async)
7. User Feedback:
Display inline error with alternatives
8. Alerting (if configured):
├─ Check alert rules
├─ Match: "proprietary_data_export"
└─ Notify: #data-governance on Slack
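Either suggested alternative unblocks the user without an exception request, for example (mirroring the alternatives shown above, with `lacuna.anonymize` as named in them):
# Option 1: anonymize the PII columns, then export
# (anonymization rules downgrade the result below PROPRIETARY)
safe = lacuna.anonymize(customers, ["email", "phone"])
safe.to_csv("~/Downloads/export.csv")
# Option 2: keep the data as-is, but write it to governed storage
customers.to_csv("/governed/workspace/export.csv")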
Performance Characteristics¶
Latency Targets¶
| Component | Target | Typical | Notes |
|---|---|---|---|
| Heuristic Classification | <1ms | 0.5ms | Regex matching |
| Embedding Classification | <10ms | 5ms | Cached embeddings |
| LLM Classification | <200ms | 150ms | Rare (2% of ops) |
| Policy Evaluation (OPA) | <50ms | 20ms | Cached policies |
| Audit Logging | Non-blocking | 0ms (async) | Queued writes |
| Total (98% of ops) | <50ms | 25ms | Heuristic + policy |
| Total (2% of ops) | <300ms | 190ms | LLM + policy |
Throughput Targets¶
| Operation | Target | Scalability |
|---|---|---|
| Classifications/second | 1,000+ | Horizontal scaling |
| Policy evaluations/second | 5,000+ | OPA sidecars |
| Audit log writes/second | 10,000+ | Batching + async |
Deployment Architecture¶
Single-Tenant Deployment¶
┌──────────────────────────────────────────────────┐
│ Load Balancer │
└────────────┬─────────────────────────────────────┘
│
┌────────┴────────┐
│ │
┌───▼────┐ ┌────▼───┐
│ Lacuna │ │ Lacuna │ (Multiple instances)
│ Core │ │ Core │
└───┬────┘ └────┬───┘
│ │
└────────┬───────┘
│
┌────────▼────────┐
│ PostgreSQL │ (Audit logs)
│ Cluster │
└─────────────────┘
┌─────────────────┐
│ OPA Server │ (Policy engine)
└─────────────────┘
┌─────────────────┐
│ Redis Cache │ (Classifications, embeddings)
└─────────────────┘
Multi-Tenant Deployment¶
┌──────────────────────────────────────────────────┐
│ API Gateway + Tenant Router │
└────────┬─────────────────────────────────────────┘
│
┌────┴─────────────────┐
│ │
┌───▼──────┐ ┌─────▼────┐
│ Tenant A │ │ Tenant B │
│ Namespace│ │ Namespace│
└───┬──────┘ └─────┬────┘
│ │
┌───▼──────────────────────▼────┐
│ Shared PostgreSQL │
│ (Row-level security) │
└───────────────────────────────┘
┌────────────────────────────────┐
│ Shared OPA Server │
│ (Per-tenant policies) │
└────────────────────────────────┘
Summary¶
Lacuna's architecture provides:
✓ Modular design - components can be deployed and scaled independently
✓ Performance - 98% of operations complete in under 50ms
✓ Scalability - horizontal scaling for all components
✓ Extensibility - pluggable classifiers, policies, and integrations
✓ Reliability - async logging and graceful degradation
✓ Compliance - tamper-evident audit logs and complete provenance