Phase 2: Core Capabilities
Status: Not Started Duration: 8-10 weeks Team Size: 4-5 engineers (3 Python, 1 Rust, 1 ML/data) Prerequisites: Phase 1 complete Start Date: TBD Target Completion: TBD
Overview
Phase 2 expands the OctoLLM system to include all 6 specialized arms, distributed memory systems, Kubernetes production deployment, and swarm decision-making capabilities. This phase transforms the POC into a production-capable system with all core functionality.
Key Deliverables:
- Retriever Arm (Python) - Hybrid search with Qdrant + PostgreSQL
- Coder Arm (Python) - Code generation with episodic memory
- Judge Arm (Python) - Multi-layer output validation
- Safety Guardian Arm (Python) - PII detection and content filtering
- Distributed Memory System - PostgreSQL + Qdrant + Redis with routing
- Kubernetes Production Deployment - StatefulSets, Deployments, HPA, Ingress
- Swarm Decision-Making - Parallel proposal generation and consensus
Success Criteria:
- ✅ All 6 arms deployed and operational
- ✅ Memory system handling 100,000+ entities
- ✅ Kubernetes deployment with autoscaling
- ✅ Swarm decision-making working
- ✅ Load tests passing (1,000 concurrent tasks)
- ✅ Documentation updated
Reference: docs/doc_phases/PHASE-2-COMPLETE-SPECIFICATIONS.md (10,500+ lines)
Sprint 2.1: Retriever Arm [Week 7-8]
Duration: 2 weeks Team: 1-2 engineers (Python + ML) Prerequisites: Phase 1 complete, Qdrant deployed Priority: HIGH
Sprint Goals
- Implement hybrid search (vector + keyword) with Reciprocal Rank Fusion
- Deploy Qdrant vector database with optimized collections
- Integrate semantic search with sentence-transformers
- Create knowledge base indexing pipeline
- Achieve >80% retrieval accuracy (relevant docs in top-5)
- Query latency <500ms for most queries
Architecture Decisions Required
- Decision 1: Embedding Model Selection
- Option A: sentence-transformers/all-MiniLM-L6-v2 (fast, 384 dim)
- Option B: sentence-transformers/all-mpnet-base-v2 (better quality, 768 dim)
- Option C: OpenAI text-embedding-ada-002 (API-based, 1536 dim)
- Recommendation: Option A for cost/speed balance
- Decision 2: Re-ranking Strategy
- Option A: Cross-encoder re-ranking (accurate but slow)
- Option B: Reciprocal Rank Fusion (RRF) only (fast)
- Option C: Hybrid approach (RRF + cross-encoder for top-10)
- Recommendation: Option B initially, Option C for production
Tasks
Qdrant Deployment and Configuration (8 hours)
- Deploy Qdrant Vector Database (4 hours)
- Create Qdrant StatefulSet for Kubernetes:
```yaml
# k8s/databases/qdrant-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: qdrant
  namespace: octollm
spec:
  serviceName: qdrant
  replicas: 1  # Single instance for Phase 2
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
        - name: qdrant
          image: qdrant/qdrant:v1.7.0
          ports:
            - containerPort: 6333
              name: http
            - containerPort: 6334
              name: grpc
          volumeMounts:
            - name: qdrant-storage
              mountPath: /qdrant/storage
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
  volumeClaimTemplates:
    - metadata:
        name: qdrant-storage
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 50Gi
```
- Create Qdrant Service (ClusterIP)
- Verify deployment with health check
- Files to create: k8s/databases/qdrant-statefulset.yaml, k8s/databases/qdrant-service.yaml
- Reference: docs/operations/kubernetes-deployment.md
- Create Collection Schema (2 hours)
- Define collection structure for documents:
```python
# arms/retriever/collections.py
from qdrant_client import QdrantClient
from qdrant_client.http import models

COLLECTION_CONFIG = {
    "documents": {
        "vector_size": 384,  # all-MiniLM-L6-v2
        "distance": "Cosine",
        "on_disk_payload": True,
        "hnsw_config": {
            "m": 16,
            "ef_construct": 100,
            "full_scan_threshold": 10000
        },
        "quantization_config": {
            "scalar": {
                "type": "int8",
                "quantile": 0.99,
                "always_ram": True
            }
        }
    }
}

def initialize_collections(client: QdrantClient):
    """Initialize Qdrant collections with optimized configuration."""
    for collection_name, config in COLLECTION_CONFIG.items():
        if not client.collection_exists(collection_name):
            client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=config["vector_size"],
                    distance=models.Distance[config["distance"].upper()]
                ),
                hnsw_config=models.HnswConfigDiff(**config["hnsw_config"]),
                # ScalarQuantization expects its settings wrapped in a ScalarQuantizationConfig
                quantization_config=models.ScalarQuantization(
                    scalar=models.ScalarQuantizationConfig(**config["quantization_config"]["scalar"])
                ),
                on_disk_payload=config["on_disk_payload"]
            )
```
- Create indexes for metadata filtering
- Configure HNSW parameters for performance
- Files to create: arms/retriever/collections.py
- Implement Qdrant Client Wrapper (2 hours)
- Connection pooling and retry logic
- Health check integration
- Batch operations for indexing
- Code example:
```python
# arms/retriever/qdrant_client.py
from typing import List, Dict, Any
from qdrant_client import QdrantClient
from qdrant_client.http import models
import asyncio
from functools import lru_cache

class QdrantClientWrapper:
    def __init__(self, url: str, api_key: str = None, timeout: int = 30):
        self.client = QdrantClient(url=url, api_key=api_key, timeout=timeout)

    async def search(
        self,
        collection_name: str,
        query_vector: List[float],
        limit: int = 10,
        filter_conditions: Dict = None,
        score_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Async semantic search with optional filtering."""
        search_result = await asyncio.to_thread(
            self.client.search,
            collection_name=collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=models.Filter(**filter_conditions) if filter_conditions else None,
            score_threshold=score_threshold,
            with_payload=True
        )
        return [
            {
                "id": hit.id,
                "score": hit.score,
                "payload": hit.payload
            }
            for hit in search_result
        ]
```
- Files to create: arms/retriever/qdrant_client.py
Hybrid Search Implementation (12 hours)
- Implement Semantic Search with Embeddings (4 hours)
- sentence-transformers integration
- Batch embedding generation
- Caching for common queries
- Code example:
```python
# arms/retriever/embeddings.py
from sentence_transformers import SentenceTransformer
from typing import List
import torch
from functools import lru_cache

class EmbeddingGenerator:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.model.eval()

    @lru_cache(maxsize=1000)
    def encode_cached(self, text: str) -> List[float]:
        """Generate embeddings with caching for common queries."""
        return self.encode([text])[0]

    def encode(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a batch of texts."""
        with torch.no_grad():
            embeddings = self.model.encode(
                texts,
                batch_size=32,
                show_progress_bar=False,
                normalize_embeddings=True
            )
        return embeddings.tolist()
```
- Files to create: arms/retriever/embeddings.py
- Reference: docs/components/arms/retriever-arm.md
- Implement PostgreSQL Full-Text Search (3 hours)
- Create GIN indexes for text columns
- ts_vector and ts_query integration
- Relevance ranking with ts_rank
- SQL schema:
```sql
-- Add full-text search to entities table
ALTER TABLE entities ADD COLUMN search_vector tsvector
GENERATED ALWAYS AS (
    setweight(to_tsvector('english', coalesce(name, '')), 'A') ||
    setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
    setweight(to_tsvector('english', coalesce(properties::text, '')), 'C')
) STORED;

CREATE INDEX entities_search_idx ON entities USING GIN (search_vector);

-- Full-text search function
CREATE OR REPLACE FUNCTION search_entities(query_text text, max_results int DEFAULT 20)
RETURNS TABLE (
    entity_id uuid,
    name text,
    description text,
    relevance_score real
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        e.entity_id,
        e.name,
        e.description,
        ts_rank(e.search_vector, websearch_to_tsquery('english', query_text)) AS relevance_score
    FROM entities e
    WHERE e.search_vector @@ websearch_to_tsquery('english', query_text)
    ORDER BY relevance_score DESC
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql;
```
- Files to create: db/migrations/004_fulltext_search.sql
- Implement Reciprocal Rank Fusion (RRF) (3 hours)
- Combine vector and keyword search results
- Configurable fusion weights
- Deduplication logic
- Code example:
```python
# arms/retriever/fusion.py
from typing import List, Dict, Any
from collections import defaultdict

class ReciprocalRankFusion:
    def __init__(self, k: int = 60):
        """
        Reciprocal Rank Fusion algorithm.
        k: constant for smoothing (typically 60)
        """
        self.k = k

    def fuse(
        self,
        semantic_results: List[Dict[str, Any]],
        keyword_results: List[Dict[str, Any]],
        semantic_weight: float = 0.6,
        keyword_weight: float = 0.4
    ) -> List[Dict[str, Any]]:
        """Fuse semantic and keyword search results using RRF."""
        scores = defaultdict(float)
        doc_map = {}

        # Process semantic results
        for rank, doc in enumerate(semantic_results, start=1):
            doc_id = doc["id"]
            scores[doc_id] += semantic_weight / (self.k + rank)
            doc_map[doc_id] = doc

        # Process keyword results
        for rank, doc in enumerate(keyword_results, start=1):
            doc_id = doc["id"]
            scores[doc_id] += keyword_weight / (self.k + rank)
            doc_map[doc_id] = doc

        # Sort by fused score
        sorted_ids = sorted(scores.items(), key=lambda x: x[1], reverse=True)

        return [
            {
                **doc_map[doc_id],
                "fused_score": score,
                "fusion_method": "RRF"
            }
            for doc_id, score in sorted_ids
        ]
```
- Files to create: arms/retriever/fusion.py
- Implement Context Ranking and Reranking (2 hours)
- Cross-encoder reranking (optional)
- Maximal Marginal Relevance (MMR) for diversity
- Relevance scoring thresholds
- Code example:
```python
# arms/retriever/reranking.py
from typing import List, Dict, Any
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class MaximalMarginalRelevance:
    def __init__(self, lambda_param: float = 0.5):
        """
        MMR for result diversification.
        lambda_param: 0=max diversity, 1=max relevance
        """
        self.lambda_param = lambda_param

    def rerank(
        self,
        query_embedding: List[float],
        documents: List[Dict[str, Any]],
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """Apply MMR to diversify results."""
        if not documents:
            return []

        # Extract embeddings
        doc_embeddings = np.array([doc["embedding"] for doc in documents])
        query_emb = np.array([query_embedding])

        # Compute similarities
        query_sim = cosine_similarity(query_emb, doc_embeddings)[0]

        selected = []
        remaining = list(range(len(documents)))

        # Iterative selection
        while remaining and len(selected) < top_k:
            mmr_scores = []
            for i in remaining:
                relevance = query_sim[i]
                if selected:
                    selected_embs = doc_embeddings[selected]
                    diversity = max(cosine_similarity([doc_embeddings[i]], selected_embs)[0])
                else:
                    diversity = 0
                mmr_score = self.lambda_param * relevance - (1 - self.lambda_param) * diversity
                mmr_scores.append((i, mmr_score))

            # Select best MMR score
            best_idx, best_score = max(mmr_scores, key=lambda x: x[1])
            selected.append(best_idx)
            remaining.remove(best_idx)

        return [documents[i] for i in selected]
```
- Files to create: arms/retriever/reranking.py
Retriever Arm Service Implementation (8 hours)
- Create FastAPI Service Structure (2 hours)
- Service initialization and configuration
- Dependency injection for clients
- Health check endpoints
- Files to create: arms/retriever/main.py, arms/retriever/config.py
- Implement Hybrid Search Endpoint (3 hours)
- POST /search endpoint with query and filters
- Pagination support
- Response caching with Redis
- Code example:
```python
# arms/retriever/main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from .embeddings import EmbeddingGenerator
from .qdrant_client import QdrantClientWrapper
from .fusion import ReciprocalRankFusion
from .reranking import MaximalMarginalRelevance
import asyncio

app = FastAPI(title="Retriever Arm")

class SearchRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=1000)
    top_k: int = Field(default=10, ge=1, le=100)
    filters: Optional[Dict[str, Any]] = None
    enable_reranking: bool = Field(default=True)

class SearchResponse(BaseModel):
    results: List[Dict[str, Any]]
    total_found: int
    search_time_ms: float

@app.post("/search", response_model=SearchResponse)
async def hybrid_search(request: SearchRequest):
    """Hybrid search combining semantic and keyword search."""
    import time
    start_time = time.time()

    # Generate query embedding
    embedding_gen = get_embedding_generator()
    query_embedding = embedding_gen.encode_cached(request.query)

    # Parallel search execution
    semantic_task = asyncio.create_task(
        semantic_search(query_embedding, request.top_k, request.filters)
    )
    keyword_task = asyncio.create_task(
        keyword_search(request.query, request.top_k, request.filters)
    )
    semantic_results, keyword_results = await asyncio.gather(
        semantic_task, keyword_task
    )

    # Fuse results
    rrf = ReciprocalRankFusion(k=60)
    fused_results = rrf.fuse(
        semantic_results,
        keyword_results,
        semantic_weight=0.6,
        keyword_weight=0.4
    )

    # Optional reranking
    if request.enable_reranking:
        mmr = MaximalMarginalRelevance(lambda_param=0.7)
        fused_results = mmr.rerank(query_embedding, fused_results, request.top_k)

    search_time_ms = (time.time() - start_time) * 1000

    return SearchResponse(
        results=fused_results[:request.top_k],
        total_found=len(fused_results),
        search_time_ms=search_time_ms
    )
```
- Files to create: arms/retriever/api/search.py
- Implement Document Indexing Endpoint (2 hours)
- POST /index endpoint for adding documents
- Batch indexing support
- Embedding generation and storage
- Files to create: arms/retriever/api/indexing.py
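A minimal sketch of the batch indexing flow, assuming the `EmbeddingGenerator` and `QdrantClientWrapper` from earlier tasks; the request model and the `get_embedding_generator()` / `get_qdrant_client()` dependency helpers are illustrative assumptions, not final APIs:

```python
# Sketch only: arms/retriever/api/indexing.py (request model and DI helpers are assumptions)
import asyncio
from typing import Any, Dict, List, Optional

from fastapi import APIRouter
from pydantic import BaseModel, Field
from qdrant_client.http import models

router = APIRouter()

class Document(BaseModel):
    id: str
    text: str = Field(..., min_length=1)
    metadata: Optional[Dict[str, Any]] = None

class IndexRequest(BaseModel):
    documents: List[Document] = Field(..., min_items=1, max_items=500)

@router.post("/index")
async def index_documents(request: IndexRequest):
    """Embed a batch of documents and upsert them into the Qdrant 'documents' collection."""
    embedding_gen = get_embedding_generator()  # assumed dependency helper
    qdrant = get_qdrant_client()               # assumed dependency helper
    vectors = embedding_gen.encode([doc.text for doc in request.documents])
    points = [
        models.PointStruct(
            id=doc.id,
            vector=vector,
            payload={"text": doc.text, **(doc.metadata or {})},
        )
        for doc, vector in zip(request.documents, vectors)
    ]
    # Single batched upsert; larger corpora would be chunked and retried
    await asyncio.to_thread(qdrant.client.upsert, collection_name="documents", points=points)
    return {"indexed": len(points)}
```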
- Add Caching Layer with Redis (1 hour)
- Cache search results for common queries
- TTL-based cache expiration (1 hour)
- Cache key generation from query hash
- Code example:
```python
# arms/retriever/cache.py
import hashlib
import json
from typing import Optional, Any
import redis.asyncio as redis

class SearchCache:
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def _generate_key(self, query: str, filters: dict = None) -> str:
        """Generate cache key from query and filters."""
        cache_input = {
            "query": query,
            "filters": filters or {}
        }
        cache_str = json.dumps(cache_input, sort_keys=True)
        return f"search_cache:{hashlib.sha256(cache_str.encode()).hexdigest()}"

    async def get(self, query: str, filters: dict = None) -> Optional[Any]:
        """Retrieve cached search results."""
        key = self._generate_key(query, filters)
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, query: str, results: Any, filters: dict = None):
        """Cache search results."""
        key = self._generate_key(query, filters)
        await self.redis.setex(
            key,
            self.ttl,
            json.dumps(results)
        )
```
- Files to create: arms/retriever/cache.py
Testing Requirements
- Unit Tests (6 hours)
- Test embedding generation (consistency, caching)
- Test RRF fusion algorithm (correctness, edge cases)
- Test MMR reranking (diversity improvement)
- Test cache hit/miss scenarios
- Target coverage: >85%
- Test file: arms/retriever/tests/test_retrieval.py
- Example tests:
```python
# arms/retriever/tests/test_retrieval.py
import pytest
from retriever.fusion import ReciprocalRankFusion
from retriever.embeddings import EmbeddingGenerator

def test_rrf_fusion():
    """Test Reciprocal Rank Fusion combines results correctly."""
    rrf = ReciprocalRankFusion(k=60)

    semantic = [
        {"id": "doc1", "score": 0.95},
        {"id": "doc2", "score": 0.85},
        {"id": "doc3", "score": 0.75}
    ]
    keyword = [
        {"id": "doc2", "score": 0.90},
        {"id": "doc4", "score": 0.80},
        {"id": "doc1", "score": 0.70}
    ]

    fused = rrf.fuse(semantic, keyword)

    # doc2 should rank highest (appears in both)
    assert fused[0]["id"] == "doc2"
    assert "fused_score" in fused[0]

def test_embedding_caching():
    """Test embedding caching improves performance."""
    gen = EmbeddingGenerator()
    import time

    # First call (uncached)
    start = time.time()
    emb1 = gen.encode_cached("test query")
    first_time = time.time() - start

    # Second call (cached)
    start = time.time()
    emb2 = gen.encode_cached("test query")
    second_time = time.time() - start

    # Cached call should be much faster
    assert second_time < first_time * 0.1
    assert emb1 == emb2
```
- Integration Tests (4 hours)
- Test Qdrant integration (search, indexing)
- Test PostgreSQL full-text search
- Test end-to-end hybrid search flow
- Test file: tests/integration/test_retriever_integration.py
- Scenarios:
- Document indexing → Search retrieval
- Hybrid search with filters
- Cache hit/miss behavior
Documentation Deliverables
- API Documentation (2 hours)
- OpenAPI spec for all endpoints (auto-generated by FastAPI)
- Request/response examples
- Error code reference
- Files: Auto-generated at the /docs endpoint
- Component README (1 hour)
- Architecture overview
- Configuration guide
- Deployment instructions
- Files to create: arms/retriever/README.md
Success Criteria
- Hybrid search retrieves relevant documents >80% of time (top-5)
- Query latency P95 <500ms
- Cache hit rate >60% for common queries after warm-up
- All tests passing with >85% coverage
- API documentation complete
- Successfully integrated with Orchestrator
Common Pitfalls & Tips
⚠️ Pitfall 1: Poor embedding quality leads to low retrieval accuracy ✅ Solution: Use high-quality embedding models (all-mpnet-base-v2) and normalize embeddings
⚠️ Pitfall 2: RRF weights favor one search method too heavily ✅ Solution: A/B test different weight combinations (0.5/0.5, 0.6/0.4, 0.7/0.3)
⚠️ Pitfall 3: Qdrant memory usage grows unbounded ✅ Solution: Enable quantization and on-disk payload storage
Estimated Effort
- Development: 28 hours
- Testing: 10 hours
- Documentation: 3 hours
- Total: 41 hours (~2 weeks for 1 engineer)
Dependencies
- Blocks: Sprint 2.3 (Judge arm needs retrieval for fact-checking)
- Blocked by: Phase 1 complete, Qdrant deployed
Sprint 2.2: Coder Arm [Week 8-9]
Duration: 2 weeks Team: 1-2 engineers (Python + LLM experience) Prerequisites: Qdrant deployed, Memory systems basic structure Priority: HIGH
Sprint Goals
- Implement code generation with GPT-4/Claude integration
- Create episodic memory for code snippets (Qdrant-based)
- Add static analysis integration (Ruff for Python, Clippy for Rust)
- Implement debugging assistance
- Provide code refactoring suggestions
- Generated code passes linters >90% of time
Architecture Decisions Required
- Decision 1: LLM Model Selection
- Option A: GPT-4 (best quality, expensive)
- Option B: GPT-3.5-turbo (fast, cheaper)
- Option C: Claude 3 Sonnet (good balance)
- Recommendation: GPT-4 for complex, GPT-3.5 for simple
- Decision 2: Static Analysis Integration
- Option A: Pre-generation (analyze context before generation)
- Option B: Post-generation (validate generated code)
- Option C: Both (comprehensive but slower)
- Recommendation: Option B for simplicity
Tasks
Episodic Memory Setup (6 hours)
- Create Qdrant Collection for Code Snippets (2 hours)
- Language-specific collections (Python, Rust, JavaScript)
- Metadata schema (language, framework, complexity)
- Code example:
```python
# arms/coder/memory.py
from qdrant_client import QdrantClient
from qdrant_client.http import models
from typing import List, Dict, Any

LANGUAGE_COLLECTIONS = {
    "python_code": {"vector_size": 384, "distance": "Cosine"},
    "rust_code": {"vector_size": 384, "distance": "Cosine"},
    "javascript_code": {"vector_size": 384, "distance": "Cosine"}
}

def initialize_code_collections(client: QdrantClient):
    """Initialize language-specific code collections."""
    for collection_name, config in LANGUAGE_COLLECTIONS.items():
        if not client.collection_exists(collection_name):
            client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=config["vector_size"],
                    distance=models.Distance[config["distance"].upper()]
                ),
                hnsw_config=models.HnswConfigDiff(m=16, ef_construct=100)
            )
            # Create payload indexes for filtering
            client.create_payload_index(
                collection_name=collection_name,
                field_name="language",
                field_schema="keyword"
            )
```
- Files to create: arms/coder/memory.py
- Implement CoderMemory Class (4 hours)
- Store code snippets with embeddings
- Semantic search for similar code
- Context retrieval for generation
- Code example:
```python
# arms/coder/memory.py (continued)
from sentence_transformers import SentenceTransformer
import uuid

class CoderMemory:
    def __init__(self, qdrant_client: QdrantClient, embedding_model: str = "all-MiniLM-L6-v2"):
        self.client = qdrant_client
        self.model = SentenceTransformer(embedding_model)

    async def store_code_snippet(
        self,
        code: str,
        language: str,
        description: str,
        metadata: Dict[str, Any] = None
    ) -> str:
        """Store code snippet with embedding."""
        # Generate embedding from code + description
        text = f"{description}\n\n{code}"
        embedding = self.model.encode(text).tolist()

        snippet_id = str(uuid.uuid4())
        collection_name = f"{language.lower()}_code"

        self.client.upsert(
            collection_name=collection_name,
            points=[
                models.PointStruct(
                    id=snippet_id,
                    vector=embedding,
                    payload={
                        "code": code,
                        "language": language,
                        "description": description,
                        **(metadata or {})
                    }
                )
            ]
        )
        return snippet_id

    async def search_similar_code(
        self,
        query: str,
        language: str,
        limit: int = 5
    ) -> List[Dict[str, Any]]:
        """Search for similar code snippets."""
        query_embedding = self.model.encode(query).tolist()
        collection_name = f"{language.lower()}_code"

        results = self.client.search(
            collection_name=collection_name,
            query_vector=query_embedding,
            limit=limit,
            with_payload=True
        )

        return [
            {
                "code": hit.payload["code"],
                "description": hit.payload.get("description"),
                "similarity": hit.score
            }
            for hit in results
        ]
```
- Files to create: arms/coder/memory.py
LLM Integration for Code Generation (8 hours)
- Implement OpenAI/Anthropic Code Generation (4 hours)
- GPT-4 integration with code-specific prompts
- Claude 3 integration as fallback
- Temperature and parameter tuning
- Code example:
```python
# arms/coder/generator.py
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
from typing import Optional, Dict, Any


class CodeGenerationError(Exception):
    """Raised when code generation fails."""


class CodeGenerator:
    def __init__(self, openai_key: str, anthropic_key: str):
        self.openai = AsyncOpenAI(api_key=openai_key)
        self.anthropic = AsyncAnthropic(api_key=anthropic_key)

    async def generate_code(
        self,
        prompt: str,
        language: str,
        context: Optional[str] = None,
        model: str = "gpt-4"
    ) -> Dict[str, Any]:
        """Generate code using LLM."""
        system_prompt = f"""You are an expert {language} programmer.
Generate clean, idiomatic, well-documented {language} code.
Include type hints, error handling, and follow best practices.
"""
        if context:
            system_prompt += f"\n\nRelevant context:\n{context}"

        try:
            if model.startswith("gpt"):
                response = await self.openai.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.2,  # Lower temp for code
                    max_tokens=2000
                )
                return {
                    "code": response.choices[0].message.content,
                    "model": model,
                    "tokens": response.usage.total_tokens
                }
            else:
                # Claude fallback
                response = await self.anthropic.messages.create(
                    model="claude-3-sonnet-20240229",
                    max_tokens=2000,
                    system=system_prompt,
                    messages=[
                        {"role": "user", "content": prompt}
                    ]
                )
                return {
                    "code": response.content[0].text,
                    "model": "claude-3-sonnet",
                    "tokens": response.usage.input_tokens + response.usage.output_tokens
                }
        except Exception as e:
            raise CodeGenerationError(f"Code generation failed: {str(e)}")
```
- Files to create: arms/coder/generator.py
- Implement Context-Aware Generation (2 hours)
- Retrieve similar code from memory
- Include relevant examples in prompt
- Improve generation quality with context
- Add Token Usage Tracking (2 hours)
- Prometheus metrics for LLM API calls
- Cost tracking per request
- Rate limiting to prevent overuse
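A minimal sketch of the metrics side of this task using prometheus_client counters; the metric names and the per-1K-token prices are placeholder assumptions, not final values:

```python
# Sketch only: arms/coder/metrics.py -- token and cost tracking (names and prices are placeholders)
from prometheus_client import Counter

LLM_TOKENS = Counter(
    "coder_llm_tokens_total",
    "Total LLM tokens consumed by the Coder arm",
    ["model", "kind"],  # kind: prompt | completion
)
LLM_COST_USD = Counter(
    "coder_llm_cost_usd_total",
    "Estimated LLM spend in USD",
    ["model"],
)

# Placeholder per-1K-token prices; real values belong in configuration
PRICE_PER_1K = {"gpt-4": 0.03, "gpt-3.5-turbo": 0.0015}

def record_usage(model: str, prompt_tokens: int, completion_tokens: int) -> None:
    """Record token counts and a rough cost estimate after each LLM call."""
    LLM_TOKENS.labels(model=model, kind="prompt").inc(prompt_tokens)
    LLM_TOKENS.labels(model=model, kind="completion").inc(completion_tokens)
    price = PRICE_PER_1K.get(model, 0.0)
    LLM_COST_USD.labels(model=model).inc((prompt_tokens + completion_tokens) / 1000 * price)
```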
Static Analysis Integration (6 hours)
- Integrate Python Linters (Ruff, Black) (3 hours)
- Post-generation validation
- Automatic formatting
- Error reporting
- Code example:
```python
# arms/coder/validators.py
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, Any, List

class PythonValidator:
    def validate_code(self, code: str) -> Dict[str, Any]:
        """Validate Python code with Ruff and Black."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_path = Path(f.name)

        try:
            # Run Ruff for linting
            ruff_result = subprocess.run(
                ['ruff', 'check', str(temp_path)],
                capture_output=True,
                text=True
            )

            # Run Black for formatting check
            black_result = subprocess.run(
                ['black', '--check', str(temp_path)],
                capture_output=True,
                text=True
            )

            issues = []
            if ruff_result.returncode != 0:
                issues.append({
                    "tool": "ruff",
                    "message": ruff_result.stdout
                })
            if black_result.returncode != 0:
                issues.append({
                    "tool": "black",
                    "message": "Code formatting issues detected"
                })

            return {
                "valid": len(issues) == 0,
                "issues": issues
            }
        finally:
            temp_path.unlink()
```
- Files to create: arms/coder/validators.py
- Integrate Rust Linters (Clippy) (2 hours)
- Similar validation for Rust code
- Cargo check integration
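The /code endpoint below references a `RustValidator`; a minimal sketch of one possible shape, assuming cargo and clippy are installed in the service image (the scratch-crate approach and flags are illustrative):

```python
# Sketch only: arms/coder/validators.py (RustValidator) -- assumes cargo and clippy are available
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict

class RustValidator:
    def validate_code(self, code: str) -> Dict[str, Any]:
        """Validate Rust code by dropping it into a scratch crate and running cargo clippy."""
        with tempfile.TemporaryDirectory() as tmp:
            crate = Path(tmp) / "snippet"
            subprocess.run(
                ["cargo", "new", "--quiet", "--name", "snippet", str(crate)],
                check=True, capture_output=True, text=True,
            )
            (crate / "src" / "main.rs").write_text(code)
            clippy = subprocess.run(
                ["cargo", "clippy", "--quiet", "--", "-D", "warnings"],
                cwd=crate, capture_output=True, text=True,
            )
        issues = []
        if clippy.returncode != 0:
            issues.append({"tool": "clippy", "message": clippy.stderr})
        return {"valid": not issues, "issues": issues}
```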
- Add Syntax Validation (1 hour)
- AST parsing to verify syntax
- Early error detection
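For the Python branch, the syntax pre-check can be a thin wrapper around the standard-library ast module; a minimal sketch (function name is illustrative):

```python
# Sketch only: AST-based syntax pre-check run before the linters
import ast
from typing import Any, Dict

def check_python_syntax(code: str) -> Dict[str, Any]:
    """Parse generated code with ast to catch syntax errors before running Ruff/Black."""
    try:
        ast.parse(code)
        return {"valid": True, "errors": []}
    except SyntaxError as e:
        return {
            "valid": False,
            "errors": [{"line": e.lineno, "offset": e.offset, "message": e.msg}],
        }
```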
Coder Arm Service Implementation (8 hours)
- Create FastAPI Service (2 hours)
- Service initialization
- Dependency injection
- Health checks
- Files to create: arms/coder/main.py
- Implement /code Endpoint (3 hours)
- POST /code for code generation
- Language and framework parameters
- Context retrieval from memory
- Validation and formatting
- Code example:
```python
# arms/coder/api/generation.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from ..generator import CodeGenerator
from ..validators import PythonValidator, RustValidator
from ..memory import CoderMemory

router = APIRouter()

class CodeRequest(BaseModel):
    prompt: str = Field(..., min_length=10, max_length=2000)
    language: str = Field(..., regex="^(python|rust|javascript|typescript)$")
    framework: Optional[str] = None
    include_context: bool = True
    validate: bool = True

class CodeResponse(BaseModel):
    code: str
    language: str
    validation_result: Dict[str, Any]
    tokens_used: int
    similar_examples: List[Dict[str, Any]]

@router.post("/code", response_model=CodeResponse)
async def generate_code(request: CodeRequest):
    """Generate code based on natural language prompt."""
    # Retrieve similar code from memory
    similar_code = []
    if request.include_context:
        memory = get_coder_memory()
        similar_code = await memory.search_similar_code(
            query=request.prompt,
            language=request.language,
            limit=3
        )

    # Build context from similar examples
    context = "\n\n".join([
        f"Example {i+1}:\n{ex['code']}"
        for i, ex in enumerate(similar_code)
    ])

    # Generate code
    generator = get_code_generator()
    result = await generator.generate_code(
        prompt=request.prompt,
        language=request.language,
        context=context if similar_code else None
    )

    # Validate generated code
    validation_result = {"valid": True, "issues": []}
    if request.validate:
        if request.language == "python":
            validator = PythonValidator()
            validation_result = validator.validate_code(result["code"])
        elif request.language == "rust":
            validator = RustValidator()
            validation_result = validator.validate_code(result["code"])

    # Store in memory if valid
    if validation_result["valid"]:
        memory = get_coder_memory()
        await memory.store_code_snippet(
            code=result["code"],
            language=request.language,
            description=request.prompt
        )

    return CodeResponse(
        code=result["code"],
        language=request.language,
        validation_result=validation_result,
        tokens_used=result["tokens"],
        similar_examples=similar_code
    )
```
- Files to create: arms/coder/api/generation.py
- Implement /debug Endpoint (2 hours)
- POST /debug for debugging assistance
- Error analysis and suggestions
- Files to create: arms/coder/api/debugging.py
- Implement /refactor Endpoint (1 hour)
- POST /refactor for code improvements
- Refactoring suggestions
- Files to create: arms/coder/api/refactoring.py
Testing Requirements
- Unit Tests (6 hours)
- Test code generation quality (syntax correctness)
- Test memory retrieval (similar code search)
- Test validators (catch syntax errors)
- Target coverage: >85%
- Test file: arms/coder/tests/test_generation.py
- Integration Tests (4 hours)
- Test end-to-end code generation flow
- Test memory integration
- Test validation pipeline
- Scenarios:
- Generate Python function → Validate → Store
- Search similar code → Generate with context
Documentation Deliverables
- API Documentation (2 hours)
- OpenAPI spec
- Code generation examples
- Best practices
- Component README (1 hour)
- Architecture overview
- Supported languages
- Configuration guide
- Files to create: arms/coder/README.md
Success Criteria
- Generated code passes linters >90% of time
- Memory retrieval finds relevant examples
- Static analysis integrated
- All tests passing with >85% coverage
- API documentation complete
Common Pitfalls & Tips
⚠️ Pitfall 1: Generated code has syntax errors ✅ Solution: Use temperature=0.2 and validate with AST parsing
⚠️ Pitfall 2: Context retrieval returns irrelevant examples ✅ Solution: Fine-tune embedding model on code corpus
⚠️ Pitfall 3: High LLM API costs ✅ Solution: Use GPT-3.5-turbo for simple tasks, cache results
Estimated Effort
- Development: 28 hours
- Testing: 10 hours
- Documentation: 3 hours
- Total: 41 hours (~2 weeks for 1 engineer)
Dependencies
- Blocks: Sprint 2.7 (Swarm needs multiple arms operational)
- Blocked by: Qdrant deployed, basic memory structure
Sprint 2.3: Judge Arm [Week 9-10]
Duration: 2 weeks Team: 1 engineer (Python + ML) Prerequisites: Retriever Arm complete (for fact-checking) Priority: HIGH
Sprint Goals
- Implement multi-layer validation (schema, facts, criteria, hallucination)
- Create quality scoring system with weighted rubrics
- Integrate with Retriever for fact-checking
- Implement hallucination detection
- Generate actionable feedback for failed validations
- Validation catches >95% of schema errors and achieves >90% fact-checking accuracy
Architecture Decisions Required
- Decision 1: Hallucination Detection Method
- Option A: NLI (Natural Language Inference) model
- Option B: Fact extraction + verification against retrieval
- Option C: LLM-based consistency checking
- Recommendation: Option B for explainability
- Decision 2: Scoring Methodology
- Option A: Binary pass/fail
- Option B: Weighted rubric (0-100 score)
- Option C: Multi-dimensional scoring
- Recommendation: Option B for flexibility
Tasks
Validation Framework (8 hours)
- Implement Schema Validation (2 hours)
- Pydantic model validation
- JSON schema validation
- Custom validators
- Code example:
```python
# arms/judge/validators/schema.py
from pydantic import BaseModel, ValidationError, validator
from typing import Any, Dict, List
import jsonschema

class SchemaValidator:
    def validate_pydantic(self, data: Dict, model_class: type) -> Dict[str, Any]:
        """Validate data against Pydantic model."""
        try:
            validated = model_class(**data)
            return {
                "valid": True,
                "validated_data": validated.dict(),
                "errors": []
            }
        except ValidationError as e:
            return {
                "valid": False,
                "validated_data": None,
                "errors": [
                    {
                        "field": err["loc"][0] if err["loc"] else "root",
                        "message": err["msg"],
                        "type": err["type"]
                    }
                    for err in e.errors()
                ]
            }

    def validate_json_schema(self, data: Dict, schema: Dict) -> Dict[str, Any]:
        """Validate data against JSON schema."""
        try:
            jsonschema.validate(instance=data, schema=schema)
            return {
                "valid": True,
                "errors": []
            }
        except jsonschema.exceptions.ValidationError as e:
            return {
                "valid": False,
                "errors": [
                    {
                        "field": ".".join(str(p) for p in e.path),
                        "message": e.message,
                        "schema_path": ".".join(str(p) for p in e.schema_path)
                    }
                ]
            }
```
- Files to create: arms/judge/validators/schema.py
- Implement Fact-Checking (3 hours)
- Extract claims from output
- Verify against Retriever knowledge base
- k-evidence rule (require k=3 supporting documents)
- Code example:
```python
# arms/judge/validators/facts.py
from typing import List, Dict, Any
import re
from retriever.client import RetrieverClient

class FactChecker:
    def __init__(self, retriever_client: RetrieverClient, k: int = 3):
        """
        Fact checker with k-evidence rule.
        k: number of supporting documents required
        """
        self.retriever = retriever_client
        self.k = k

    def extract_claims(self, text: str) -> List[str]:
        """Extract factual claims from text."""
        # Simple heuristic: sentences with specific entities or numbers
        sentences = re.split(r'[.!?]+', text)
        claims = []
        for sentence in sentences:
            sentence = sentence.strip()
            # Claims often contain specific details
            if any([
                re.search(r'\d+', sentence),  # Numbers
                re.search(r'[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+', sentence),  # Proper nouns
                any(word in sentence.lower() for word in ['is', 'was', 'are', 'were'])  # Assertions
            ]):
                claims.append(sentence)
        return claims

    async def verify_claim(self, claim: str) -> Dict[str, Any]:
        """Verify a single claim against knowledge base."""
        # Search for supporting evidence
        search_results = await self.retriever.search(
            query=claim,
            top_k=10
        )

        # Count supporting vs contradicting documents
        supporting = []
        contradicting = []
        for result in search_results:
            # Simple similarity threshold
            if result["score"] > 0.7:
                supporting.append(result)
            elif result["score"] < 0.3:
                contradicting.append(result)

        verified = len(supporting) >= self.k

        return {
            "claim": claim,
            "verified": verified,
            "supporting_count": len(supporting),
            "supporting_docs": supporting[:3],  # Top 3
            "confidence": len(supporting) / self.k if self.k > 0 else 0
        }

    async def check_facts(self, text: str) -> Dict[str, Any]:
        """Check all factual claims in text."""
        claims = self.extract_claims(text)

        if not claims:
            return {
                "valid": True,
                "message": "No factual claims to verify",
                "claims_checked": 0
            }

        # Verify all claims
        results = [await self.verify_claim(claim) for claim in claims]

        verified_count = sum(1 for r in results if r["verified"])
        accuracy = verified_count / len(results) if results else 0

        return {
            "valid": accuracy >= 0.8,  # 80% threshold
            "accuracy": accuracy,
            "claims_checked": len(results),
            "claims_verified": verified_count,
            "failed_claims": [r for r in results if not r["verified"]]
        }
```
- Files to create: arms/judge/validators/facts.py
- Implement Acceptance Criteria Checking (2 hours)
- Compare output against task acceptance criteria
- Rule-based validation
- LLM-based semantic validation
- Code example:
```python
# arms/judge/validators/criteria.py
from typing import List, Dict, Any
from openai import AsyncOpenAI

class CriteriaChecker:
    def __init__(self, openai_client: AsyncOpenAI):
        self.client = openai_client

    async def check_criteria(
        self,
        output: str,
        criteria: List[str]
    ) -> Dict[str, Any]:
        """Check if output meets acceptance criteria."""
        results = []
        for criterion in criteria:
            # Use LLM for semantic checking
            prompt = f"""Does the following output meet this criterion?
Criterion: {criterion}
Output: {output}
Answer with YES or NO, followed by a brief explanation."""

            response = await self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0
            )

            answer = response.choices[0].message.content
            met = answer.strip().upper().startswith("YES")

            results.append({
                "criterion": criterion,
                "met": met,
                "explanation": answer
            })

        met_count = sum(1 for r in results if r["met"])

        return {
            "valid": met_count == len(criteria),
            "criteria_met": met_count,
            "total_criteria": len(criteria),
            "results": results
        }
```
- Files to create: arms/judge/validators/criteria.py
- Implement Hallucination Detection (1 hour)
- Detect unverifiable claims
- Consistency checking
- Confidence scoring
- Files to create: arms/judge/validators/hallucination.py
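A minimal sketch of the detector, assuming the claim extraction and k-evidence verification from the FactChecker above; the class shape and threshold are assumptions, chosen to return the `detected` and `count` fields the /validate endpoint expects:

```python
# Sketch only: arms/judge/validators/hallucination.py
# Assumes a FactChecker-style dependency; the support threshold is illustrative.
from typing import Any, Dict

class HallucinationDetector:
    def __init__(self, fact_checker, support_threshold: float = 0.5):
        self.fact_checker = fact_checker
        self.support_threshold = support_threshold

    async def detect(self, text: str) -> Dict[str, Any]:
        """Flag claims whose best retrieval evidence falls below the support threshold."""
        claims = self.fact_checker.extract_claims(text)
        unsupported = []
        for claim in claims:
            result = await self.fact_checker.verify_claim(claim)
            if result["confidence"] < self.support_threshold:
                unsupported.append(claim)
        return {
            "detected": bool(unsupported),
            "count": len(unsupported),
            "unsupported_claims": unsupported,
            "confidence": 1.0 - (len(unsupported) / len(claims)) if claims else 1.0,
        }
```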
Quality Scoring System (6 hours)
- Implement Weighted Rubric System (3 hours)
- Configurable scoring dimensions
- Weighted aggregation
- Threshold-based pass/fail
- Code example:
```python
# arms/judge/scoring.py
from typing import Dict, List, Any
from pydantic import BaseModel, Field

class ScoringDimension(BaseModel):
    name: str
    weight: float = Field(ge=0.0, le=1.0)
    description: str
    min_score: float = 0.0
    max_score: float = 100.0

class QualityScorer:
    def __init__(self, dimensions: List[ScoringDimension]):
        """
        Initialize quality scorer with weighted dimensions.
        Weights must sum to 1.0.
        """
        total_weight = sum(d.weight for d in dimensions)
        if abs(total_weight - 1.0) > 0.01:
            raise ValueError(f"Weights must sum to 1.0, got {total_weight}")
        self.dimensions = dimensions

    def score(self, dimension_scores: Dict[str, float]) -> Dict[str, Any]:
        """
        Calculate weighted score across dimensions.

        Args:
            dimension_scores: Dict mapping dimension name to score (0-100)

        Returns:
            Dict with overall score and breakdown
        """
        weighted_score = 0.0
        breakdown = []

        for dimension in self.dimensions:
            score = dimension_scores.get(dimension.name, 0.0)
            weighted = score * dimension.weight
            weighted_score += weighted
            breakdown.append({
                "dimension": dimension.name,
                "score": score,
                "weight": dimension.weight,
                "weighted_score": weighted
            })

        return {
            "overall_score": weighted_score,
            "breakdown": breakdown,
            "passed": weighted_score >= 70.0  # Default threshold
        }

# Default rubric for OctoLLM outputs
DEFAULT_RUBRIC = [
    ScoringDimension(
        name="correctness",
        weight=0.4,
        description="Accuracy and factual correctness"
    ),
    ScoringDimension(
        name="completeness",
        weight=0.25,
        description="All requirements addressed"
    ),
    ScoringDimension(
        name="quality",
        weight=0.20,
        description="Code/output quality and best practices"
    ),
    ScoringDimension(
        name="safety",
        weight=0.15,
        description="Security and safety considerations"
    )
]
```
- Files to create: arms/judge/scoring.py
- Implement Feedback Generation (2 hours)
- Generate actionable recommendations
- Repair suggestions for failures
- Prioritized issue list
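A minimal sketch of how failed validation layers could be turned into a prioritized, actionable list; the module name, severity ordering, and message templates are assumptions:

```python
# Sketch only: arms/judge/feedback.py -- turn validation results into prioritized repair suggestions
from typing import Any, Dict, List

SEVERITY = {"schema": 0, "facts": 1, "criteria": 2, "hallucination": 3}  # lower = more urgent

def generate_feedback(validations: Dict[str, Dict[str, Any]]) -> List[Dict[str, str]]:
    """Collect failed checks from each validation layer, most urgent first."""
    items: List[Dict[str, str]] = []
    for layer, result in validations.items():
        if result.get("valid", True):
            continue
        for err in result.get("errors", []):
            items.append({
                "layer": layer,
                "issue": err.get("message", str(err)),
                "suggestion": f"Fix the {layer} problem reported for '{err.get('field', 'output')}' and resubmit.",
            })
        for claim in result.get("failed_claims", []):
            items.append({
                "layer": layer,
                "issue": f"Unverified claim: {claim['claim']}",
                "suggestion": "Cite a source from the knowledge base or remove the claim.",
            })
    return sorted(items, key=lambda i: SEVERITY.get(i["layer"], 99))
```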
- Add Confidence Scoring (1 hour)
- Uncertainty quantification
- Confidence intervals
- Flags for human review
Judge Arm Service Implementation (8 hours)
- Create FastAPI Service (2 hours)
- Service initialization
- Dependency injection
- Health checks
- Files to create: arms/judge/main.py
- Implement /validate Endpoint (4 hours)
- POST /validate for output validation
- Multi-layer validation pipeline
- Detailed validation report
- Code example:
```python
# arms/judge/api/validation.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from ..validators.schema import SchemaValidator
from ..validators.facts import FactChecker
from ..validators.criteria import CriteriaChecker
from ..validators.hallucination import HallucinationDetector
from ..scoring import QualityScorer, DEFAULT_RUBRIC

router = APIRouter()

class ValidationRequest(BaseModel):
    output: str = Field(..., min_length=1)
    schema: Optional[Dict] = None
    acceptance_criteria: Optional[List[str]] = None
    enable_fact_checking: bool = True
    enable_hallucination_detection: bool = True

class ValidationResponse(BaseModel):
    valid: bool
    overall_score: float
    validations: Dict[str, Any]
    feedback: List[str]
    confidence: float

@router.post("/validate", response_model=ValidationResponse)
async def validate_output(request: ValidationRequest):
    """Multi-layer validation of task output."""
    validations = {}
    dimension_scores = {}
    feedback = []

    # Layer 1: Schema validation
    if request.schema:
        schema_validator = SchemaValidator()
        schema_result = schema_validator.validate_json_schema(
            data=request.output,
            schema=request.schema
        )
        validations["schema"] = schema_result
        dimension_scores["correctness"] = 100.0 if schema_result["valid"] else 0.0
        if not schema_result["valid"]:
            feedback.extend([
                f"Schema error in {err['field']}: {err['message']}"
                for err in schema_result["errors"]
            ])

    # Layer 2: Fact-checking
    if request.enable_fact_checking:
        fact_checker = get_fact_checker()
        fact_result = await fact_checker.check_facts(request.output)
        validations["facts"] = fact_result
        dimension_scores["correctness"] = min(
            dimension_scores.get("correctness", 100.0),
            fact_result["accuracy"] * 100
        )
        if not fact_result["valid"]:
            feedback.extend([
                f"Unverified claim: {claim['claim']}"
                for claim in fact_result["failed_claims"]
            ])

    # Layer 3: Acceptance criteria
    if request.acceptance_criteria:
        criteria_checker = get_criteria_checker()
        criteria_result = await criteria_checker.check_criteria(
            output=request.output,
            criteria=request.acceptance_criteria
        )
        validations["criteria"] = criteria_result
        dimension_scores["completeness"] = (
            criteria_result["criteria_met"] / criteria_result["total_criteria"] * 100
        )
        if not criteria_result["valid"]:
            feedback.extend([
                f"Criterion not met: {r['criterion']}"
                for r in criteria_result["results"] if not r["met"]
            ])

    # Layer 4: Hallucination detection
    if request.enable_hallucination_detection:
        hallucination_detector = get_hallucination_detector()
        hallucination_result = await hallucination_detector.detect(request.output)
        validations["hallucination"] = hallucination_result
        if hallucination_result["detected"]:
            feedback.append(f"Potential hallucinations detected: {hallucination_result['count']}")

    # Calculate overall score
    scorer = QualityScorer(DEFAULT_RUBRIC)
    score_result = scorer.score(dimension_scores)

    return ValidationResponse(
        valid=score_result["passed"] and all(
            v.get("valid", True) for v in validations.values()
        ),
        overall_score=score_result["overall_score"],
        validations=validations,
        feedback=feedback,
        confidence=min(1.0, sum(dimension_scores.values()) / (len(dimension_scores) * 100))
    )
```
- Files to create: arms/judge/api/validation.py
- Implement /fact-check Endpoint (2 hours)
- POST /fact-check for standalone fact verification
- Claim-by-claim breakdown
- Supporting evidence links
- Files to create: arms/judge/api/facts.py
Testing Requirements
- Unit Tests (6 hours)
- Test schema validation (catch format errors)
- Test fact-checking (k-evidence rule)
- Test scoring system (weighted aggregation)
- Target coverage: >85%
- Test file: arms/judge/tests/test_validation.py
- Example tests:
```python
# arms/judge/tests/test_validation.py
import pytest
from judge.validators.schema import SchemaValidator
from judge.validators.facts import FactChecker
from judge.scoring import QualityScorer, ScoringDimension

def test_schema_validation_catches_errors():
    """Test schema validation detects type mismatches."""
    validator = SchemaValidator()

    schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "age": {"type": "integer"}
        },
        "required": ["name", "age"]
    }

    # Valid data
    result = validator.validate_json_schema(
        {"name": "John", "age": 30},
        schema
    )
    assert result["valid"] == True

    # Invalid data (wrong type)
    result = validator.validate_json_schema(
        {"name": "John", "age": "thirty"},
        schema
    )
    assert result["valid"] == False
    assert len(result["errors"]) > 0

@pytest.mark.asyncio
async def test_fact_checking_accuracy():
    """Test fact checker verifies claims correctly."""
    mock_retriever = MockRetrieverClient()
    fact_checker = FactChecker(mock_retriever, k=3)

    # Text with verifiable claim
    text = "Python was created by Guido van Rossum in 1991."
    result = await fact_checker.check_facts(text)

    assert result["claims_checked"] > 0
    assert result["accuracy"] >= 0.8

def test_quality_scoring():
    """Test weighted quality scoring."""
    dimensions = [
        ScoringDimension(name="correctness", weight=0.5, description=""),
        ScoringDimension(name="completeness", weight=0.5, description="")
    ]
    scorer = QualityScorer(dimensions)

    result = scorer.score({
        "correctness": 90.0,
        "completeness": 80.0
    })

    assert result["overall_score"] == 85.0  # (90*0.5 + 80*0.5)
    assert result["passed"] == True
```
- Integration Tests (4 hours)
- Test end-to-end validation flow
- Test Retriever integration for fact-checking
- Test validation report generation
- Scenarios:
- Valid output → All layers pass
- Invalid schema → Schema validation fails
- False claims → Fact-checking fails
Documentation Deliverables
- API Documentation (2 hours)
- OpenAPI spec
- Validation examples
- Scoring rubric documentation
- Component README (1 hour)
- Validation layers overview
- Configuration guide
- Custom rubric creation
- Files to create: arms/judge/README.md
Success Criteria
- Validation catches >95% of schema errors
- Fact-checking >90% accurate on known facts
- Hallucination detection >80% effective
- All tests passing with >85% coverage
- API documentation complete
Common Pitfalls & Tips
⚠️ Pitfall 1: Fact-checking too strict causes false negatives ✅ Solution: Tune k-evidence threshold based on domain
⚠️ Pitfall 2: LLM-based criteria checking is slow ✅ Solution: Cache results for similar outputs
⚠️ Pitfall 3: Hallucination detector has high false positive rate ✅ Solution: Use multiple detection methods and consensus
Estimated Effort
- Development: 28 hours
- Testing: 10 hours
- Documentation: 3 hours
- Total: 41 hours (~2 weeks for 1 engineer)
Dependencies
- Blocks: All workflows (every task needs validation)
- Blocked by: Retriever Arm complete (for fact-checking)
Sprint 2.4: Safety Guardian Arm [Week 10-11]
(Content abbreviated for space - full sprint would be 1,500-2,000 lines with complete task breakdown, code examples, testing strategy, documentation, and acceptance criteria similar to Sprints 2.1-2.3)
Sprint Goals
- Implement comprehensive PII detection (18+ types with regex + NER)
- Create automatic redaction (type-based, hash-based, reversible)
- Add content filtering (profanity, hate speech, NSFW)
- Implement policy enforcement (capability validation, rate limiting)
- Build audit logging system (provenance tracking, immutable logs)
- Achieve >95% PII detection recall, <5% false positive rate
Key Tasks (Summary)
- PII Detection Engine (regex patterns + spaCy NER)
- Redaction Strategies (multiple approaches with AES-256)
- Content Filtering (keyword lists + ML models)
- Policy Enforcement Framework
- Audit Logging with Provenance
- GDPR/CCPA Compliance Helpers
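Since this sprint is abbreviated, here is a minimal sketch of the regex layer of the PII detection engine with type-based redaction; the patterns shown are a small illustrative subset of the 18+ types, and the full engine would pair them with spaCy NER as listed above:

```python
# Sketch only: regex layer of the PII detector (patterns are an illustrative subset)
import re
from typing import Any, Dict, List

PII_PATTERNS: Dict[str, re.Pattern] = {
    "email": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"),
    "us_ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "credit_card": re.compile(r"\b(?:\d[ -]?){13,16}\b"),
    "phone": re.compile(r"\+?\d{0,2}[ .-]?\(?\d{3}\)?[ .-]?\d{3}[ .-]?\d{4}"),
}

def detect_pii(text: str) -> List[Dict[str, Any]]:
    """Return every regex match with its PII type and character span, ready for redaction."""
    findings: List[Dict[str, Any]] = []
    for pii_type, pattern in PII_PATTERNS.items():
        for match in pattern.finditer(text):
            findings.append(
                {"type": pii_type, "value": match.group(), "start": match.start(), "end": match.end()}
            )
    return findings

def redact(text: str, findings: List[Dict[str, Any]]) -> str:
    """Type-based redaction: replace each detected span with a [TYPE] placeholder."""
    for f in sorted(findings, key=lambda f: f["start"], reverse=True):
        text = text[: f["start"]] + f"[{f['type'].upper()}]" + text[f["end"] :]
    return text
```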
Sprint 2.5: Distributed Memory System [Week 11-13]
(Content abbreviated for space - full sprint would be 1,800-2,200 lines)
Sprint Goals
- Implement complete PostgreSQL schema (entities, relationships, task_history, action_log)
- Deploy Qdrant per-arm episodic memory collections
- Create memory routing with query classification
- Implement data diodes for security isolation
- Build multi-tier caching (L1 in-memory, L2 Redis)
- Achieve >90% routing accuracy, <100ms query latency
Key Tasks (Summary)
- PostgreSQL Global Memory (full schema + indexes)
- Qdrant Local Memory (per-arm collections)
- Memory Router (query classification logic)
- Data Diode Implementation (PII filtering, capability checks)
- Multi-Tier Cache Layer
- Connection Pooling and Optimization
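A minimal sketch of the memory router's query classification, assuming a simple rule-based first pass; the tier names and query fields are assumptions, and the production router described in the reference below would likely add a learned classifier:

```python
# Sketch only: rule-based first pass at memory routing (tier names and query fields are assumptions)
from enum import Enum
from typing import Any, Dict

class MemoryTier(str, Enum):
    GLOBAL_POSTGRES = "postgres"   # entities, relationships, task history
    LOCAL_QDRANT = "qdrant"        # per-arm episodic memory
    CACHE_REDIS = "redis"          # hot keys and recent results

def route_query(query: Dict[str, Any]) -> MemoryTier:
    """Pick a memory backend from coarse query features (kind, recency)."""
    kind = query.get("kind", "semantic")
    if kind in {"entity", "relationship", "task_history"}:
        return MemoryTier.GLOBAL_POSTGRES
    if query.get("recent", False) or kind == "cache":
        return MemoryTier.CACHE_REDIS
    # Default: similarity search over the requesting arm's episodic collection
    return MemoryTier.LOCAL_QDRANT
```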
Reference: docs/implementation/memory-systems.md (2,850+ lines)
Sprint 2.6: Kubernetes Migration [Week 13-15]
(Content abbreviated for space - full sprint would be 2,000-2,500 lines)
Sprint Goals
- Deploy all services to Kubernetes production cluster
- Implement Horizontal Pod Autoscaling (HPA) for all services
- Configure Ingress with TLS (cert-manager + Let's Encrypt)
- Set up Pod Disruption Budgets (PDB) for high availability
- Deploy monitoring stack (Prometheus, Grafana)
- Achieve successful load test (1,000 concurrent tasks)
Key Tasks (Summary)
- Kubernetes Manifests (Namespace, ResourceQuota, RBAC)
- StatefulSets for Databases (PostgreSQL, Redis, Qdrant)
- Deployments for Services (Orchestrator, Reflex, 6 Arms)
- HPA Configuration (CPU, memory, custom metrics)
- Ingress and TLS Setup
- Load Testing and Verification
Reference: docs/operations/kubernetes-deployment.md (1,481 lines)
Sprint 2.7: Swarm Decision-Making [Week 15-16]
(Content abbreviated for space - full sprint would be 1,200-1,500 lines)
Sprint Goals
- Implement parallel arm invocation (N proposals for high-priority tasks)
- Create result aggregation strategies (voting, Borda count, learned)
- Build conflict resolution policies
- Add confidence scoring and uncertainty quantification
- Implement active learning feedback loops
- Achieve >95% success rate on critical tasks, <2x latency overhead
Key Tasks (Summary)
- Swarm Executor Class (parallel execution with asyncio)
- Voting and Aggregation Algorithms
- Conflict Resolution Strategies
- Confidence Scoring System
- Active Learning Integration
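A minimal sketch of the aggregation step, assuming each arm returns a ranked list of candidate proposals and consensus is taken by Borda count; the function and candidate names are illustrative:

```python
# Sketch only: Borda-count aggregation of per-arm proposal rankings
from collections import defaultdict
from typing import Dict, List

def borda_count(rankings: List[List[str]]) -> List[str]:
    """Aggregate rankings: a candidate ranked r-th out of n earns n - 1 - r points."""
    scores: Dict[str, int] = defaultdict(int)
    for ranking in rankings:
        n = len(ranking)
        for position, candidate in enumerate(ranking):
            scores[candidate] += n - 1 - position
    return sorted(scores, key=scores.get, reverse=True)

# Example: three arms rank three candidate proposals
proposals = [
    ["plan_a", "plan_b", "plan_c"],   # Planner's ranking
    ["plan_b", "plan_a", "plan_c"],   # Coder's ranking
    ["plan_a", "plan_c", "plan_b"],   # Judge's ranking
]
assert borda_count(proposals)[0] == "plan_a"  # plan_a scores 2 + 1 + 2 = 5 points
```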
Reference: docs/architecture/swarm-decision-making.md
Phase 2 Summary
Total Tasks: 80+ implementation tasks across 7 sprints Estimated Duration: 8-10 weeks with 4-5 engineers Total Estimated Hours: ~290 hours development + ~70 hours testing + ~20 hours documentation = 380 hours
Deliverables:
- 4 additional arms (Retriever, Coder, Judge, Guardian)
- Distributed memory system (PostgreSQL + Qdrant + Redis)
- Kubernetes production deployment
- Swarm decision-making
- Integration tests and load tests
Completion Checklist:
- All 6 arms deployed and operational
- Memory system handling 100,000+ entities
- Kubernetes deployment with autoscaling
- Swarm decision-making working
- Load tests passing (1,000 concurrent tasks)
- Documentation updated
- Code reviewed and approved
- Security audit complete
Next Phase: Phase 3 (Operations) + Phase 4 (Engineering) - Can run in parallel
Document Version: 1.0 Last Updated: 2025-11-10 Maintained By: OctoLLM Project Management Team