Skip to content

Policy as Code with Open Policy Agent

Lacuna - The protected space where your knowledge stays yours

Define classification rules as code using Open Policy Agent (OPA) for governance-driven query routing.


Overview

Policy-as-Code separates classification rules (owned by compliance/security) from implementation (owned by engineering).

Why Policy-as-Code?

Traditional approach (hardcoded rules):

# Engineering owns both policy AND implementation
if "patient" in query or "PHI" in query:
    return Classification.PROPRIETARY

Problems: - Compliance team can't modify rules without engineering - No audit trail of policy changes - Rules scattered across codebase - Can't test policies independently

Policy-as-Code approach:

# Compliance team owns policy (security.rego)
package lacuna.classification

proprietary[reason] {
    contains(lower(input.query), "patient")
    reason := "Query contains PHI indicator"
}

Benefits: - ✅ Compliance team controls rules - ✅ Version-controlled policies (Git) - ✅ Policy testing framework - ✅ Audit trail via Git history - ✅ Centralized governance


Architecture

┌──────────────────────────────────────────────────────────┐
│                   Lacuna Classifier                      │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Query → Heuristics → Embeddings → OPA → LLM → Result    │
│              ↓            ↓         ↓     ↓              │
│          Fast path    Semantic   Policy  Fallback        │
│           (1ms)       (10ms)    (5ms)   (200ms)          │
│                                   ↓                      │
│                         ┌─────────────────┐              │
│                         │   OPA Server    │              │
│                         │  (Port 8181)    │              │
│                         ├─────────────────┤              │
│                         │ policies/       │              │
│                         │  ├─ base.rego   │              │
│                         │  ├─ healthcare  │              │
│                         │  ├─ finance     │              │
│                         │  └─ custom      │              │
│                         └─────────────────┘              │
│                                                          │
└──────────────────────────────────────────────────────────┘

Getting Started

Installation

1. Install OPA

# Using Docker (recommended)
docker run -p 8181:8181 \
  -v $(pwd)/policies:/policies \
  openpolicyagent/opa:latest \
  run --server --addr :8181 /policies

# Or install binary
curl -L -o opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64
chmod +x opa
./opa run --server --addr :8181 policies/

2. Configure Lacuna

# config/plugins.yaml
plugins:
  opa:
    enabled: true
    priority: 10  # High priority (policy enforcement)
    endpoint: "http://localhost:8181"
    policy_path: "lacuna/classification"
    timeout: 1.0  # seconds
    fallback_on_error: true

3. Write first policy

# policies/base.rego
package lacuna.classification

import future.keywords.if
import future.keywords.in

# Classification result structure
classification[result] {
    result := {
        "tier": tier,
        "confidence": confidence,
        "reasoning": reasoning
    }
    tier := proprietary_tier
    confidence := 1.0
    reasoning := proprietary_reason
}

classification[result] {
    result := {
        "tier": tier,
        "confidence": confidence,
        "reasoning": reasoning
    }
    tier := "INTERNAL"
    confidence := 0.9
    reasoning := internal_reason
}

classification[result] {
    result := {
        "tier": "PUBLIC",
        "confidence": 0.8,
        "reasoning": "No sensitive patterns detected"
    }
    not proprietary_tier
    not internal_reason
}

# Proprietary rules
proprietary_tier := "PROPRIETARY"
proprietary_reason := reason {
    # Check for project names
    project := input.context.project
    project != "learning"
    reason := sprintf("Project context indicates proprietary: %s", [project])
}

proprietary_reason := reason {
    # Check for customer references
    contains(lower(input.query), "customer")
    reason := "Query references customers"
}

# Internal rules  
internal_reason := "Query about internal processes" {
    process_terms := ["deployment", "infrastructure", "monitoring"]
    some term in process_terms
    contains(lower(input.query), term)
}

# Helper functions
lower(s) := lower {
    lower := lower(s)
}

Testing Policies

# Test policy locally
opa test policies/ -v

# Test specific scenario
echo '{
  "query": "How do we deploy to customer_acme?",
  "context": {
    "project": "production"
  }
}' | opa eval --data policies/ \
  --input - \
  'data.lacuna.classification.classification'

Policy Structure

Input Schema

OPA receives query context from Lacuna:

{
  "query": "How do we optimize authentication?",
  "context": {
    "project": "project_apollo",
    "conversation": [
      {"role": "user", "content": "Tell me about our auth"},
      {"role": "assistant", "content": "Our auth uses..."}
    ],
    "files": ["auth.py", "config.yaml"],
    "user_id": "user-123",
    "session_id": "session-456",
    "timestamp": "2026-01-19T10:30:00Z"
  }
}

Output Schema

OPA returns classification decision:

{
  "tier": "PROPRIETARY",
  "confidence": 0.95,
  "reasoning": "Matched project name 'project_apollo'",
  "metadata": {
    "matched_rules": ["project_context", "proprietary_term"],
    "policy_version": "1.2.0"
  }
}

Policy Examples

Healthcare (HIPAA)

# policies/healthcare.rego
package lacuna.classification

import future.keywords.if

# PHI indicators
phi_terms := [
    "patient",
    "diagnosis",
    "treatment",
    "medical record",
    "health information",
    "prescription",
    "ssn",
    "date of birth"
]

# PROPRIETARY: Contains PHI
proprietary_tier := "PROPRIETARY" {
    some term in phi_terms
    contains(lower(input.query), term)
}

proprietary_reason := reason {
    matched_terms := [term | 
        some term in phi_terms
        contains(lower(input.query), term)
    ]
    count(matched_terms) > 0
    reason := sprintf("Query contains PHI indicators: %v", [matched_terms])
}

# PROPRIETARY: Patient ID patterns
proprietary_tier := "PROPRIETARY" {
    regex.match(`\bMRN[-\s]?\d{6,}`, input.query)
}

proprietary_reason := "Query contains medical record number pattern" {
    regex.match(`\bMRN[-\s]?\d{6,}`, input.query)
}

# INTERNAL: Clinical workflows (no patient data)
internal_tier := "INTERNAL" {
    workflow_terms := ["protocol", "procedure", "guideline", "workflow"]
    some term in workflow_terms
    contains(lower(input.query), term)
    not proprietary_tier
}

Finance (PCI-DSS, SOX)

# policies/finance.rego
package lacuna.classification

import future.keywords.if

# PCI-DSS: Payment card data
proprietary_tier := "PROPRIETARY" {
    pci_terms := [
        "credit card",
        "card number",
        "cvv",
        "cardholder",
        "payment",
        "transaction"
    ]
    some term in pci_terms
    contains(lower(input.query), term)
}

# Credit card number pattern
proprietary_tier := "PROPRIETARY" {
    regex.match(`\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b`, input.query)
}

# SOX: Financial reporting
proprietary_tier := "PROPRIETARY" {
    sox_terms := [
        "earnings",
        "revenue",
        "quarterly results",
        "financial statement",
        "audit"
    ]
    some term in sox_terms
    contains(lower(input.query), term)
    # Additional context check
    input.context.project in ["finance", "accounting"]
}

proprietary_reason := reason {
    matched := [term | 
        term := ["earnings", "revenue", "quarterly results"][_]
        contains(lower(input.query), term)
    ]
    count(matched) > 0
    reason := sprintf("Query contains SOX-sensitive terms: %v", [matched])
}

Defense (ITAR/CUI)

# policies/defense.rego
package lacuna.classification

import future.keywords.if

# ITAR controlled information
proprietary_tier := "PROPRIETARY" {
    itar_terms := [
        "export control",
        "itar",
        "munitions",
        "defense article",
        "technical data",
        "classified"
    ]
    some term in itar_terms
    contains(lower(input.query), term)
}

# CUI (Controlled Unclassified Information)
proprietary_tier := "PROPRIETARY" {
    cui_markers := [
        "cui",
        "controlled unclassified",
        "fouo",  # For Official Use Only
        "noforn"  # No Foreign Nationals
    ]
    some marker in cui_markers
    contains(lower(input.query), marker)
}

# Security classification levels
proprietary_tier := "PROPRIETARY" {
    classification_levels := [
        "confidential",
        "secret",
        "top secret"
    ]
    some level in classification_levels
    contains(lower(input.query), level)
}

proprietary_reason := "Query contains ITAR/CUI indicators" {
    proprietary_tier == "PROPRIETARY"
}

Custom Organization Rules

# policies/custom/acme_corp.rego
package lacuna.classification

import future.keywords.if

# Company-specific projects
proprietary_projects := [
    "project_apollo",
    "project_artemis",
    "skunkworks"
]

proprietary_tier := "PROPRIETARY" {
    input.context.project in proprietary_projects
}

# Company-specific customers
proprietary_customers := [
    "customer_alpha",
    "customer_beta",
    "vip_client"
]

proprietary_tier := "PROPRIETARY" {
    some customer in proprietary_customers
    contains(lower(input.query), customer)
}

# Competitive intelligence protection
proprietary_tier := "PROPRIETARY" {
    competitive_terms := [
        "market share",
        "pricing strategy",
        "product roadmap",
        "acquisition target"
    ]
    some term in competitive_terms
    contains(lower(input.query), term)
}

proprietary_reason := reason {
    proprietary_tier == "PROPRIETARY"
    reason := "Matched company-specific sensitivity rules"
}

Advanced Patterns

Time-Based Rules

# Embargo: Sensitive until announcement
proprietary_tier := "PROPRIETARY" {
    contains(lower(input.query), "new product launch")

    # Parse current time
    time.now_ns() < time.parse_rfc3339_ns("2026-03-01T00:00:00Z")
}

proprietary_reason := "Product information under embargo until March 2026" {
    contains(lower(input.query), "new product launch")
    time.now_ns() < time.parse_rfc3339_ns("2026-03-01T00:00:00Z")
}

Context-Aware Rules

# More permissive for specific users
internal_tier := "INTERNAL" {
    # Would normally be PROPRIETARY
    contains(lower(input.query), "internal infrastructure")

    # But user has elevated permissions
    input.context.user_id in data.elevated_users
}

# More restrictive in certain projects
proprietary_tier := "PROPRIETARY" {
    # Normally PUBLIC query
    contains(lower(input.query), "python tutorial")

    # But within sensitive project context
    input.context.project == "classified_research"
}

Multi-Factor Rules

# Require multiple conditions
proprietary_tier := "PROPRIETARY" {
    # Condition 1: Contains sensitive term
    contains(lower(input.query), "customer")

    # Condition 2: In production environment
    input.context.environment == "production"

    # Condition 3: During business hours (higher risk)
    hour := time.clock([time.now_ns()])[0]
    hour >= 9
    hour <= 17
}

Confidence Scoring

# Variable confidence based on matches
classification[result] {
    matched_terms := [term | 
        term := proprietary_terms[_]
        contains(lower(input.query), term)
    ]

    count(matched_terms) > 0

    # More matches = higher confidence
    confidence := min([1.0, count(matched_terms) * 0.3])

    result := {
        "tier": "PROPRIETARY",
        "confidence": confidence,
        "reasoning": sprintf("Matched %d proprietary terms", [count(matched_terms)])
    }
}

Testing Framework

Unit Tests

# policies/base_test.rego
package lacuna.classification

import future.keywords.if

test_proprietary_project_detection if {
    result := classification with input as {
        "query": "How do we deploy?",
        "context": {"project": "project_apollo"}
    }
    result.tier == "PROPRIETARY"
}

test_customer_reference_detection if {
    result := classification with input as {
        "query": "What did customer_alpha request?"
    }
    result.tier == "PROPRIETARY"
}

test_public_query if {
    result := classification with input as {
        "query": "What's the latest Python version?"
    }
    result.tier == "PUBLIC"
}

test_internal_process if {
    result := classification with input as {
        "query": "How do we deploy to staging?"
    }
    result.tier == "INTERNAL"
}

Run tests:

# All tests
opa test policies/ -v

# Specific policy
opa test policies/healthcare_test.rego -v

# With coverage
opa test policies/ --coverage --format=json

Integration Testing

# tests/test_opa_integration.py
import pytest
from lacuna.plugins import OPAPlugin

@pytest.fixture
def opa_plugin():
    return OPAPlugin(
        endpoint="http://localhost:8181",
        policy_path="lacuna/classification"
    )

def test_proprietary_classification(opa_plugin):
    """Test OPA correctly classifies proprietary queries."""
    result = opa_plugin.classify(
        query="How do we handle customer_alpha data?",
        context={"project": "production"}
    )

    assert result.tier == "PROPRIETARY"
    assert result.confidence >= 0.9
    assert "customer" in result.reasoning.lower()

def test_public_classification(opa_plugin):
    """Test OPA correctly classifies public queries."""
    result = opa_plugin.classify(
        query="What's the latest Rust version?",
        context={}
    )

    assert result.tier == "PUBLIC"

def test_policy_timeout_handling(opa_plugin):
    """Test graceful handling of OPA timeouts."""
    opa_plugin.timeout = 0.001  # Very short timeout

    result = opa_plugin.classify(
        query="Test query",
        context={}
    )

    # Should fallback gracefully
    assert result is None or result.tier == "PROPRIETARY"

Policy Deployment

GitOps Workflow

Developer writes policy → PR → Review → Merge → CI/CD → Deploy
     (Rego file)                           ↓
                                     OPA Test Suite
                                     Policy Validation
                                    Deploy to OPA Server

GitHub Actions example:

# .github/workflows/policy-ci.yml
name: OPA Policy CI

on:
  pull_request:
    paths:
      - 'policies/**'
  push:
    branches: [main]
    paths:
      - 'policies/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup OPA
        run: |
          curl -L -o opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64
          chmod +x opa
          sudo mv opa /usr/local/bin/

      - name: Run OPA Tests
        run: opa test policies/ -v

      - name: Check Coverage
        run: |
          opa test policies/ --coverage --format=json > coverage.json
          COVERAGE=$(jq '.coverage' coverage.json)
          if (( $(echo "$COVERAGE < 80" | bc -l) )); then
            echo "Coverage too low: $COVERAGE%"
            exit 1
          fi

      - name: Validate Policy Syntax
        run: opa check policies/

  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Deploy to OPA ConfigMap
        run: |
          kubectl create configmap opa-policies \
            --from-file=policies/ \
            --dry-run=client -o yaml | \
            kubectl apply -f -

          # Reload OPA
          kubectl rollout restart deployment/opa -n lacuna

Versioning Policies

# Include version metadata
package lacuna.classification

metadata := {
    "version": "1.2.0",
    "last_updated": "2026-01-19",
    "author": "compliance-team",
    "description": "HIPAA-compliant classification rules"
}

# Return version in results
classification[result] {
    # ... classification logic ...
    result := {
        "tier": tier,
        "confidence": confidence,
        "reasoning": reasoning,
        "metadata": {
            "policy_version": metadata.version
        }
    }
}

Monitoring & Debugging

OPA Metrics

# Policy evaluation latency
opa_http_request_duration_seconds_bucket{path="/v1/data/lacuna/classification"}

# Policy evaluation count
opa_http_request_total{path="/v1/data/lacuna/classification"}

# Policy errors
opa_http_request_error_total

Debug Mode

# Enable OPA decision logging
docker run -p 8181:8181 \
  -v $(pwd)/policies:/policies \
  openpolicyagent/opa:latest \
  run --server --addr :8181 \
  --log-level debug \
  --log-format json \
  /policies

Query decision log:

curl http://localhost:8181/logs

Policy Explanation

# Explain why a decision was made
curl -X POST http://localhost:8181/v1/data/lacuna/classification/classification \
  -H 'Content-Type: application/json' \
  -d '{
    "query": "How do we handle patient data?",
    "context": {"project": "healthcare"}
  }' \
  --explain notes

Best Practices

1. Start Simple, Add Complexity Gradually

# ✅ Good: Clear, simple rules
proprietary_tier := "PROPRIETARY" {
    input.context.project in ["apollo", "artemis"]
}

# ❌ Bad: Overly complex on day 1
proprietary_tier := "PROPRIETARY" {
    projects := [p | p := data.projects[_]; p.sensitivity == "high"]
    contexts := [c | c := input.context.tags[_]; c.level > 5]
    count(projects) > 0
    count(contexts) > 0
    time.now_ns() > time.parse_rfc3339_ns(data.embargo_dates[input.context.project])
}

2. Use Descriptive Rule Names

# ✅ Good: Self-documenting
proprietary_tier := "PROPRIETARY" {
    contains_phi_indicators
}

contains_phi_indicators {
    phi_terms := ["patient", "diagnosis", "medical record"]
    some term in phi_terms
    contains(lower(input.query), term)
}

# ❌ Bad: Unclear
proprietary_tier := "PROPRIETARY" {
    check1
}

check1 {
    data.terms[_] == "patient"
}

3. Always Provide Reasoning

# ✅ Good: Explains decision
proprietary_reason := reason {
    matched := [term | 
        term := phi_terms[_]
        contains(lower(input.query), term)
    ]
    reason := sprintf("Matched PHI terms: %v", [matched])
}

# ❌ Bad: No explanation
proprietary_reason := "Proprietary" {
    proprietary_tier == "PROPRIETARY"
}

4. Test Edge Cases

test_case_sensitivity if {
    # Should match regardless of case
    result1 := classification with input as {"query": "PATIENT data"}
    result2 := classification with input as {"query": "patient data"}
    result1.tier == result2.tier
}

test_partial_matches if {
    # "patients" should match "patient" rule
    result := classification with input as {"query": "How many patients?"}
    result.tier == "PROPRIETARY"
}

test_empty_context if {
    # Should not crash on missing context
    result := classification with input as {"query": "test"}
    result.tier in ["PROPRIETARY", "INTERNAL", "PUBLIC"]
}

5. Version and Document Policies

package lacuna.classification

# Policy metadata (required)
metadata := {
    "version": "2.1.0",
    "effective_date": "2026-01-19",
    "author": "security-team@company.com",
    "description": "Classification rules for healthcare data (HIPAA compliant)",
    "changelog": [
        "2.1.0: Added MRN pattern detection",
        "2.0.0: Reorganized for clarity",
        "1.0.0: Initial version"
    ]
}

Troubleshooting

Policy Not Applied

Symptom: OPA policy not affecting classifications

Diagnosis:

# Check OPA is running
curl http://localhost:8181/health

# Check policy is loaded
curl http://localhost:8181/v1/policies

# Test policy directly
curl -X POST http://localhost:8181/v1/data/lacuna/classification/classification \
  -d '{"input": {"query": "test", "context": {}}}'

Solutions: 1. Verify OPA endpoint in Lacuna config 2. Check policy package name matches config 3. Verify policy syntax: opa check policies/ 4. Check plugin priority (OPA should be high priority)

High Latency

Symptom: OPA policy evaluation >50ms

Diagnosis:

# Add timing annotations
package lacuna.classification

classification[result] {
    start := time.now_ns()
    # ... policy logic ...
    end := time.now_ns()
    duration_ms := (end - start) / 1000000

    trace(sprintf("Policy evaluation took %d ms", [duration_ms]))
    # ... return result ...
}

Solutions: 1. Simplify complex rules 2. Use OPA's built-in functions (faster than custom) 3. Cache expensive operations 4. Consider pre-computing data lookups

Conflicting Rules

Symptom: Multiple tiers returned

Diagnosis:

# Check for multiple matching rules
opa eval --data policies/ \
  --input input.json \
  'data.lacuna.classification.classification' \
  --explain full

Solutions: 1. Use rule prioritization 2. Add explicit exclusions: not internal_tier 3. Consolidate overlapping rules 4. Use else statements for fallbacks



Policy-as-Code enables compliance teams to control classification rules without engineering bottlenecks.