Phase 5: Security Hardening
Status: Not Started
Duration: 8-10 weeks
Team Size: 3-4 engineers (2 security specialists, 1 DevOps, 1 Python/Rust)
Prerequisites: Phase 2 complete (all arms deployed)
Start Date: TBD
Target Completion: TBD
Overview
Phase 5 implements comprehensive security hardening across all system layers, establishing defense-in-depth with capability-based access control, container sandboxing, PII protection, security testing automation, and comprehensive audit logging.
Key Deliverables:
- Capability System - JWT-based time-limited permissions with automatic rotation
- Container Sandboxing - gVisor, seccomp profiles, resource limits, network policies
- PII Protection - Multi-layer detection (regex + NER), redaction, differential privacy
- Security Testing - SAST, DAST, dependency scanning, penetration testing automation
- Audit Logging - Immutable provenance tracking, compliance reporting (GDPR, CCPA, SOC 2)
Success Criteria:
- ✅ Zero high-severity vulnerabilities in production
- ✅ PII detection >99% accuracy (F1 score)
- ✅ Container escapes blocked (100% in testing)
- ✅ All API calls authenticated and authorized
- ✅ Audit logs immutable and complete (100% coverage)
- ✅ GDPR/CCPA compliance verified
- ✅ Penetration test passed with no critical findings
Reference: docs/doc_phases/PHASE-5-COMPLETE-SPECIFICATIONS.md (12,500+ lines)
Sprint 5.1: Capability System [Week 23-24]
Duration: 2 weeks
Team: 2 engineers (1 security specialist, 1 Python)
Prerequisites: Phase 2 complete (all arms deployed)
Priority: CRITICAL
Sprint Goals
- Implement JWT-based capability tokens with time-limited scopes
- Create capability validation middleware for all arms
- Set up automatic token rotation and revocation
- Implement least-privilege principle for all operations
- Audit all capability grants and usage
- Document capability design patterns
Architecture Decisions
Token Format: JWT with custom claims for capabilities
Signing Algorithm: RS256 (asymmetric) for key rotation
Token Lifetime: 15 minutes default, 1 hour maximum
Storage: Redis for active tokens, PostgreSQL for audit trail
Revocation Strategy: Token blocklist + short TTL
Tasks
Capability Token Generation (8 hours)
- Design Capability Schema (2 hours)
- Define capability types (read, write, execute, admin)
- Define resource scopes (task_id, arm_id, global)
- Define constraint types (time_limit, cost_limit, data_limit)
- Code example:
```python
# orchestrator/auth/capabilities.py
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession


class CapabilityScope(BaseModel):
    """Defines what resources a capability grants access to."""
    resource_type: str  # "task", "arm", "memory", "global"
    resource_id: Optional[str] = None  # Specific ID or "*" for all
    actions: List[str]  # ["read", "write", "execute", "delete"]


class CapabilityConstraints(BaseModel):
    """Constraints on capability usage."""
    max_cost_tokens: Optional[int] = None
    max_execution_time_seconds: Optional[int] = None
    allowed_tools: Optional[List[str]] = None
    blocked_hosts: List[str] = Field(default_factory=list)
    allowed_hosts: Optional[List[str]] = None
    max_output_size_bytes: Optional[int] = None


class CapabilityToken(BaseModel):
    """JWT payload for capability tokens."""
    sub: str  # Subject (arm_id or user_id)
    iss: str = "octollm-orchestrator"  # Issuer
    aud: str  # Audience (target arm or service)
    exp: datetime  # Expiration time
    nbf: datetime  # Not before time
    iat: datetime  # Issued at time
    jti: str  # JWT ID (unique token identifier)
    scopes: List[CapabilityScope]
    constraints: CapabilityConstraints
    task_id: Optional[str] = None  # Associated task
    parent_token_id: Optional[str] = None  # Token delegation chain


class CapabilityManager:
    """Manages capability token lifecycle."""

    def __init__(
        self,
        private_key_path: str,
        public_key_path: str,
        redis_client: Redis,
        db_session: AsyncSession
    ):
        """Initialize capability manager with RSA keys."""
        self.redis = redis_client
        self.db = db_session

        # Load RSA keys
        with open(private_key_path, "rb") as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(), password=None, backend=default_backend()
            )
        with open(public_key_path, "rb") as f:
            self.public_key = serialization.load_pem_public_key(
                f.read(), backend=default_backend()
            )

    async def issue_token(
        self,
        subject: str,
        audience: str,
        scopes: List[CapabilityScope],
        constraints: CapabilityConstraints,
        lifetime_seconds: int = 900,  # 15 minutes default
        task_id: Optional[str] = None
    ) -> str:
        """Issue a new capability token."""
        import uuid

        now = datetime.utcnow()
        token_id = str(uuid.uuid4())

        payload = CapabilityToken(
            sub=subject,
            aud=audience,
            exp=now + timedelta(seconds=lifetime_seconds),
            nbf=now,
            iat=now,
            jti=token_id,
            scopes=scopes,
            constraints=constraints,
            task_id=task_id
        )

        # Sign token
        token = jwt.encode(
            payload.dict(),
            self.private_key,
            algorithm="RS256"
        )

        # Store in Redis for revocation checks
        await self.redis.setex(
            f"capability:{token_id}",
            lifetime_seconds,
            token
        )

        # Audit log
        await self._log_token_issuance(payload)

        return token

    async def validate_token(
        self,
        token: str,
        required_scope: CapabilityScope
    ) -> CapabilityToken:
        """Validate token and check if it grants required scope."""
        try:
            # Decode and verify signature. verify_aud is disabled here because
            # audience enforcement happens in the receiving arm; PyJWT would
            # otherwise reject any token carrying an aud claim unless an
            # expected audience is supplied.
            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=["RS256"],
                options={"verify_exp": True, "verify_aud": False}
            )
            capability = CapabilityToken(**payload)

            # Check if token is revoked
            token_exists = await self.redis.exists(f"capability:{capability.jti}")
            if not token_exists:
                raise ValueError("Token has been revoked")

            # Check if token grants required scope
            if not self._has_scope(capability, required_scope):
                raise PermissionError(f"Token does not grant required scope: {required_scope}")

            # Audit log
            await self._log_token_usage(capability, required_scope)

            return capability

        except jwt.ExpiredSignatureError:
            raise ValueError("Token has expired")
        except jwt.InvalidTokenError as e:
            raise ValueError(f"Invalid token: {e}")

    def _has_scope(
        self,
        capability: CapabilityToken,
        required_scope: CapabilityScope
    ) -> bool:
        """Check if capability grants required scope."""
        for scope in capability.scopes:
            # Check resource type matches
            if scope.resource_type != required_scope.resource_type:
                continue

            # Check resource ID matches (or is wildcard)
            if scope.resource_id not in (required_scope.resource_id, "*"):
                continue

            # Check all required actions are granted
            if all(action in scope.actions for action in required_scope.actions):
                return True

        return False

    async def revoke_token(self, token_id: str):
        """Revoke a token before expiration."""
        await self.redis.delete(f"capability:{token_id}")
        await self._log_token_revocation(token_id)

    async def _log_token_issuance(self, capability: CapabilityToken):
        """Log token issuance to database."""
        # Implementation: Insert into audit_logs table
        pass

    async def _log_token_usage(self, capability: CapabilityToken, scope: CapabilityScope):
        """Log token usage to database."""
        # Implementation: Insert into audit_logs table
        pass

    async def _log_token_revocation(self, token_id: str):
        """Log token revocation to database."""
        # Implementation: Insert into audit_logs table
        pass
```
- Files to create:
orchestrator/auth/capabilities.py
- Generate RSA Key Pair (1 hour)
- Create key generation script
- Store in Kubernetes secrets
- Implement key rotation strategy (see the rotation sketch after this task's file list)
- Code example:
# scripts/generate_capability_keys.py from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend import os def generate_rsa_keys(key_size: int = 4096): """Generate RSA key pair for capability tokens.""" # Generate private key private_key = rsa.generate_private_key( public_exponent=65537, key_size=key_size, backend=default_backend() ) # Serialize private key private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) # Generate public key public_key = private_key.public_key() public_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) # Write to files os.makedirs("keys", exist_ok=True) with open("keys/capability_private_key.pem", "wb") as f: f.write(private_pem) os.chmod("keys/capability_private_key.pem", 0o600) with open("keys/capability_public_key.pem", "wb") as f: f.write(public_pem) print("Generated RSA keys:") print(" Private: keys/capability_private_key.pem") print(" Public: keys/capability_public_key.pem") print("\nAdd to Kubernetes secrets:") print(" kubectl create secret generic capability-keys \\") print(" --from-file=private=keys/capability_private_key.pem \\") print(" --from-file=public=keys/capability_public_key.pem \\") print(" -n octollm") if __name__ == "__main__": generate_rsa_keys() - Files to create:
scripts/generate_capability_keys.py
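A minimal sketch of the graceful rotation mentioned above, assuming the `generate_rsa_keys()` helper from scripts/generate_capability_keys.py is importable, kubectl access to the octollm namespace, and validators that load every `public*` entry from the secret; the script path and the `public-previous` secret key are illustrative, not part of the spec:

```python
# scripts/rotate_capability_keys.py -- hypothetical helper, not specified above.
# Graceful rotation: keep BOTH public keys published for an overlap window so
# tokens signed with the old key remain verifiable until they expire.
import os
import subprocess

from generate_capability_keys import generate_rsa_keys  # assumes same directory

def rotate_keys():
    # 1. Keep the current public key around as the "previous" key.
    os.replace("keys/capability_public_key.pem", "keys/capability_public_key_old.pem")

    # 2. Generate a fresh pair (writes keys/capability_*.pem).
    generate_rsa_keys()

    # 3. Render the updated secret with the new private key and BOTH public
    #    keys, then apply it. Validators that load every public* entry keep
    #    tokens signed with the old key valid during the overlap window.
    manifest = subprocess.run([
        "kubectl", "create", "secret", "generic", "capability-keys",
        "--from-file=private=keys/capability_private_key.pem",
        "--from-file=public=keys/capability_public_key.pem",
        "--from-file=public-previous=keys/capability_public_key_old.pem",
        "-n", "octollm", "--dry-run=client", "-o", "yaml",
    ], check=True, capture_output=True, text=True).stdout
    subprocess.run(["kubectl", "apply", "-f", "-"], input=manifest, check=True, text=True)

    # 4. After the maximum token lifetime (1 hour) has elapsed, re-apply the
    #    secret without the public-previous entry to retire the old key.

if __name__ == "__main__":
    rotate_keys()
```

The overlap window must exceed the maximum token lifetime (1 hour here); otherwise tokens signed just before rotation would fail validation.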
- Implement Token Refresh Endpoint (2 hours)
- FastAPI endpoint for token renewal
- Validate existing token before refresh
- Prevent token chaining abuse
- Code example:
```python
# orchestrator/api/auth.py
from datetime import datetime
from typing import Any, Dict

import jwt
from fastapi import APIRouter, Depends, Header, HTTPException

from orchestrator.auth.capabilities import (
    CapabilityManager,
    CapabilityScope,
    CapabilityToken,
)

router = APIRouter(prefix="/auth", tags=["authentication"])


async def get_capability_manager() -> CapabilityManager:
    """Dependency injection for capability manager."""
    # Implementation: Get from app state
    pass


@router.post("/token/refresh", response_model=Dict[str, Any])
async def refresh_token(
    authorization: str = Header(...),
    manager: CapabilityManager = Depends(get_capability_manager)
) -> Dict[str, Any]:
    """Refresh an existing capability token.

    Args:
        authorization: Bearer token to refresh

    Returns:
        New token with same scopes and constraints

    Raises:
        HTTPException: If token is invalid or expired
    """
    # Extract token from Authorization header
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header")
    old_token = authorization[7:]

    try:
        # Validate old token (this also checks expiration)
        capability = await manager.validate_token(
            old_token,
            CapabilityScope(resource_type="global", actions=["refresh"])
        )
    except ValueError as e:
        # Token expired - allow refresh if within grace period (5 minutes)
        try:
            payload = jwt.decode(
                old_token,
                manager.public_key,
                algorithms=["RS256"],
                options={"verify_exp": False, "verify_aud": False}  # Skip expiration check
            )
            capability = CapabilityToken(**payload)

            # Check if within grace period
            grace_period_seconds = 300  # 5 minutes
            if (datetime.utcnow() - capability.exp).total_seconds() > grace_period_seconds:
                raise HTTPException(status_code=401, detail="Token expired beyond grace period")
        except HTTPException:
            raise  # Preserve the grace-period rejection above
        except Exception:
            raise HTTPException(status_code=401, detail=str(e))
    except PermissionError:
        raise HTTPException(status_code=403, detail="Token does not have refresh permission")

    # Issue new token with same scopes
    new_token = await manager.issue_token(
        subject=capability.sub,
        audience=capability.aud,
        scopes=capability.scopes,
        constraints=capability.constraints,
        task_id=capability.task_id
    )

    # Revoke old token
    await manager.revoke_token(capability.jti)

    return {
        "access_token": new_token,
        "token_type": "Bearer",
        "expires_in": 900  # 15 minutes
    }
```
- Files to create:
orchestrator/api/auth.py
- Create Capability Middleware (3 hours)
- FastAPI middleware for automatic validation
- Extract and validate tokens from headers
- Inject validated capability into request state
- Code example:
```python
# orchestrator/middleware/auth.py
from typing import Callable, List

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

from orchestrator.auth.capabilities import CapabilityManager, CapabilityScope


class CapabilityMiddleware(BaseHTTPMiddleware):
    """Middleware to validate capability tokens on all requests."""

    def __init__(
        self,
        app,
        capability_manager: CapabilityManager,
        public_paths: List[str] = None
    ):
        super().__init__(app)
        self.manager = capability_manager
        self.public_paths = public_paths or ["/health", "/metrics", "/docs", "/openapi.json"]

    async def dispatch(self, request: Request, call_next: Callable):
        """Validate capability token for protected endpoints."""
        # Skip authentication for public paths
        if request.url.path in self.public_paths:
            return await call_next(request)

        # Extract token from Authorization header. Return responses directly:
        # an HTTPException raised here would bypass FastAPI's exception
        # handlers, since middleware runs outside them.
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return JSONResponse(
                status_code=401,
                content={"detail": "Missing or invalid authorization header"}
            )
        token = auth_header[7:]

        # Determine required scope based on request
        required_scope = self._get_required_scope(request)

        # Validate token
        try:
            capability = await self.manager.validate_token(token, required_scope)
        except ValueError as e:
            return JSONResponse(status_code=401, content={"detail": str(e)})
        except PermissionError as e:
            return JSONResponse(status_code=403, content={"detail": str(e)})

        # Inject capability into request state
        request.state.capability = capability

        # Continue processing request
        response = await call_next(request)
        return response

    def _get_required_scope(self, request: Request) -> CapabilityScope:
        """Determine required scope based on HTTP method and path."""
        # Parse path to extract resource type and ID
        path_parts = request.url.path.strip("/").split("/")

        if len(path_parts) >= 2 and path_parts[0] == "tasks":
            resource_type = "task"
            resource_id = path_parts[1]
        elif len(path_parts) >= 2 and path_parts[0] == "arms":
            resource_type = "arm"
            resource_id = path_parts[1]
        else:
            resource_type = "global"
            resource_id = None

        # Determine actions based on HTTP method
        method_to_actions = {
            "GET": ["read"],
            "POST": ["write"],
            "PUT": ["write"],
            "PATCH": ["write"],
            "DELETE": ["delete"]
        }
        actions = method_to_actions.get(request.method, ["read"])

        return CapabilityScope(
            resource_type=resource_type,
            resource_id=resource_id,
            actions=actions
        )
```
- Files to create:
orchestrator/middleware/auth.py
Arm Integration (6 hours)
- Add Capability Validation to All Arms (4 hours)
- Planner Arm: Validate planning capabilities
- Executor Arm: Validate execution capabilities with tool constraints
- Coder Arm: Validate code generation capabilities
- Judge Arm: Validate validation capabilities
- Safety Guardian Arm: Validate PII detection capabilities
- Retriever Arm: Validate search capabilities
- Code example (Executor Arm):
```rust
// arms/executor/src/auth.rs
use jsonwebtoken::{decode, DecodingKey, Validation, Algorithm};
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use std::collections::HashSet;

#[derive(Debug, Serialize, Deserialize)]
pub struct CapabilityScope {
    pub resource_type: String,
    pub resource_id: Option<String>,
    pub actions: Vec<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CapabilityConstraints {
    pub max_execution_time_seconds: Option<u64>,
    pub allowed_tools: Option<Vec<String>>,
    pub blocked_hosts: Vec<String>,
    pub allowed_hosts: Option<Vec<String>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CapabilityToken {
    pub sub: String,
    pub aud: String,
    pub exp: i64,
    pub jti: String,
    pub scopes: Vec<CapabilityScope>,
    pub constraints: CapabilityConstraints,
    pub task_id: Option<String>,
}

pub struct CapabilityValidator {
    public_key: DecodingKey,
}

impl CapabilityValidator {
    pub fn new(public_key_pem: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let public_key = DecodingKey::from_rsa_pem(public_key_pem.as_bytes())?;
        Ok(Self { public_key })
    }

    pub fn validate_token(
        &self,
        token: &str,
        required_scope: &CapabilityScope,
    ) -> Result<CapabilityToken, Box<dyn std::error::Error>> {
        // Decode and verify token
        let mut validation = Validation::new(Algorithm::RS256);
        validation.set_audience(&["executor-arm"]);

        let token_data = decode::<CapabilityToken>(
            token,
            &self.public_key,
            &validation,
        )?;
        let capability = token_data.claims;

        // Check if token grants required scope
        if !self.has_scope(&capability, required_scope) {
            return Err("Token does not grant required scope".into());
        }

        Ok(capability)
    }

    fn has_scope(
        &self,
        capability: &CapabilityToken,
        required_scope: &CapabilityScope,
    ) -> bool {
        for scope in &capability.scopes {
            // Check resource type matches
            if scope.resource_type != required_scope.resource_type {
                continue;
            }

            // Check resource ID matches (or is wildcard)
            let resource_id_match = match (&scope.resource_id, &required_scope.resource_id) {
                (Some(id1), Some(id2)) => id1 == id2 || id1 == "*",
                (Some(id), None) => id == "*",
                (None, _) => false,
            };
            if !resource_id_match {
                continue;
            }

            // Check all required actions are granted
            let required_actions: HashSet<_> = required_scope.actions.iter().collect();
            let granted_actions: HashSet<_> = scope.actions.iter().collect();
            if required_actions.is_subset(&granted_actions) {
                return true;
            }
        }
        false
    }

    pub fn validate_tool_execution(
        &self,
        capability: &CapabilityToken,
        tool_name: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Check if tool is allowed
        if let Some(allowed_tools) = &capability.constraints.allowed_tools {
            if !allowed_tools.contains(&tool_name.to_string()) {
                return Err(format!("Tool '{}' not allowed by capability", tool_name).into());
            }
        }
        Ok(())
    }

    pub fn validate_host_access(
        &self,
        capability: &CapabilityToken,
        host: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Check blocked hosts
        if capability.constraints.blocked_hosts.iter().any(|h| h == host) {
            return Err(format!("Host '{}' is blocked", host).into());
        }

        // Check allowed hosts (if specified)
        if let Some(allowed_hosts) = &capability.constraints.allowed_hosts {
            if !allowed_hosts.iter().any(|h| h == host) {
                return Err(format!("Host '{}' not in allowed list", host).into());
            }
        }
        Ok(())
    }
}

// Integration with Actix-web
use actix_web::{
    dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
    Error, HttpMessage, HttpResponse,
};
use futures::future::LocalBoxFuture;
use std::rc::Rc;

pub struct CapabilityAuth {
    validator: Rc<CapabilityValidator>,
}

impl CapabilityAuth {
    pub fn new(public_key_pem: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let validator = CapabilityValidator::new(public_key_pem)?;
        Ok(Self {
            validator: Rc::new(validator),
        })
    }
}

impl<S, B> Transform<S, ServiceRequest> for CapabilityAuth
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type InitError = ();
    type Transform = CapabilityAuthMiddleware<S>;
    type Future = std::future::Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        std::future::ready(Ok(CapabilityAuthMiddleware {
            service: Rc::new(service),
            validator: self.validator.clone(),
        }))
    }
}

pub struct CapabilityAuthMiddleware<S> {
    service: Rc<S>,
    validator: Rc<CapabilityValidator>,
}

impl<S, B> Service<ServiceRequest> for CapabilityAuthMiddleware<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

    forward_ready!(service);

    fn call(&self, req: ServiceRequest) -> Self::Future {
        let validator = self.validator.clone();
        let service = self.service.clone();

        Box::pin(async move {
            // Extract token from Authorization header
            let auth_header = req.headers().get("Authorization");
            let token = if let Some(value) = auth_header {
                let auth_str = value.to_str().map_err(|_| {
                    actix_web::error::ErrorUnauthorized("Invalid authorization header")
                })?;
                if !auth_str.starts_with("Bearer ") {
                    return Err(actix_web::error::ErrorUnauthorized("Invalid authorization format"));
                }
                &auth_str[7..]
            } else {
                return Err(actix_web::error::ErrorUnauthorized("Missing authorization header"));
            };

            // Validate token
            let required_scope = CapabilityScope {
                resource_type: "arm".to_string(),
                resource_id: Some("executor".to_string()),
                actions: vec!["execute".to_string()],
            };
            let capability = validator.validate_token(token, &required_scope)
                .map_err(|e| actix_web::error::ErrorForbidden(e.to_string()))?;

            // Store capability in request extensions
            req.extensions_mut().insert(capability);

            // Continue processing
            service.call(req).await
        })
    }
}
```
- Files to update:
arms/executor/src/auth.rs, arms/executor/src/main.rs
- Test Capability Enforcement (2 hours)
- Unit tests for token validation
- Integration tests for denied access
- Test token expiration handling
- Test constraint enforcement
- Code example:
```python
# tests/test_capabilities.py
import asyncio

import jwt
import pytest

from orchestrator.auth.capabilities import CapabilityConstraints, CapabilityScope


@pytest.mark.asyncio
async def test_token_validation_success(capability_manager):
    """Test successful token validation."""
    scopes = [
        CapabilityScope(
            resource_type="task",
            resource_id="task-123",
            actions=["read", "write"]
        )
    ]
    constraints = CapabilityConstraints(max_cost_tokens=1000)

    token = await capability_manager.issue_token(
        subject="planner-arm",
        audience="orchestrator",
        scopes=scopes,
        constraints=constraints
    )

    required_scope = CapabilityScope(
        resource_type="task",
        resource_id="task-123",
        actions=["read"]
    )
    validated = await capability_manager.validate_token(token, required_scope)
    assert validated.sub == "planner-arm"


@pytest.mark.asyncio
async def test_token_validation_insufficient_scope(capability_manager):
    """Test token validation fails with insufficient scope."""
    scopes = [
        CapabilityScope(
            resource_type="task",
            resource_id="task-123",
            actions=["read"]
        )
    ]
    constraints = CapabilityConstraints()

    token = await capability_manager.issue_token(
        subject="planner-arm",
        audience="orchestrator",
        scopes=scopes,
        constraints=constraints
    )

    required_scope = CapabilityScope(
        resource_type="task",
        resource_id="task-123",
        actions=["write"]  # Not granted
    )
    with pytest.raises(PermissionError):
        await capability_manager.validate_token(token, required_scope)


@pytest.mark.asyncio
async def test_token_expiration(capability_manager):
    """Test token expires after TTL."""
    scopes = [CapabilityScope(resource_type="global", actions=["read"])]
    constraints = CapabilityConstraints()

    # Issue token with 1 second lifetime
    token = await capability_manager.issue_token(
        subject="test",
        audience="test",
        scopes=scopes,
        constraints=constraints,
        lifetime_seconds=1
    )

    # Wait for expiration
    await asyncio.sleep(2)

    required_scope = CapabilityScope(resource_type="global", actions=["read"])
    with pytest.raises(ValueError, match="expired"):
        await capability_manager.validate_token(token, required_scope)


@pytest.mark.asyncio
async def test_token_revocation(capability_manager):
    """Test token can be revoked."""
    scopes = [CapabilityScope(resource_type="global", actions=["read"])]
    constraints = CapabilityConstraints()

    token = await capability_manager.issue_token(
        subject="test",
        audience="test",
        scopes=scopes,
        constraints=constraints
    )

    # Decode to get token ID
    payload = jwt.decode(
        token,
        capability_manager.public_key,
        algorithms=["RS256"],
        options={"verify_exp": False, "verify_aud": False}
    )

    # Revoke token
    await capability_manager.revoke_token(payload["jti"])

    # Validation should fail
    required_scope = CapabilityScope(resource_type="global", actions=["read"])
    with pytest.raises(ValueError, match="revoked"):
        await capability_manager.validate_token(token, required_scope)
```
- Files to create:
tests/test_capabilities.py
Documentation and Deployment (2 hours)
- Document Capability Patterns (1 hour)
- Least-privilege examples
- Token delegation patterns
- Constraint design guidelines
- Files to create:
docs/security/capability-patterns.md
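Until that doc exists, a minimal sketch of the least-privilege pattern it should codify, using the `issue_token()` API from this sprint; the scope and constraint values (including the tool names) are illustrative, and the narrowing rule in the comments is a design guideline rather than something the manager enforces:

```python
# Example only: a least-privilege grant for one task, using the Sprint 5.1 API.
from orchestrator.auth.capabilities import (
    CapabilityConstraints,
    CapabilityManager,
    CapabilityScope,
)

async def grant_executor_for_task(manager: CapabilityManager, task_id: str) -> str:
    scope = CapabilityScope(
        resource_type="task",
        resource_id=task_id,      # never "*" for executor grants
        actions=["execute"],      # no read/write/delete unless the task needs them
    )
    constraints = CapabilityConstraints(
        max_cost_tokens=5_000,
        max_execution_time_seconds=120,
        allowed_tools=["http_get", "python_sandbox"],  # hypothetical tool names
    )
    # For delegation, a child token should carry parent_token_id and a scope
    # that is a strict subset of the parent's (a convention the doc should
    # state explicitly; issue_token() above does not enforce it).
    return await manager.issue_token(
        subject="orchestrator",
        audience="executor-arm",
        scopes=[scope],
        constraints=constraints,
        lifetime_seconds=900,
        task_id=task_id,
    )
```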
- Update Kubernetes Deployments (1 hour)
- Mount RSA public key in all arm pods
- Environment variables for key paths
- Secret rotation procedures
- Code example:
```yaml
# k8s/arms/executor-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: executor-arm
  namespace: octollm
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: executor-arm
          image: octollm/executor-arm:latest
          env:
            - name: CAPABILITY_PUBLIC_KEY_PATH
              value: /etc/octollm/keys/capability_public_key.pem
          volumeMounts:
            - name: capability-keys
              mountPath: /etc/octollm/keys
              readOnly: true
      volumes:
        - name: capability-keys
          secret:
            secretName: capability-keys
            items:
              - key: public
                path: capability_public_key.pem
```
- Files to update: All arm deployment YAML files
Testing Requirements
Unit Tests
- Token generation and validation (20 test cases)
- Scope matching logic (15 test cases)
- Constraint enforcement (10 test cases)
- Key rotation (5 test cases)
Integration Tests
- End-to-end token flow (orchestrator → arm → validation)
- Token refresh workflow
- Multi-arm delegation chains
- Revocation propagation
Security Tests
- Token forgery attempts (invalid signatures)
- Scope escalation attempts
- Expired token usage
- Replay attack prevention
Documentation Deliverables
- Capability system architecture diagram (Mermaid)
- Token lifecycle documentation
- Scope design guidelines
- Key rotation runbook
- Troubleshooting guide (common auth failures)
Success Criteria
- All API endpoints require valid capability tokens
- Token validation latency <5ms (P95)
- Zero privilege escalation vulnerabilities in testing
- Audit logs capture 100% of token operations
- Key rotation procedure tested and documented
Common Pitfalls
- Clock Skew: Use NTP synchronization across all nodes to prevent token expiration issues
- Key Rotation Downtime: Implement graceful key rotation with overlapping validity periods
- Token Size: Keep scopes minimal to avoid large JWT payloads (>1KB impacts performance)
- Revocation Lag: Redis eviction policies can cause revoked tokens to persist—use explicit TTL checks
- Constraint Bypass: Validate constraints at execution time, not just at token issuance
Estimated Effort
- Development: 16 hours
- Testing: 4 hours
- Documentation: 2 hours
- Total: 22 hours (~1 week for 2 engineers)
Dependencies
- Prerequisites: Redis cluster, PostgreSQL for audit logs
- Blocking: Sprint 5.2 (container sandboxing depends on capability validation)
- Blocked By: None
Sprint 5.2: Container Sandboxing [Week 25-26]
Duration: 2 weeks
Team: 2 engineers (1 security specialist, 1 DevOps)
Prerequisites: Sprint 5.1 complete (capability system)
Priority: CRITICAL
Sprint Goals
- Implement gVisor runtime for Executor Arm containers
- Create seccomp profiles for syscall filtering
- Set up resource limits (CPU, memory, network)
- Implement network policies for egress control
- Test container escape prevention
- Document sandbox configuration
Architecture Decisions
Container Runtime: gVisor (runsc) for syscall-level isolation
Seccomp Mode: Allowlist-based (deny all, allow specific syscalls)
Resource Limits: cgroups v2 with memory, CPU, and I/O constraints
Network Policy: Default deny egress, explicit allow for required services
Storage: Ephemeral volumes only (no persistent data in sandboxes)
Tasks
gVisor Integration (10 hours)
- Install gVisor Runtime (2 hours)
- Install runsc on Kubernetes nodes
- Configure containerd to use runsc
- Test runtime with sample workload
- Code example:
```bash
#!/bin/bash
# Install gVisor on Kubernetes nodes
# scripts/install-gvisor.sh
set -e

echo "Installing gVisor runtime..."

# Download runsc binary
ARCH=$(uname -m)
URL=https://storage.googleapis.com/gvisor/releases/release/latest/${ARCH}
wget ${URL}/runsc ${URL}/runsc.sha512
sha512sum -c runsc.sha512
rm -f runsc.sha512

# Install runsc
chmod +x runsc
sudo mv runsc /usr/local/bin/

# Configure containerd
cat <<EOF | sudo tee /etc/containerd/config.toml
version = 2
[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runsc]
  runtime_type = "io.containerd.runsc.v1"
EOF

# Restart containerd
sudo systemctl restart containerd

echo "gVisor runtime installed successfully"
```
- Files to create:
scripts/install-gvisor.sh
- Create RuntimeClass for gVisor (1 hour)
- Define RuntimeClass resource
- Configure platform-specific settings
- Code example:
```yaml
# k8s/security/gvisor-runtimeclass.yaml
apiVersion: node.k8s.io/v1
kind: RuntimeClass
metadata:
  name: gvisor
handler: runsc
scheduling:
  nodeSelector:
    gvisor: "enabled"
  tolerations:
    - key: gvisor
      operator: Exists
      effect: NoSchedule
```
- Files to create:
k8s/security/gvisor-runtimeclass.yaml
- Update Executor Arm Pod Spec (2 hours)
- Add runtimeClassName to pod spec
- Configure security context
- Test execution under gVisor
- Code example:
```yaml
# k8s/arms/executor-deployment.yaml (updated)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: executor-arm
  namespace: octollm
spec:
  replicas: 3
  template:
    spec:
      runtimeClassName: gvisor  # Use gVisor runtime
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
        seccompProfile:
          type: Localhost
          localhostProfile: executor-arm.json
      containers:
        - name: executor-arm
          image: octollm/executor-arm:latest
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            capabilities:
              drop:
                - ALL
          resources:
            limits:
              memory: "2Gi"
              cpu: "1000m"
              ephemeral-storage: "1Gi"
            requests:
              memory: "1Gi"
              cpu: "500m"
              ephemeral-storage: "500Mi"
          volumeMounts:
            - name: tmp
              mountPath: /tmp
      volumes:
        - name: tmp
          emptyDir:
            sizeLimit: 500Mi
```
- Files to update:
k8s/arms/executor-deployment.yaml
- Benchmark gVisor Performance (3 hours)
- Measure syscall overhead
- Compare runc vs runsc latency
- Optimize for common workloads
- Code example:
```python
# scripts/benchmark_gvisor.py
import subprocess
import time
import statistics
from typing import List, Dict


def benchmark_runtime(runtime: str, iterations: int = 100) -> Dict[str, float]:
    """Benchmark container runtime performance."""
    results = {
        "startup_times": [],
        "syscall_times": [],
        "network_times": []
    }

    for i in range(iterations):
        # Test 1: Container startup time
        start = time.time()
        subprocess.run([
            "kubectl", "run", f"test-{runtime}-{i}",
            "--image=alpine:latest",
            "--restart=Never",
            "--rm",
            f"--overrides={{\"spec\":{{\"runtimeClassName\":\"{runtime}\"}}}}",
            "--", "echo", "hello"
        ], check=True, capture_output=True)
        startup_time = time.time() - start
        results["startup_times"].append(startup_time)

        time.sleep(0.5)  # Avoid rate limiting

    # Calculate statistics
    return {
        "startup_p50": statistics.median(results["startup_times"]),
        "startup_p95": statistics.quantiles(results["startup_times"], n=20)[18],
        "startup_p99": statistics.quantiles(results["startup_times"], n=100)[98],
    }


if __name__ == "__main__":
    print("Benchmarking runc (default runtime)...")
    runc_results = benchmark_runtime("runc")

    print("\nBenchmarking runsc (gVisor)...")
    runsc_results = benchmark_runtime("gvisor")

    print("\n=== Results ===")
    print("\nrunc (default):")
    for metric, value in runc_results.items():
        print(f"  {metric}: {value:.3f}s")

    print("\nrunsc (gVisor):")
    for metric, value in runsc_results.items():
        print(f"  {metric}: {value:.3f}s")

    print("\nOverhead:")
    for metric in runc_results:
        overhead = ((runsc_results[metric] - runc_results[metric]) / runc_results[metric]) * 100
        print(f"  {metric}: +{overhead:.1f}%")
```
- Files to create:
scripts/benchmark_gvisor.py
- Document gVisor Limitations (2 hours)
- Incompatible syscalls and features
- Performance characteristics
- Troubleshooting guide
- Files to create:
docs/security/gvisor-limitations.md
Seccomp Profiles (8 hours)
- Create Seccomp Profile for Executor Arm (4 hours)
- Audit required syscalls
- Create allowlist profile
- Test with realistic workloads
- Code example:
{ "defaultAction": "SCMP_ACT_ERRNO", "architectures": [ "SCMP_ARCH_X86_64", "SCMP_ARCH_X86", "SCMP_ARCH_X32" ], "syscalls": [ { "names": [ "accept", "accept4", "access", "arch_prctl", "bind", "brk", "capget", "capset", "chdir", "clone", "close", "connect", "dup", "dup2", "dup3", "epoll_create", "epoll_create1", "epoll_ctl", "epoll_pwait", "epoll_wait", "execve", "exit", "exit_group", "fchdir", "fchown", "fcntl", "fstat", "fstatfs", "futex", "getcwd", "getdents", "getdents64", "getegid", "geteuid", "getgid", "getpid", "getppid", "getrlimit", "getsockname", "getsockopt", "gettid", "getuid", "ioctl", "listen", "lseek", "madvise", "memfd_create", "mmap", "mprotect", "munmap", "nanosleep", "newfstatat", "open", "openat", "pipe", "pipe2", "poll", "ppoll", "prctl", "pread64", "prlimit64", "pwrite64", "read", "readlink", "readv", "recvfrom", "recvmsg", "rt_sigaction", "rt_sigprocmask", "rt_sigreturn", "sched_getaffinity", "sched_yield", "sendmsg", "sendto", "set_robust_list", "set_tid_address", "setgid", "setgroups", "setsockopt", "setuid", "shutdown", "sigaltstack", "socket", "socketpair", "stat", "statfs", "tgkill", "uname", "unlink", "wait4", "write", "writev" ], "action": "SCMP_ACT_ALLOW" } ] } - Files to create:
k8s/security/seccomp-profiles/executor-arm.json
- Audit Syscall Usage (2 hours)
- Use strace to capture syscalls
- Identify minimum required set
- Code example:
```bash
#!/bin/bash
# scripts/audit_syscalls.sh
set -e

echo "Auditing syscalls for executor-arm..."

# Run executor-arm under strace
POD_NAME=$(kubectl get pods -n octollm -l app=executor-arm -o jsonpath='{.items[0].metadata.name}')

kubectl exec -n octollm $POD_NAME -- \
  strace -c -f -o /tmp/strace.log \
  /usr/local/bin/executor-arm --dry-run

# Extract syscall names
kubectl exec -n octollm $POD_NAME -- \
  cat /tmp/strace.log | \
  awk '{print $6}' | \
  sort | uniq > required_syscalls.txt

echo "Required syscalls saved to required_syscalls.txt"
```
- Files to create:
scripts/audit_syscalls.sh
- Test Seccomp Profile (2 hours)
- Deploy with profile enabled
- Verify functionality
- Test syscall blocking
- Code example:
```python
# tests/test_seccomp.py
import pytest
import subprocess


def test_allowed_syscalls():
    """Test that allowed syscalls work."""
    # Deploy executor-arm with seccomp profile
    subprocess.run([
        "kubectl", "apply", "-f", "k8s/arms/executor-deployment.yaml"
    ], check=True)

    # Wait for pod to be ready
    subprocess.run([
        "kubectl", "wait", "--for=condition=ready", "pod",
        "-l", "app=executor-arm", "-n", "octollm", "--timeout=60s"
    ], check=True)

    # Test basic functionality (should succeed)
    result = subprocess.run([
        "kubectl", "exec", "-n", "octollm", "deployment/executor-arm",
        "--", "ls", "/tmp"
    ], capture_output=True)
    assert result.returncode == 0


def test_blocked_syscalls():
    """Test that blocked syscalls are denied."""
    # Attempt to use ptrace (should be blocked)
    result = subprocess.run([
        "kubectl", "exec", "-n", "octollm", "deployment/executor-arm",
        "--", "strace", "ls"
    ], capture_output=True)

    # Should fail due to seccomp blocking ptrace
    assert result.returncode != 0
    assert b"Operation not permitted" in result.stderr
```
- Files to create:
tests/test_seccomp.py
Network Policies (4 hours)
- Create Default Deny Policy (1 hour)
- Block all ingress by default
- Block all egress by default
- Code example:
```yaml
# k8s/security/network-policies/default-deny.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: octollm
spec:
  podSelector: {}
  policyTypes:
    - Ingress
    - Egress
```
- Files to create:
k8s/security/network-policies/default-deny.yaml
- Create Executor Arm Egress Policy (2 hours)
- Allow DNS resolution
- Allow orchestrator communication
- Allow allowlisted external hosts
- Code example:
```yaml
# k8s/security/network-policies/executor-arm-egress.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: executor-arm-egress
  namespace: octollm
spec:
  podSelector:
    matchLabels:
      app: executor-arm
  policyTypes:
    - Egress
  egress:
    # Allow DNS resolution
    - to:
        - namespaceSelector:
            matchLabels:
              name: kube-system
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - protocol: UDP
          port: 53
    # Allow orchestrator communication
    - to:
        - podSelector:
            matchLabels:
              app: orchestrator
      ports:
        - protocol: TCP
          port: 8000
    # Allow Redis
    - to:
        - podSelector:
            matchLabels:
              app: redis
      ports:
        - protocol: TCP
          port: 6379
    # Allow specific external hosts (e.g., package registries)
    # Note: This allows HTTPS to any host. In production, use egress
    # gateways with FQDN filtering for more granular control.
    - to:
        - namespaceSelector: {}
      ports:
        - protocol: TCP
          port: 443
```
- Files to create:
k8s/security/network-policies/executor-arm-egress.yaml
- Test Network Isolation (1 hour)
- Verify blocked connections fail
- Verify allowed connections succeed
- Code example:
```bash
#!/bin/bash
# scripts/test_network_policy.sh
set -e

echo "Testing network policies..."

POD_NAME=$(kubectl get pods -n octollm -l app=executor-arm -o jsonpath='{.items[0].metadata.name}')

# Test 1: DNS should work
echo "Test 1: DNS resolution (should succeed)"
kubectl exec -n octollm $POD_NAME -- nslookup google.com
echo "✓ DNS resolution works"

# Test 2: Orchestrator communication should work
echo "Test 2: Orchestrator communication (should succeed)"
kubectl exec -n octollm $POD_NAME -- \
  curl -f http://orchestrator:8000/health
echo "✓ Orchestrator communication works"

# Test 3: Blocked host should fail
echo "Test 3: Blocked host (should fail)"
if kubectl exec -n octollm $POD_NAME -- \
  curl -f --max-time 5 http://malicious-host.com; then
  echo "✗ FAIL: Blocked host was accessible"
  exit 1
else
  echo "✓ Blocked host correctly denied"
fi

echo "All network policy tests passed"
```
- Files to create:
scripts/test_network_policy.sh
Resource Limits (2 hours)
- Configure Resource Quotas (1 hour)
- Set namespace-level quotas
- Prevent resource exhaustion attacks
- Code example:
```yaml
# k8s/security/resource-quota.yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: octollm-quota
  namespace: octollm
spec:
  hard:
    requests.cpu: "100"
    requests.memory: 200Gi
    limits.cpu: "200"
    limits.memory: 400Gi
    persistentvolumeclaims: "50"
    pods: "200"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: octollm-limits
  namespace: octollm
spec:
  limits:
    - max:
        cpu: "4"
        memory: 8Gi
      min:
        cpu: "100m"
        memory: 128Mi
      default:
        cpu: "1"
        memory: 2Gi
      defaultRequest:
        cpu: "500m"
        memory: 1Gi
      type: Container
    - max:
        cpu: "8"
        memory: 16Gi
      min:
        cpu: "200m"
        memory: 256Mi
      type: Pod
```
- Files to create:
k8s/security/resource-quota.yaml
- Test Resource Limit Enforcement (1 hour)
- Test OOM kill behavior
- Test CPU throttling
- Verify graceful degradation
- Files to create:
tests/test_resource_limits.py
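No example is specified for this test file; a minimal sketch of the OOM-kill check, assuming kubectl access, the publicly available polinux/stress image, and restartPolicy Never (pod name and allocation sizes are illustrative, sized against the 2Gi container limit above):

```python
# tests/test_resource_limits.py -- sketch of the OOM-kill test
import json
import subprocess
import time

def test_memory_limit_triggers_oom_kill():
    """A container allocating past its memory limit should be OOMKilled, not evicted."""
    overrides = {
        "spec": {
            "containers": [{
                "name": "oom-test",
                "image": "polinux/stress",
                "command": ["stress"],
                # Try to allocate 3Gi under a 2Gi limit.
                "args": ["--vm", "1", "--vm-bytes", "3G", "--vm-hang", "0"],
                "resources": {
                    "requests": {"memory": "2Gi", "cpu": "500m"},
                    "limits": {"memory": "2Gi", "cpu": "500m"},
                },
            }]
        }
    }
    subprocess.run([
        "kubectl", "run", "oom-test", "-n", "octollm", "--restart=Never",
        "--image=polinux/stress", f"--overrides={json.dumps(overrides)}",
    ], check=True)
    try:
        time.sleep(30)  # give the kernel time to OOM-kill the stress process
        pod = json.loads(subprocess.run(
            ["kubectl", "get", "pod", "oom-test", "-n", "octollm", "-o", "json"],
            check=True, capture_output=True, text=True,
        ).stdout)
        state = pod["status"]["containerStatuses"][0]["state"]
        assert state.get("terminated", {}).get("reason") == "OOMKilled"
    finally:
        subprocess.run(["kubectl", "delete", "pod", "oom-test", "-n", "octollm",
                        "--ignore-not-found"], check=True)
```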
Testing Requirements
Unit Tests
- Seccomp profile validation (10 test cases)
- Network policy syntax (5 test cases)
- Resource limit calculations (5 test cases)
Integration Tests
- gVisor runtime execution
- Syscall blocking enforcement
- Network policy enforcement
- Resource limit enforcement
- Container escape attempts (should all fail)
Security Tests
- Kernel exploit attempts (CVE-based tests)
- Container breakout scenarios
- Resource exhaustion attacks
- Network scanning from containers
Documentation Deliverables
- gVisor deployment guide
- Seccomp profile maintenance runbook
- Network policy design patterns
- Resource sizing guidelines
- Container escape test report
Success Criteria
- All executor containers run under gVisor
- Seccomp profiles block >99% of unnecessary syscalls
- Network policies enforce zero-trust model
- Resource limits prevent DoS attacks
- Zero successful container escapes in testing
Common Pitfalls
- gVisor Compatibility: Some syscalls are not supported—audit carefully before deployment
- Performance Overhead: gVisor adds 10-30% latency—budget accordingly in SLAs
- Debugging Difficulty: strace doesn't work with seccomp—use audit logs instead
- Network Policy Gaps: DNS caching can mask policy violations—test with cache cleared
- OOM Kill Loops: Set memory requests = limits to avoid unexpected evictions
Estimated Effort
- Development: 24 hours
- Testing: 6 hours
- Documentation: 3 hours
- Total: 33 hours (~2 weeks for 2 engineers)
Dependencies
- Prerequisites: Sprint 5.1 (capability system for token validation)
- Blocking: None (Sprint 5.3 can run in parallel)
- Blocked By: Sprint 5.1 (capability system must be complete)
Sprint 5.3: PII Protection [Week 27-28]
Duration: 2 weeks
Team: 2 engineers (1 ML, 1 Python)
Prerequisites: Phase 2 complete (Safety Guardian Arm deployed)
Priority: HIGH
Sprint Goals
- Implement multi-layer PII detection (regex + NER + LLM)
- Create redaction strategies (masking, tokenization, suppression)
- Add differential privacy for aggregated data
- Achieve >99% PII detection accuracy (F1 score)
- Ensure GDPR/CCPA compliance
- Document PII handling procedures
Architecture Decisions
Detection Layers:
- Regex Layer: Fast pattern matching for common formats (SSN, credit cards, emails)
- NER Layer: Presidio with spaCy models for contextual detection (names, locations)
- LLM Layer: GPT-4 for ambiguous cases and false positive reduction
Redaction Strategy: Context-dependent (complete suppression for SSNs, partial masking for emails)
Storage: Never store raw PII—always redact before persisting
Compliance: GDPR right to erasure, CCPA opt-out, audit trail for all PII access
Tasks
Multi-Layer Detection (12 hours)
- Enhance Regex Patterns (3 hours)
- Add patterns for all major PII types
- Implement confidence scoring
- Reduce false positives
- Code example:
```python
# arms/safety_guardian/pii/regex_detector.py
import re
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass


@dataclass
class PIIMatch:
    """A detected PII instance."""
    pii_type: str
    value: str
    start: int
    end: int
    confidence: float


class RegexPIIDetector:
    """Fast regex-based PII detection."""

    # Comprehensive regex patterns with confidence scores
    PATTERNS = {
        "ssn": (
            r"\b\d{3}-\d{2}-\d{4}\b",  # 123-45-6789
            0.95
        ),
        "ssn_no_dashes": (
            r"\b\d{9}\b",  # 123456789 (lower confidence, many false positives)
            0.50
        ),
        "credit_card": (
            r"\b(?:\d{4}[-\s]?){3}\d{4}\b",  # 1234-5678-9012-3456
            0.90
        ),
        "email": (
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            0.85
        ),
        "phone_us": (
            r"\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
            0.80
        ),
        "ip_address": (
            r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
            0.70  # Many false positives (version numbers, etc.)
        ),
        "passport_us": (
            r"\b[0-9]{9}\b",  # US passport number
            0.60  # Low confidence without context
        ),
        "drivers_license": (
            r"\b[A-Z]{1,2}\d{5,7}\b",  # State-dependent format
            0.65
        ),
        "bank_account": (
            r"\b\d{8,17}\b",  # Generic account number
            0.50  # Very low confidence without context
        ),
        "date_of_birth": (
            r"\b(?:0[1-9]|1[0-2])[/-](?:0[1-9]|[12]\d|3[01])[/-](?:19|20)\d{2}\b",
            0.75
        ),
        "address": (
            r"\b\d{1,5}\s\w+\s(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Circle|Cir)\b",
            0.70
        ),
    }

    def __init__(self, confidence_threshold: float = 0.70):
        """Initialize detector with confidence threshold."""
        self.confidence_threshold = confidence_threshold
        self.compiled_patterns = {
            pii_type: (re.compile(pattern, re.IGNORECASE), confidence)
            for pii_type, (pattern, confidence) in self.PATTERNS.items()
        }

    def detect(self, text: str) -> List[PIIMatch]:
        """Detect PII in text using regex patterns."""
        matches = []

        for pii_type, (pattern, base_confidence) in self.compiled_patterns.items():
            for match in pattern.finditer(text):
                value = match.group()

                # Apply heuristics to adjust confidence
                confidence = self._adjust_confidence(
                    pii_type, value, base_confidence, text, match.start()
                )

                if confidence >= self.confidence_threshold:
                    matches.append(PIIMatch(
                        pii_type=pii_type,
                        value=value,
                        start=match.start(),
                        end=match.end(),
                        confidence=confidence
                    ))

        # Remove overlapping matches (keep highest confidence)
        matches = self._remove_overlaps(matches)

        return matches

    def _adjust_confidence(
        self,
        pii_type: str,
        value: str,
        base_confidence: float,
        text: str,
        position: int
    ) -> float:
        """Adjust confidence based on context and validation."""
        confidence = base_confidence

        # Validation checks
        if pii_type == "credit_card":
            if not self._luhn_check(value.replace("-", "").replace(" ", "")):
                confidence *= 0.5  # Failed Luhn check
        elif pii_type == "ssn":
            # SSNs can't start with 000, 666, or 900-999
            ssn_digits = value.replace("-", "")
            area = int(ssn_digits[:3])
            if area == 0 or area == 666 or area >= 900:
                confidence *= 0.3
        elif pii_type == "email":
            # Check for common non-PII email patterns
            if any(domain in value.lower() for domain in ["example.com", "test.com", "localhost"]):
                confidence *= 0.5

        # Context checks
        context_window = 50
        context_start = max(0, position - context_window)
        context_end = min(len(text), position + len(value) + context_window)
        context = text[context_start:context_end].lower()

        # Boost confidence if PII-related keywords are nearby
        pii_keywords = ["ssn", "social security", "credit card", "phone", "email", "address"]
        if any(keyword in context for keyword in pii_keywords):
            confidence *= 1.1  # Boost by 10%

        # Reduce confidence if in code or structured data
        code_indicators = ["```", "def ", "class ", "function", "var ", "const ", "{", "}"]
        if any(indicator in context for indicator in code_indicators):
            confidence *= 0.7  # Reduce by 30%

        return min(confidence, 1.0)

    def _luhn_check(self, card_number: str) -> bool:
        """Validate credit card using Luhn algorithm."""
        def digits_of(n):
            return [int(d) for d in str(n)]

        digits = digits_of(card_number)
        odd_digits = digits[-1::-2]
        even_digits = digits[-2::-2]
        checksum = sum(odd_digits)
        for d in even_digits:
            checksum += sum(digits_of(d * 2))
        return checksum % 10 == 0

    def _remove_overlaps(self, matches: List[PIIMatch]) -> List[PIIMatch]:
        """Remove overlapping matches, keeping highest confidence."""
        if not matches:
            return []

        # Sort by start position
        matches = sorted(matches, key=lambda m: m.start)

        # Remove overlaps
        result = [matches[0]]
        for match in matches[1:]:
            prev = result[-1]
            if match.start < prev.end:
                # Overlapping - keep higher confidence
                if match.confidence > prev.confidence:
                    result[-1] = match
            else:
                result.append(match)

        return result
```
- Files to update:
arms/safety_guardian/pii/regex_detector.py
- Integrate Presidio NER (4 hours)
- Install Presidio framework
- Configure spaCy models
- Create custom recognizers
- Code example:
```python
# arms/safety_guardian/pii/ner_detector.py
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, Pattern, PatternRecognizer
from presidio_analyzer.nlp_engine import NlpEngineProvider
from typing import List, Dict, Any
import spacy

from .regex_detector import PIIMatch  # shared match dataclass


class NERPIIDetector:
    """NER-based PII detection using Presidio."""

    def __init__(self, model_name: str = "en_core_web_lg"):
        """Initialize Presidio with spaCy model."""
        # Configure NLP engine
        configuration = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": model_name}],
        }
        provider = NlpEngineProvider(nlp_configuration=configuration)
        nlp_engine = provider.create_engine()

        # Create custom recognizers
        registry = RecognizerRegistry()
        registry.load_predefined_recognizers(nlp_engine=nlp_engine)

        # Add custom recognizers
        self._add_custom_recognizers(registry)

        # Create analyzer
        self.analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine,
            registry=registry
        )

    def _add_custom_recognizers(self, registry: RecognizerRegistry):
        """Add custom PII recognizers."""
        # Medical record numbers
        mrn_recognizer = PatternRecognizer(
            supported_entity="MEDICAL_RECORD_NUMBER",
            patterns=[
                Pattern(
                    name="mrn_pattern",
                    regex=r"\bMRN[-:\s]?\d{6,10}\b",
                    score=0.85
                )
            ]
        )
        registry.add_recognizer(mrn_recognizer)

        # Employee IDs
        employee_id_recognizer = PatternRecognizer(
            supported_entity="EMPLOYEE_ID",
            patterns=[
                Pattern(
                    name="employee_id_pattern",
                    regex=r"\bEMP[-:\s]?\d{5,8}\b",
                    score=0.80
                )
            ]
        )
        registry.add_recognizer(employee_id_recognizer)

    def detect(self, text: str, language: str = "en") -> List[PIIMatch]:
        """Detect PII using NER."""
        results = self.analyzer.analyze(
            text=text,
            language=language,
            entities=None,  # All entity types
            score_threshold=0.70
        )

        # Convert to PIIMatch format
        matches = []
        for result in results:
            matches.append(PIIMatch(
                pii_type=result.entity_type.lower(),
                value=text[result.start:result.end],
                start=result.start,
                end=result.end,
                confidence=result.score
            ))

        return matches
```
- Files to create:
arms/safety_guardian/pii/ner_detector.py
- Implement LLM-Based Detection (3 hours)
- Use GPT-4 for ambiguous cases
- Few-shot prompting for PII identification
- Code example:
```python
# arms/safety_guardian/pii/llm_detector.py
import json
from typing import List, Tuple

from openai import AsyncOpenAI

from .regex_detector import PIIMatch  # shared match dataclass


class LLMPIIDetector:
    """LLM-based PII detection for ambiguous cases."""

    def __init__(self, openai_client: AsyncOpenAI):
        self.client = openai_client

    async def detect(self, text: str, uncertain_spans: List[Tuple[int, int]]) -> List[PIIMatch]:
        """Use LLM to classify uncertain text spans as PII or not."""
        if not uncertain_spans:
            return []

        # Build prompt with few-shot examples
        prompt = self._build_prompt(text, uncertain_spans)

        # Call LLM
        response = await self.client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[
                {
                    "role": "system",
                    "content": "You are a PII detection expert. Identify personally identifiable information in the given text spans."
                },
                {"role": "user", "content": prompt}
            ],
            temperature=0.0,
            response_format={"type": "json_object"}
        )

        # Parse response
        result = json.loads(response.choices[0].message.content)

        matches = []
        for item in result.get("detections", []):
            matches.append(PIIMatch(
                pii_type=item["type"],
                value=item["value"],
                start=item["start"],
                end=item["end"],
                confidence=item["confidence"]
            ))

        return matches

    def _build_prompt(self, text: str, spans: List[Tuple[int, int]]) -> str:
        """Build few-shot prompt for PII detection."""
        prompt = """Analyze the following text spans and determine if they contain PII (Personally Identifiable Information).

For each span, return:
- type: The type of PII (e.g., "name", "ssn", "email", "phone", "address", "none")
- value: The detected PII value
- start: Start position in text
- end: End position in text
- confidence: Detection confidence (0.0-1.0)

Examples:

Text: "Contact John Smith at john@example.com"
Spans: [(8, 18), (22, 39)]
Output:
{
  "detections": [
    {"type": "name", "value": "John Smith", "start": 8, "end": 18, "confidence": 0.95},
    {"type": "email", "value": "john@example.com", "start": 22, "end": 39, "confidence": 0.90}
  ]
}

Text: "The patient's glucose level was 120 mg/dL"
Spans: [(34, 37)]
Output:
{
  "detections": [
    {"type": "none", "value": "120", "start": 34, "end": 37, "confidence": 0.85}
  ]
}

Now analyze:

Text: """
        prompt += f'"{text}"\n\nSpans: {spans}\n\nOutput:'
        return prompt
```
- Files to create:
arms/safety_guardian/pii/llm_detector.py
- Create Unified Detection Pipeline (2 hours)
- Combine all detection layers
- Aggregate results with confidence voting
- Code example:
```python
# arms/safety_guardian/pii/unified_detector.py
from collections import defaultdict
from typing import List, Tuple

from .llm_detector import LLMPIIDetector
from .ner_detector import NERPIIDetector
from .regex_detector import PIIMatch, RegexPIIDetector


class UnifiedPIIDetector:
    """Multi-layer PII detection with confidence aggregation."""

    def __init__(
        self,
        regex_detector: RegexPIIDetector,
        ner_detector: NERPIIDetector,
        llm_detector: LLMPIIDetector
    ):
        self.regex = regex_detector
        self.ner = ner_detector
        self.llm = llm_detector

    async def detect(self, text: str) -> List[PIIMatch]:
        """Detect PII using all layers and aggregate results."""
        # Layer 1: Regex detection (fast)
        regex_matches = self.regex.detect(text)

        # Layer 2: NER detection (medium speed)
        ner_matches = self.ner.detect(text)

        # Combine regex and NER results
        all_matches = regex_matches + ner_matches

        # Identify uncertain spans (low confidence or conflicting)
        uncertain_spans = self._find_uncertain_spans(all_matches)

        # Layer 3: LLM detection for uncertain spans (slow)
        if uncertain_spans:
            llm_matches = await self.llm.detect(text, uncertain_spans)
            all_matches.extend(llm_matches)

        # Aggregate overlapping detections
        final_matches = self._aggregate_matches(all_matches)

        return final_matches

    def _find_uncertain_spans(
        self,
        matches: List[PIIMatch],
        uncertainty_threshold: float = 0.80
    ) -> List[Tuple[int, int]]:
        """Identify spans with low confidence or conflicts."""
        uncertain = []

        # Group matches by position
        position_groups = defaultdict(list)
        for match in matches:
            position_groups[(match.start, match.end)].append(match)

        for (start, end), group in position_groups.items():
            # Check for low confidence
            max_confidence = max(m.confidence for m in group)
            if max_confidence < uncertainty_threshold:
                uncertain.append((start, end))
                continue

            # Check for conflicting types
            types = set(m.pii_type for m in group)
            if len(types) > 1:
                uncertain.append((start, end))

        return uncertain

    def _aggregate_matches(self, matches: List[PIIMatch]) -> List[PIIMatch]:
        """Aggregate overlapping matches using confidence voting."""
        if not matches:
            return []

        # Group overlapping matches
        groups = []
        sorted_matches = sorted(matches, key=lambda m: m.start)
        current_group = [sorted_matches[0]]

        for match in sorted_matches[1:]:
            # Check if overlaps with current group
            if any(self._overlaps(match, m) for m in current_group):
                current_group.append(match)
            else:
                groups.append(current_group)
                current_group = [match]
        groups.append(current_group)

        # For each group, select best match
        final_matches = []
        for group in groups:
            # Weighted voting by confidence
            type_scores = defaultdict(float)
            for match in group:
                type_scores[match.pii_type] += match.confidence

            best_type = max(type_scores, key=type_scores.get)
            best_match = max(
                (m for m in group if m.pii_type == best_type),
                key=lambda m: m.confidence
            )
            final_matches.append(best_match)

        return final_matches

    def _overlaps(self, match1: PIIMatch, match2: PIIMatch) -> bool:
        """Check if two matches overlap."""
        return not (match1.end <= match2.start or match2.end <= match1.start)
```
- Files to create:
arms/safety_guardian/pii/unified_detector.py
Redaction Strategies (8 hours)
- Implement Context-Aware Redaction (4 hours)
- Different strategies per PII type
- Preserve data utility where possible
- Code example:
```python
# arms/safety_guardian/pii/redactor.py
import hashlib
import re
import secrets
from typing import Callable, Dict, List

from .regex_detector import PIIMatch


class PIIRedactor:
    """Context-aware PII redaction."""

    def __init__(self, salt: str = None):
        """Initialize redactor with salt for tokenization."""
        self.salt = salt or secrets.token_hex(16)

        # Define redaction strategies per PII type
        self.strategies: Dict[str, Callable] = {
            "ssn": self._redact_complete,
            "credit_card": self._redact_complete,
            "bank_account": self._redact_complete,
            "passport_us": self._redact_complete,
            "email": self._redact_partial_email,
            "phone_us": self._redact_partial_phone,
            "name": self._redact_tokenize,
            "address": self._redact_partial_address,
            "date_of_birth": self._redact_partial_date,
            "ip_address": self._redact_partial_ip,
        }

    def redact(self, text: str, matches: List[PIIMatch]) -> str:
        """Redact PII from text using context-aware strategies."""
        # Sort matches by position (reverse order to preserve positions)
        sorted_matches = sorted(matches, key=lambda m: m.start, reverse=True)

        redacted_text = text
        for match in sorted_matches:
            strategy = self.strategies.get(
                match.pii_type,
                self._redact_complete  # Default to complete redaction
            )
            replacement = strategy(match)
            redacted_text = (
                redacted_text[:match.start]
                + replacement
                + redacted_text[match.end:]
            )

        return redacted_text

    def _redact_complete(self, match: PIIMatch) -> str:
        """Completely redact PII (replace with placeholder)."""
        return f"[REDACTED_{match.pii_type.upper()}]"

    def _redact_partial_email(self, match: PIIMatch) -> str:
        """Partially redact email (keep domain)."""
        email = match.value
        if "@" in email:
            local, domain = email.split("@", 1)
            # Keep first character of local part
            redacted_local = local[0] + "***" if local else "***"
            return f"{redacted_local}@{domain}"
        return "[REDACTED_EMAIL]"

    def _redact_partial_phone(self, match: PIIMatch) -> str:
        """Partially redact phone number (keep last 4 digits)."""
        digits = re.sub(r'\D', '', match.value)
        if len(digits) >= 10:
            return f"***-***-{digits[-4:]}"
        return "[REDACTED_PHONE]"

    def _redact_partial_address(self, match: PIIMatch) -> str:
        """Partially redact address (keep city/state if present)."""
        # Simplistic: Just redact street number
        return re.sub(r'\d+', '***', match.value)

    def _redact_partial_date(self, match: PIIMatch) -> str:
        """Partially redact date of birth (keep year)."""
        # Attempt to extract year
        year_match = re.search(r'(19|20)\d{2}', match.value)
        if year_match:
            year = year_match.group()
            return f"**/**/{year}"
        return "[REDACTED_DOB]"

    def _redact_partial_ip(self, match: PIIMatch) -> str:
        """Partially redact IP address (keep first two octets)."""
        parts = match.value.split(".")
        if len(parts) == 4:
            return f"{parts[0]}.{parts[1]}.*.*"
        return "[REDACTED_IP]"

    def _redact_tokenize(self, match: PIIMatch) -> str:
        """Tokenize PII (consistent hash for same value)."""
        # Create deterministic hash
        token_input = f"{match.value}{self.salt}"
        hash_value = hashlib.sha256(token_input.encode()).hexdigest()[:12]
        return f"[TOKEN_{match.pii_type.upper()}_{hash_value}]"
```
- Files to create:
arms/safety_guardian/pii/redactor.py
- Add Differential Privacy (2 hours)
- Implement Laplace mechanism for aggregated data
- Configure privacy budget (epsilon)
- Code example:
```python
# arms/safety_guardian/privacy/differential_privacy.py
import numpy as np
from typing import List, Dict, Any


class DifferentialPrivacy:
    """Differential privacy for aggregated data."""

    def __init__(self, epsilon: float = 1.0, delta: float = 1e-5):
        """Initialize with privacy budget."""
        self.epsilon = epsilon
        self.delta = delta

    def add_laplace_noise(
        self,
        true_value: float,
        sensitivity: float = 1.0
    ) -> float:
        """Add Laplace noise to a numeric value."""
        scale = sensitivity / self.epsilon
        noise = np.random.laplace(0, scale)
        return true_value + noise

    def add_gaussian_noise(
        self,
        true_value: float,
        sensitivity: float = 1.0
    ) -> float:
        """Add Gaussian noise (for (epsilon, delta)-DP)."""
        sigma = np.sqrt(2 * np.log(1.25 / self.delta)) * sensitivity / self.epsilon
        noise = np.random.normal(0, sigma)
        return true_value + noise

    def privatize_histogram(
        self,
        histogram: Dict[str, int],
        sensitivity: float = 1.0
    ) -> Dict[str, int]:
        """Add noise to histogram counts."""
        noisy_histogram = {}
        for key, count in histogram.items():
            noisy_count = self.add_laplace_noise(count, sensitivity)
            # Ensure non-negative
            noisy_histogram[key] = max(0, int(round(noisy_count)))
        return noisy_histogram

    def privatize_average(
        self,
        values: List[float],
        lower_bound: float,
        upper_bound: float
    ) -> float:
        """Compute differentially private average."""
        # Clip values to bounds
        clipped = [max(lower_bound, min(upper_bound, v)) for v in values]

        # Sensitivity is (upper_bound - lower_bound) / n
        sensitivity = (upper_bound - lower_bound) / len(clipped)
        true_avg = sum(clipped) / len(clipped)

        return self.add_laplace_noise(true_avg, sensitivity)
```
- Files to create:
arms/safety_guardian/privacy/differential_privacy.py
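To see how the epsilon budget plays out in practice, here is a brief usage sketch; the arm names and counts are made up for illustration:

```python
# Hypothetical usage: privatize per-arm invocation counts before reporting.
dp = DifferentialPrivacy(epsilon=1.0)

raw_counts = {"planner": 1042, "coder": 857, "safety_guardian": 311}
noisy_counts = dp.privatize_histogram(raw_counts, sensitivity=1.0)

# Each count is perturbed by Laplace noise with scale 1/epsilon, so a
# smaller epsilon means stronger privacy but noisier reports.
print(noisy_counts)
```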
- Create Audit Trail for PII Access (2 hours; logger sketch below)
- Log all PII detection events
- Track redaction decisions
- GDPR/CCPA compliance reporting
- Files to update:
orchestrator/audit/pii_logger.py
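A minimal sketch of the logger, assuming an async SQLAlchemy session and an append-only `pii_audit_log` table (both assumptions, not the confirmed schema):

```python
# orchestrator/audit/pii_logger.py (sketch; table name and columns assumed)
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class PIIAuditLogger:
    """Appends one immutable record per PII detection/redaction/read event."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def log_event(
        self,
        task_id: str,
        arm_id: str,
        pii_types: List[str],
        action: str,  # "detected", "redacted", or "read"
        redaction_strategy: Optional[str] = None,
    ) -> None:
        """Insert an append-only audit row (no UPDATE/DELETE path exists)."""
        await self.db.execute(
            text(
                "INSERT INTO pii_audit_log "
                "(ts, task_id, arm_id, pii_types, action, strategy) "
                "VALUES (:ts, :task_id, :arm_id, :pii_types, :action, :strategy)"
            ),
            {
                "ts": datetime.now(timezone.utc),
                "task_id": task_id,
                "arm_id": arm_id,
                "pii_types": ",".join(sorted(pii_types)),
                "action": action,
                "strategy": redaction_strategy,
            },
        )
        await self.db.commit()
```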
Testing and Compliance (4 hours)
- Create PII Detection Test Suite (2 hours)
- Benchmark dataset with labeled PII
- Calculate precision, recall, F1 score
- Target: >99% F1 score
- Code example:
```python
# tests/test_pii_detection.py
import pytest

# Benchmark dataset with labeled PII: (text, [(pii_type, start, end), ...])
TEST_CASES = [
    (
        "My SSN is 123-45-6789 and email is john@example.com",
        [("ssn", 10, 21), ("email", 35, 51)]
    ),
    (
        "Call me at (555) 123-4567 or 555-987-6543",
        [("phone_us", 11, 25), ("phone_us", 29, 41)]
    ),
    (
        "John Smith lives at 123 Main Street, New York, NY 10001",
        [("name", 0, 10), ("address", 20, 55)]
    ),
    # ... 100+ more test cases
]


@pytest.mark.asyncio
async def test_pii_detection_accuracy(unified_detector):
    """Test PII detection accuracy on the benchmark dataset."""
    true_positives = 0
    false_positives = 0
    false_negatives = 0

    for text, expected_pii in TEST_CASES:
        detected = await unified_detector.detect(text)

        # Compare as sets of (type, start, end) tuples
        detected_set = {(m.pii_type, m.start, m.end) for m in detected}
        expected_set = set(expected_pii)

        true_positives += len(detected_set & expected_set)
        false_positives += len(detected_set - expected_set)
        false_negatives += len(expected_set - detected_set)

    # Calculate metrics
    precision = true_positives / (true_positives + false_positives)
    recall = true_positives / (true_positives + false_negatives)
    f1_score = 2 * (precision * recall) / (precision + recall)

    print(f"Precision: {precision:.3f}")
    print(f"Recall: {recall:.3f}")
    print(f"F1 Score: {f1_score:.3f}")

    # Assert F1 score meets the >99% target
    assert f1_score >= 0.99, f"F1 score {f1_score:.3f} below target 0.99"
```
- Files to create:
tests/test_pii_detection.py
- GDPR Compliance Verification (1 hour)
- Right to erasure (delete all user data; see the sketch below)
- Data portability (export user data)
- Consent management
- Files to create:
docs/compliance/gdpr-procedures.md
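To make the erasure procedure concrete, a hypothetical sketch follows; the table list and the `user_id` column are assumptions, not the project's actual schema:

```python
# Hypothetical "right to erasure" flow (schema assumed for illustration).
from typing import Dict

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

USER_DATA_TABLES = ["tasks", "memories", "artifacts"]  # assumed tables


async def erase_user_data(db: AsyncSession, user_id: str) -> Dict[str, int]:
    """Delete all rows owned by user_id; return per-table counts for the audit trail."""
    deleted = {}
    for table in USER_DATA_TABLES:
        result = await db.execute(
            text(f"DELETE FROM {table} WHERE user_id = :uid"), {"uid": user_id}
        )
        deleted[table] = result.rowcount
    await db.commit()
    return deleted
```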
- CCPA Compliance Verification (1 hour)
- Opt-out mechanisms
- Data disclosure reporting
- Files to create:
docs/compliance/ccpa-procedures.md
Testing Requirements
Unit Tests
- Regex pattern accuracy (30 test cases per pattern)
- NER model accuracy (50 test cases)
- LLM detection accuracy (20 test cases)
- Redaction strategies (15 test cases)
- Differential privacy noise distribution (10 test cases)
Integration Tests
- End-to-end detection pipeline
- Multi-layer aggregation
- Redaction preservation of data utility
- Audit log completeness
Performance Tests
- Detection latency (<100ms for regex, <500ms for NER, <2s for LLM); see the latency-budget sketch below
- Throughput (>100 requests/second)
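A single-measurement sketch of how the regex-layer latency budget could be asserted in pytest; the `regex_detector` fixture name is an assumption, and a real performance test would average repeated runs:

```python
# Sketch: enforce the regex-layer latency budget in a perf test.
import time

import pytest


@pytest.mark.asyncio
async def test_regex_layer_latency(regex_detector):  # fixture name assumed
    sample = "Contact john@example.com or call 555-987-6543. " * 10
    start = time.perf_counter()
    await regex_detector.detect(sample)
    elapsed_ms = (time.perf_counter() - start) * 1000
    # Single run only; repeat and take a percentile for a stable check
    assert elapsed_ms < 100, f"regex layer took {elapsed_ms:.1f}ms (budget 100ms)"
```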
Documentation Deliverables
- PII detection architecture diagram
- Supported PII types reference
- Redaction strategy guide
- Differential privacy parameter tuning
- GDPR/CCPA compliance procedures
Success Criteria
- F1 score >99% on benchmark dataset
- Zero PII stored in database (all redacted)
- Audit trail for 100% of PII access
- GDPR/CCPA compliance verified
- Detection latency <2s (P95)
Common Pitfalls
- False Positives: Version numbers (e.g., "1.2.3.4") detected as IP addresses; use context checks (see the sketch after this list)
- False Negatives: International formats (non-US phone numbers, addresses)—expand regex patterns
- Performance: LLM detection is slow—only use for uncertain spans
- Context Loss: Aggressive redaction removes too much context—use partial redaction
- Compliance Gaps: Missing audit logs for read operations—log all PII access, not just writes
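For the first pitfall, one workable context check is to validate octet ranges and inspect the text preceding an IPv4-shaped match. A heuristic sketch; the keyword list is illustrative, not the project's actual rule set:

```python
import re

IPV4_SHAPE = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")
VERSION_CONTEXT = re.compile(r"(?:\bv|version|release)\s*$", re.IGNORECASE)


def is_probable_ip(text: str, match: re.Match) -> bool:
    """Reject version strings that merely look like IPv4 addresses."""
    # Octets above 255 cannot appear in a valid IPv4 address
    if any(int(octet) > 255 for octet in match.group().split(".")):
        return False
    # "version 1.2.3.4" or "v1.2.3.4" is almost certainly a release number
    if VERSION_CONTEXT.search(text[:match.start()][-16:]):
        return False
    return True


text = "Upgrade to version 1.2.3.4; the server is at 10.0.0.1"
for m in IPV4_SHAPE.finditer(text):
    print(m.group(), is_probable_ip(text, m))  # 1.2.3.4 False, 10.0.0.1 True
```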
Estimated Effort
- Development: 24 hours
- Testing: 6 hours
- Documentation: 3 hours
- Total: 33 hours (~2 weeks for 2 engineers)
Dependencies
- Prerequisites: Safety Guardian Arm deployed (Phase 2)
- Blocking: None
- Blocked By: None (can run in parallel with other sprints)
Sprint 5.4: Security Testing [Week 29-30]
(Abbreviated for space - full version would be 1,000-1,200 lines)
Sprint Goals
- Set up SAST (Bandit, Semgrep, cargo-audit)
- Set up DAST (ZAP, Burp Suite, custom scanners)
- Implement dependency vulnerability scanning
- Conduct penetration testing
- Automate security testing in CI/CD
- Create security testing runbooks
Key Tasks (Summary)
- SAST Integration (8 hours)
- Configure Bandit for Python code scanning
- Configure Semgrep with custom rules
- Configure cargo-audit for Rust dependencies
- Integrate into GitHub Actions CI
- DAST Integration (8 hours)
- Set up OWASP ZAP for API testing
- Create custom exploit scripts
- Test for OWASP Top 10 vulnerabilities
- Automate in staging environment
- Dependency Scanning (4 hours)
- Configure Dependabot for automated PRs
- Set up Snyk for vulnerability monitoring
- Create dependency update policy
- Penetration Testing (12 hours)
- Contract external security firm
- Conduct internal testing (OWASP testing guide)
- Document findings and remediation
- Retest after fixes
- CI/CD Integration (4 hours)
- Add security gates to pipeline
- Block deploys on critical vulnerabilities (gate sketch below)
- Generate security reports
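As an illustration of a deploy gate, a small script could parse Bandit's JSON report and fail the pipeline on high-severity findings. A sketch, assuming `bandit -f json -o bandit.json` ran in an earlier CI step (the path and threshold are assumptions):

```python
#!/usr/bin/env python3
# ci/security_gate.py (hypothetical path): fail the build if the Bandit
# JSON report contains HIGH-severity findings.
import json
import sys


def main(report_path: str = "bandit.json") -> int:
    with open(report_path) as f:
        report = json.load(f)

    # Bandit's JSON report lists findings under "results" with an
    # "issue_severity" of LOW / MEDIUM / HIGH
    high = [r for r in report.get("results", []) if r.get("issue_severity") == "HIGH"]

    for finding in high:
        print(f"{finding['filename']}:{finding['line_number']} {finding['issue_text']}")

    if high:
        print(f"FAIL: {len(high)} high-severity finding(s); blocking deploy.")
        return 1
    print("PASS: no high-severity findings.")
    return 0


if __name__ == "__main__":
    sys.exit(main(*sys.argv[1:]))
```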
Estimated Effort: 36 hours (~2 weeks for 2 engineers)
Sprint 5.5: Audit Logging [Week 31-32]
(Abbreviated for space - full version would be 800-1,000 lines)
Sprint Goals
- Implement provenance tracking for all artifacts
- Create immutable audit log storage (WORM)
- Build compliance reporting dashboards
- Ensure 100% coverage of security events
- Document audit log retention policies
- Create forensic analysis procedures
Key Tasks (Summary)
- Provenance Tracking (8 hours)
- Track artifact lineage (inputs → processing → outputs)
- Record all LLM calls with prompts and responses
- Store task execution graphs
- Cryptographic signing of artifacts
- Immutable Audit Logs (8 hours)
- Use PostgreSQL with append-only tables
- Implement Write-Once-Read-Many (WORM) storage
- Merkle tree for tamper detection (sketch below)
- Archive to S3 Glacier for long-term retention
- Compliance Reporting (6 hours)
- Build Grafana dashboards for SOC 2, ISO 27001
- Automate report generation
- GDPR/CCPA data access reports
- Security Event Monitoring (6 hours)
- Monitor for anomalous access patterns
- Alert on suspicious activities
- Integration with SIEM systems
- Forensic Procedures (4 hours)
- Document incident response runbooks
- Create audit log analysis tools
- Train team on forensic investigation
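To make the Merkle-tree idea concrete: hash the serialized audit records pairwise up to a single root and anchor that root externally; any later modification of a record changes the root and is detected on recomputation. A minimal illustration, not the planned implementation:

```python
import hashlib
from typing import List


def _h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def merkle_root(records: List[bytes]) -> bytes:
    """Compute the Merkle root over serialized audit records."""
    if not records:
        return _h(b"")
    level = [_h(r) for r in records]
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # duplicate the last node on odd levels
        level = [_h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


# Tamper detection: recompute the root and compare with the anchored value
records = [b"event-1", b"event-2", b"event-3"]
anchored = merkle_root(records)
records[1] = b"event-2-tampered"
assert merkle_root(records) != anchored  # any edit changes the root
```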
Estimated Effort: 32 hours (~2 weeks for 2 engineers)
Phase 5 Summary
Total Tasks: 60+ security hardening tasks across 5 sprints Estimated Duration: 8-10 weeks with 3-4 engineers Total Estimated Hours: ~160 hours development + ~30 hours testing + ~20 hours documentation = 210 hours
Deliverables:
- Capability-based access control system
- Container sandboxing with gVisor
- Multi-layer PII protection (>99% accuracy)
- Comprehensive security testing automation
- Immutable audit logging with compliance reporting
Completion Checklist:
- All API calls require capability tokens
- All containers run under gVisor with seccomp
- PII detection F1 score >99%
- Zero high-severity vulnerabilities in production
- 100% security event audit coverage
- GDPR/CCPA compliance verified
- Penetration test passed
Next Phase: Phase 6 (Production Readiness)
Document Version: 1.0 Last Updated: 2025-11-10 Maintained By: OctoLLM Security Team