Integration Patterns for OctoLLM
Document: Implementation Guide | Version: 1.0 | Last Updated: 2025-11-10 | Estimated Time: 60-90 minutes
Table of Contents
- Overview
- Arm-to-Arm Communication
- Orchestrator Integration
- External API Integration
- Database Integration
- Message Queue Patterns
- Webhook Integration
- Batch Processing
- Real-Time Streaming
- Testing Integration
Overview
This guide provides comprehensive integration patterns for building and connecting OctoLLM components. Each pattern includes concrete code examples, architectural diagrams, error handling strategies, and best practices.
Integration Philosophy
OctoLLM follows these integration principles:
- Loose Coupling: Components communicate through well-defined contracts
- Resilience: Graceful degradation and automatic recovery
- Observability: All integrations are traceable and measurable
- Security: Defense-in-depth with capability-based access control
- Performance: Async-first with intelligent caching
Design Principles
graph TD
subgraph "Integration Principles"
A[Contract-First<br/>API Design]
B[Fail Fast<br/>with Retries]
C[Observable<br/>by Default]
D[Capability-Based<br/>Security]
end
subgraph "Implementation"
E[Pydantic Schemas]
F[Tenacity Retries]
G[Structlog Logging]
H[JWT Tokens]
end
A --> E
B --> F
C --> G
D --> H
Pattern Categories
| Category | Use Case | Complexity | Examples |
|---|---|---|---|
| Arm-to-Arm | Direct collaboration | Medium | Coder → Judge validation |
| Orchestrator | Central coordination | High | Task routing, aggregation |
| External API | Third-party services | Medium | OpenAI API, GitHub API |
| Database | Data persistence | Medium | PostgreSQL, Qdrant, Redis |
| Message Queue | Async processing | High | Task queues, events |
| Webhook | Event notifications | Low | Status updates, callbacks |
| Batch | Bulk operations | Medium | Mass data processing |
| Streaming | Real-time updates | High | WebSocket, SSE |
Arm-to-Arm Communication
Arms can communicate directly or through the orchestrator. The choice depends on coupling requirements, security constraints, and performance needs.
Direct HTTP Communication
Use Case: Fast, direct collaboration between arms when orchestrator mediation is unnecessary.
When to Use:
- Low-latency requirements
- Arm trust established
- Simple request/response pattern
- No complex orchestration needed
Architecture:
sequenceDiagram
participant Coder as Coder Arm
participant Judge as Judge Arm
participant Memory as Shared Memory
Coder->>Coder: Generate code
Coder->>Judge: POST /validate
Note over Judge: Validate code quality,<br/>security, style
Judge->>Memory: Store validation report
Judge-->>Coder: ValidationResult
Coder->>Coder: Apply fixes if needed
Implementation:
# coder_arm/client.py
import asyncio
from uuid import uuid4

import httpx
import structlog
from pydantic import BaseModel, HttpUrl

logger = structlog.get_logger()
class ValidationRequest(BaseModel):
"""Request schema for code validation."""
code: str
language: str
context: dict
validation_rules: list[str] = []
class ValidationResult(BaseModel):
"""Response from Judge Arm."""
is_valid: bool
confidence: float
issues: list[dict]
suggestions: list[str]
execution_time_ms: int
class JudgeArmClient:
"""Client for direct Judge Arm communication."""
def __init__(
self,
base_url: HttpUrl,
timeout: int = 30,
retries: int = 3
):
self.base_url = base_url
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(timeout),
limits=httpx.Limits(max_connections=10)
)
self.retries = retries
async def validate_code(
self,
request: ValidationRequest
) -> ValidationResult:
"""
Send code to Judge Arm for validation.
Args:
request: Validation request with code and context
Returns:
ValidationResult with issues and suggestions
Raises:
httpx.HTTPError: On communication failure
"""
logger.info(
"judge.validate.request",
language=request.language,
code_length=len(request.code)
)
for attempt in range(self.retries):
try:
response = await self.client.post(
f"{self.base_url}/validate",
json=request.dict(),
headers={
"Content-Type": "application/json",
"X-Arm-ID": "coder-001",
"X-Request-ID": str(uuid4())
}
)
response.raise_for_status()
result = ValidationResult(**response.json())
logger.info(
"judge.validate.success",
is_valid=result.is_valid,
confidence=result.confidence,
issues_count=len(result.issues)
)
return result
except httpx.HTTPError as e:
logger.warning(
"judge.validate.retry",
attempt=attempt + 1,
error=str(e)
)
if attempt == self.retries - 1:
logger.error(
"judge.validate.failed",
error=str(e)
)
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff
async def close(self):
"""Close HTTP client."""
await self.client.aclose()
# Usage in Coder Arm
async def generate_and_validate(task: TaskContract) -> dict:
"""Generate code and validate it."""
# Step 1: Generate code
code = await generate_code(task.goal)
# Step 2: Validate with Judge Arm
judge_client = JudgeArmClient(base_url="http://judge-arm:8080")
try:
validation = await judge_client.validate_code(
ValidationRequest(
code=code,
language="python",
context=task.context,
validation_rules=["security", "style", "complexity"]
)
)
# Step 3: Apply fixes if needed
if not validation.is_valid:
code = await apply_fixes(code, validation.suggestions)
# Re-validate
validation = await judge_client.validate_code(...)
return {
"code": code,
"validation": validation.dict(),
"confidence": validation.confidence
}
finally:
await judge_client.close()
Error Handling:
# Error handling wrapper
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
class ArmCommunicationError(Exception):
"""Base exception for arm communication errors."""
pass
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type(httpx.NetworkError)
)
async def resilient_arm_call(client, endpoint, payload):
"""
Make resilient HTTP call to another arm.
Automatically retries on network errors with exponential backoff.
"""
try:
response = await client.post(endpoint, json=payload)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500:
# Retry on server errors
raise
else:
# Don't retry on client errors
raise ArmCommunicationError(f"HTTP {e.response.status_code}: {e.response.text}")
except httpx.NetworkError as e:
logger.error("arm.communication.network_error", error=str(e))
raise
Best Practices:
- Use connection pooling for frequent communication (see the sketch after this list)
- Implement circuit breaker for failing arms
- Always include request IDs for tracing
- Set appropriate timeouts (typically 30s)
- Log all communication attempts
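A minimal sketch combining several of these practices into a reusable client factory. The pool sizes, timeout, and factory name are illustrative defaults, not OctoLLM requirements:

```python
# Hypothetical client factory: pooling, default timeout, request-ID tracing.
import uuid

import httpx

async def _add_request_id(request: httpx.Request) -> None:
    # Attach a unique ID so every call can be correlated in logs and traces.
    request.headers["X-Request-ID"] = str(uuid.uuid4())

def make_arm_client(base_url: str) -> httpx.AsyncClient:
    return httpx.AsyncClient(
        base_url=base_url,
        timeout=httpx.Timeout(30.0),  # 30s default, per the guidance above
        limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
        event_hooks={"request": [_add_request_id]},
    )
```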
Orchestrator-Mediated Pattern
Use Case: When orchestrator needs full visibility and control over arm collaboration.
When to Use:
- Complex multi-step workflows
- Need for result aggregation
- Security isolation requirements
- Orchestrator needs to track dependencies
Architecture:
sequenceDiagram
participant Orch as Orchestrator
participant Planner as Planner Arm
participant Retriever as Retriever Arm
participant Coder as Coder Arm
participant Judge as Judge Arm
Orch->>Planner: Decompose task
Planner-->>Orch: Plan with 3 steps
Note over Orch: Step 1: Research
Orch->>Retriever: Search documentation
Retriever-->>Orch: Search results
Note over Orch: Step 2: Code generation
Orch->>Coder: Generate code<br/>(with retrieval context)
Coder-->>Orch: Generated code
Note over Orch: Step 3: Validation
Orch->>Judge: Validate code
Judge-->>Orch: Validation result
Orch->>Orch: Aggregate results
Orch-->>Orch: Complete task
Implementation:
# orchestrator/workflow.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import httpx
import structlog

logger = structlog.get_logger()

@dataclass
class WorkflowStep:
    """Single step in orchestrated workflow."""
    step_id: str
    arm_type: str
    input_data: dict
    dependencies: Optional[List[str]] = None
    status: str = "pending"  # pending, running, complete, failed
    result: Any = None
    error: Optional[str] = None
class OrchestratedWorkflow:
"""
Orchestrator-mediated workflow execution.
The orchestrator maintains full control and visibility.
"""
def __init__(self, arm_registry: dict):
self.arm_registry = arm_registry
self.step_results = {}
async def execute_workflow(
self,
steps: List[WorkflowStep],
task_context: dict
) -> Dict[str, Any]:
"""
Execute multi-step workflow with dependency resolution.
Args:
steps: List of workflow steps
task_context: Shared context across steps
Returns:
Aggregated workflow result
"""
logger.info(
"workflow.start",
total_steps=len(steps),
task_id=task_context.get("task_id")
)
# Build dependency graph
dep_graph = self._build_dependency_graph(steps)
# Execute in topological order
execution_order = self._topological_sort(dep_graph)
for step_id in execution_order:
step = next(s for s in steps if s.step_id == step_id)
# Wait for dependencies
await self._wait_for_dependencies(step, steps)
# Enrich input with dependency results
enriched_input = self._enrich_with_dependencies(
step,
task_context
)
# Execute step
try:
logger.info("workflow.step.start", step_id=step_id, arm=step.arm_type)
step.status = "running"
result = await self._execute_arm(
arm_type=step.arm_type,
input_data=enriched_input
)
step.result = result
step.status = "complete"
self.step_results[step_id] = result
logger.info("workflow.step.complete", step_id=step_id)
            except Exception as e:
                step.status = "failed"
                step.error = str(e)
                logger.error(
                    "workflow.step.failed",
                    step_id=step_id,
                    error=str(e)
                )
                # Abort if any remaining step depends on the failed one;
                # otherwise continue with the independent steps.
                if any(step_id in (s.dependencies or []) for s in steps):
                    raise
# Aggregate results
final_result = self._aggregate_results(steps, task_context)
logger.info("workflow.complete", task_id=task_context.get("task_id"))
return final_result
async def _execute_arm(
self,
arm_type: str,
input_data: dict
) -> dict:
"""
Execute a single arm with input data.
Args:
arm_type: Type of arm (e.g., "retriever", "coder")
input_data: Input payload for the arm
Returns:
Arm execution result
"""
arm_config = self.arm_registry[arm_type]
endpoint = arm_config["endpoint"]
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json=input_data,
timeout=arm_config.get("timeout", 60)
)
response.raise_for_status()
return response.json()
def _enrich_with_dependencies(
self,
step: WorkflowStep,
context: dict
) -> dict:
"""
Enrich step input with results from dependencies.
Example:
Step 2 (code generation) gets results from Step 1 (research).
"""
enriched = step.input_data.copy()
enriched["context"] = context.copy()
if step.dependencies:
enriched["dependency_results"] = {
dep_id: self.step_results[dep_id]
for dep_id in step.dependencies
if dep_id in self.step_results
}
return enriched
def _aggregate_results(
self,
steps: List[WorkflowStep],
context: dict
) -> dict:
"""
Combine results from all steps into final output.
Strategies:
- Sequential: Last step result
- Accumulative: Merge all step results
- Hierarchical: Nested structure
"""
return {
"task_id": context.get("task_id"),
"success": all(s.status == "complete" for s in steps),
"steps": [
{
"step_id": s.step_id,
"arm": s.arm_type,
"status": s.status,
"result": s.result
}
for s in steps
],
"final_result": steps[-1].result if steps else None
}
def _build_dependency_graph(self, steps: List[WorkflowStep]) -> dict:
"""Build directed graph of step dependencies."""
graph = {step.step_id: step.dependencies or [] for step in steps}
return graph
    def _topological_sort(self, graph: dict) -> List[str]:
        """Order steps so dependencies run before the steps that need them."""
        from collections import deque
        # in-degree = number of unmet dependencies for each step
        in_degree = {node: len(deps) for node, deps in graph.items()}
        # Reverse adjacency: dependency -> steps that depend on it
        dependents: Dict[str, List[str]] = {node: [] for node in graph}
        for node, deps in graph.items():
            for dep in deps:
                dependents[dep].append(node)
        queue = deque([node for node, degree in in_degree.items() if degree == 0])
        result = []
        while queue:
            node = queue.popleft()
            result.append(node)
            for dependent in dependents[node]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)
        if len(result) != len(graph):
            raise ValueError("Dependency cycle detected in workflow steps")
        return result
    async def _wait_for_dependencies(
        self,
        step: WorkflowStep,
        all_steps: List[WorkflowStep]
    ):
        """Wait for all dependencies to complete; fail fast if one failed."""
        if not step.dependencies:
            return
        while True:
            deps = [
                next(s for s in all_steps if s.step_id == dep_id)
                for dep_id in step.dependencies
            ]
            if any(d.status == "failed" for d in deps):
                raise RuntimeError(f"Dependency failed for step {step.step_id}")
            if all(d.status == "complete" for d in deps):
                return
            await asyncio.sleep(0.1)
# Usage example
async def handle_complex_task(task: TaskContract):
"""Example: Research → Code → Validate workflow."""
workflow = OrchestratedWorkflow(arm_registry={
"retriever": {"endpoint": "http://retriever-arm:8080/search"},
"coder": {"endpoint": "http://coder-arm:8080/generate"},
"judge": {"endpoint": "http://judge-arm:8080/validate"}
})
steps = [
WorkflowStep(
step_id="research",
arm_type="retriever",
input_data={
"query": task.goal,
"max_results": 10
},
dependencies=None
),
WorkflowStep(
step_id="code_generation",
arm_type="coder",
input_data={
"goal": task.goal,
"language": "python"
},
dependencies=["research"] # Depends on research step
),
WorkflowStep(
step_id="validation",
arm_type="judge",
input_data={
"validation_rules": ["security", "style"]
},
dependencies=["code_generation"] # Depends on code step
)
]
result = await workflow.execute_workflow(
steps=steps,
task_context={"task_id": task.task_id}
)
return result
Shared Memory Pattern
Use Case: Arms coordinate through shared memory instead of direct communication.
When to Use:
- Asynchronous collaboration
- Decoupled communication
- Need for persistent context
- Multiple readers/writers
Architecture:
flowchart TD
subgraph "Shared Memory Layer"
Redis[(Redis Cache)]
Qdrant[(Qdrant Vector DB)]
Postgres[(PostgreSQL KG)]
end
ARM1[Arm 1: Coder] -->|Write| Redis
ARM1 -->|Write Vector| Qdrant
ARM1 -->|Write Entity| Postgres
ARM2[Arm 2: Judge] -->|Read| Redis
ARM2 -->|Query Vector| Qdrant
ARM2 -->|Query Graph| Postgres
ARM3[Arm 3: Retriever] -->|Read| Redis
ARM3 -->|Query Vector| Qdrant
Implementation:
# shared_memory/client.py
import json
from typing import Any, Dict, List, Optional
from uuid import uuid4

import asyncpg
import redis.asyncio as redis
import structlog
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

logger = structlog.get_logger()
class SharedMemoryClient:
"""
Unified client for shared memory access across arms.
Provides abstraction over Redis, Qdrant, and PostgreSQL.
"""
def __init__(
self,
redis_url: str,
qdrant_url: str,
postgres_url: str
):
        self.redis_client = None
        self.qdrant_client = AsyncQdrantClient(url=qdrant_url)
        self.pg_pool = None
        self.redis_url = redis_url
        self.postgres_url = postgres_url
    async def connect(self):
        """Initialize connections to all backends."""
        self.redis_client = redis.from_url(self.redis_url)
        self.pg_pool = await asyncpg.create_pool(self.postgres_url)
        logger.info("shared_memory.connected")
# ===== Redis Operations (L1 Cache) =====
async def cache_set(
self,
key: str,
value: Any,
ttl_seconds: int = 300
):
"""
Store value in Redis cache with TTL.
Args:
key: Cache key (use namespaced keys, e.g., "arm:coder:result:123")
value: Value to cache (will be JSON serialized)
ttl_seconds: Time to live (default 5 minutes)
"""
await self.redis_client.setex(
key,
ttl_seconds,
json.dumps(value)
)
logger.debug("cache.set", key=key, ttl=ttl_seconds)
async def cache_get(self, key: str) -> Optional[Any]:
"""Get value from Redis cache."""
value = await self.redis_client.get(key)
if value:
logger.debug("cache.hit", key=key)
return json.loads(value)
logger.debug("cache.miss", key=key)
return None
async def cache_delete(self, pattern: str):
"""Delete keys matching pattern."""
keys = []
async for key in self.redis_client.scan_iter(match=pattern):
keys.append(key)
if keys:
await self.redis_client.delete(*keys)
logger.info("cache.delete", count=len(keys), pattern=pattern)
# ===== Qdrant Operations (Vector Search) =====
async def vector_store(
self,
collection_name: str,
text: str,
vector: List[float],
metadata: Dict[str, Any],
point_id: Optional[str] = None
):
"""
Store text with embedding in Qdrant.
Args:
collection_name: Collection name (e.g., "coder_context")
text: Original text
vector: Embedding vector
metadata: Additional metadata (author, timestamp, etc.)
point_id: Optional point ID (auto-generated if not provided)
"""
# Ensure collection exists
collections = await self.qdrant_client.get_collections()
if collection_name not in [c.name for c in collections.collections]:
await self.qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=len(vector),
distance=Distance.COSINE
)
)
point_id = point_id or str(uuid4())
await self.qdrant_client.upsert(
collection_name=collection_name,
points=[
PointStruct(
id=point_id,
vector=vector,
payload={"text": text, **metadata}
)
]
)
logger.info(
"vector.store",
collection=collection_name,
point_id=point_id
)
async def vector_search(
self,
collection_name: str,
query_vector: List[float],
limit: int = 10,
filter_conditions: Optional[dict] = None
) -> List[Dict[str, Any]]:
"""
Search for similar vectors in Qdrant.
Args:
collection_name: Collection to search
query_vector: Query embedding
limit: Maximum number of results
filter_conditions: Optional metadata filters
Returns:
List of search results with text and metadata
"""
results = await self.qdrant_client.search(
collection_name=collection_name,
query_vector=query_vector,
limit=limit,
query_filter=filter_conditions
)
logger.info(
"vector.search",
collection=collection_name,
results_count=len(results)
)
return [
{
"id": hit.id,
"score": hit.score,
"text": hit.payload.get("text"),
"metadata": {k: v for k, v in hit.payload.items() if k != "text"}
}
for hit in results
]
# ===== PostgreSQL Operations (Knowledge Graph) =====
async def entity_create(
self,
entity_type: str,
name: str,
properties: dict
) -> str:
"""
Create entity in knowledge graph.
Args:
entity_type: Type (e.g., "function", "file", "bug")
name: Entity name
properties: Additional properties as JSONB
Returns:
UUID of created entity
"""
async with self.pg_pool.acquire() as conn:
entity_id = await conn.fetchval(
"""
INSERT INTO entities (entity_type, name, properties)
VALUES ($1, $2, $3)
RETURNING id
""",
entity_type,
name,
json.dumps(properties)
)
logger.info(
"entity.create",
entity_id=str(entity_id),
entity_type=entity_type
)
return str(entity_id)
async def relationship_create(
self,
from_entity_id: str,
to_entity_id: str,
relationship_type: str,
properties: dict = None
):
"""
Create relationship between entities.
Example: "function_A" --calls--> "function_B"
"""
async with self.pg_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO relationships (from_entity_id, to_entity_id, relationship_type, properties)
VALUES ($1, $2, $3, $4)
""",
from_entity_id,
to_entity_id,
relationship_type,
json.dumps(properties or {})
)
logger.info(
"relationship.create",
relationship_type=relationship_type
)
async def graph_query(
self,
entity_id: str,
relationship_type: Optional[str] = None,
max_depth: int = 2
) -> Dict[str, Any]:
"""
Query knowledge graph from starting entity.
Args:
entity_id: Starting entity UUID
relationship_type: Optional filter by relationship type
max_depth: Maximum traversal depth
Returns:
Subgraph as nested dict
"""
async with self.pg_pool.acquire() as conn:
# Recursive CTE for graph traversal
query = """
WITH RECURSIVE graph_traversal AS (
-- Base case: starting entity
SELECT e.id, e.entity_type, e.name, e.properties, 0 as depth
FROM entities e
WHERE e.id = $1
UNION ALL
-- Recursive case: follow relationships
SELECT e.id, e.entity_type, e.name, e.properties, gt.depth + 1
FROM entities e
INNER JOIN relationships r ON e.id = r.to_entity_id
INNER JOIN graph_traversal gt ON r.from_entity_id = gt.id
WHERE gt.depth < $2
AND ($3::text IS NULL OR r.relationship_type = $3)
)
SELECT * FROM graph_traversal
"""
rows = await conn.fetch(query, entity_id, max_depth, relationship_type)
# Build nested structure
nodes = {str(row["id"]): dict(row) for row in rows}
logger.info(
"graph.query",
start_entity=entity_id,
nodes_found=len(nodes)
)
return nodes
async def close(self):
"""Close all connections."""
if self.redis_client:
await self.redis_client.close()
if self.pg_pool:
await self.pg_pool.close()
logger.info("shared_memory.closed")
# Usage in Arms
import hashlib
from datetime import datetime

class CoderArm:
    """Example: Coder Arm using shared memory."""
    def __init__(self, memory: SharedMemoryClient):
        self.memory = memory
async def generate_code(self, task: TaskContract) -> dict:
"""Generate code and store in shared memory."""
        # 1. Check cache first (hashlib gives a stable key across processes,
        # unlike built-in hash(), which is randomized per interpreter run)
        cache_key = f"arm:coder:result:{hashlib.sha256(task.goal.encode()).hexdigest()}"
        cached = await self.memory.cache_get(cache_key)
        if cached:
            return cached
# 2. Query relevant context from vector DB
query_embedding = await self.embed_text(task.goal)
context = await self.memory.vector_search(
collection_name="code_context",
query_vector=query_embedding,
limit=5
)
# 3. Generate code
code = await self._generate(task.goal, context)
# 4. Store in shared memory for other arms
result = {
"code": code,
"language": "python",
"timestamp": datetime.utcnow().isoformat()
}
# Cache in Redis (5 minutes)
await self.memory.cache_set(cache_key, result, ttl_seconds=300)
# Store code embedding in Qdrant
code_embedding = await self.embed_text(code)
await self.memory.vector_store(
collection_name="generated_code",
text=code,
vector=code_embedding,
metadata={
"task_id": task.task_id,
"language": "python",
"timestamp": datetime.utcnow().isoformat()
}
)
# Store entity in knowledge graph
entity_id = await self.memory.entity_create(
entity_type="code",
name=f"generated_{task.task_id}",
properties={
"code": code,
"task_id": task.task_id
}
)
return result
class JudgeArm:
"""Example: Judge Arm reading from shared memory."""
def __init__(self, memory: SharedMemoryClient):
self.memory = memory
async def validate_code(self, task: TaskContract) -> dict:
"""Validate code from shared memory."""
        # 1. Get code from cache (written by Coder Arm; same stable key scheme)
        cache_key = f"arm:coder:result:{hashlib.sha256(task.goal.encode()).hexdigest()}"
        code_result = await self.memory.cache_get(cache_key)
if not code_result:
raise ValueError("No code found in shared memory")
# 2. Query similar code for comparison
code_embedding = await self.embed_text(code_result["code"])
similar_code = await self.memory.vector_search(
collection_name="generated_code",
query_vector=code_embedding,
limit=10
)
# 3. Validate
is_valid = await self._validate(code_result["code"], similar_code)
        # 4. Store validation result
        validation_result = {
            "is_valid": is_valid,
            "code_hash": hashlib.sha256(code_result["code"].encode()).hexdigest(),
            "timestamp": datetime.utcnow().isoformat()
        }
        await self.memory.cache_set(
            f"arm:judge:validation:{hashlib.sha256(task.goal.encode()).hexdigest()}",
            validation_result,
            ttl_seconds=300
        )
return validation_result
Best Practices:
- Use namespaced keys: arm:{arm_name}:{data_type}:{id} (see the helper sketch after this list)
- Set appropriate TTLs for cache entries
- Clean up expired entries periodically
- Use transactions for related operations
- Index frequently queried fields
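A small helper can enforce the key convention so arms never hand-build keys. The function name and TTL constant are illustrative:

```python
# Hypothetical key builder enforcing the arm:{arm_name}:{data_type}:{id} convention.
DEFAULT_TTL_SECONDS = 300  # 5 minutes, matching the examples above

def memory_key(arm_name: str, data_type: str, item_id: str) -> str:
    """Build a namespaced key such as 'arm:coder:result:123'."""
    return f"arm:{arm_name}:{data_type}:{item_id}"
```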
Event-Driven Pattern
Use Case: Arms react to events published by other arms.
When to Use:
- Loose coupling required
- Fan-out notifications
- Asynchronous processing
- Event sourcing architecture
Architecture:
flowchart TD
subgraph "Event Bus (Redis Pub/Sub)"
CHANNEL1[code.generated]
CHANNEL2[validation.complete]
CHANNEL3[task.complete]
end
ARM1[Coder Arm] -->|Publish| CHANNEL1
ARM2[Judge Arm] -->|Subscribe| CHANNEL1
ARM2 -->|Publish| CHANNEL2
ARM3[Orchestrator] -->|Subscribe| CHANNEL2
ARM3 -->|Publish| CHANNEL3
ARM4[Webhook Service] -->|Subscribe| CHANNEL3
Implementation:
# event_bus/client.py
import json
from datetime import datetime
from typing import Awaitable, Callable

import redis.asyncio as redis
import structlog
from pydantic import BaseModel

logger = structlog.get_logger()
class Event(BaseModel):
"""Base event model."""
event_type: str
source_arm: str
timestamp: str
data: dict
class EventBus:
"""
Redis-based event bus for arm-to-arm communication.
Uses pub/sub for loose coupling between arms.
"""
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.pub_client = None
self.sub_client = None
self.handlers = {}
    async def connect(self):
        """Connect to Redis."""
        self.pub_client = redis.from_url(self.redis_url)
        self.sub_client = redis.from_url(self.redis_url)
        logger.info("event_bus.connected")
async def publish(self, channel: str, event: Event):
"""
Publish event to channel.
Args:
channel: Channel name (e.g., "code.generated")
event: Event to publish
"""
await self.pub_client.publish(
channel,
event.json()
)
logger.info(
"event.published",
channel=channel,
event_type=event.event_type,
source=event.source_arm
)
async def subscribe(
self,
channel: str,
handler: Callable[[Event], Awaitable[None]]
):
"""
Subscribe to channel and process events.
Args:
channel: Channel to subscribe to
handler: Async function to process events
"""
pubsub = self.sub_client.pubsub()
await pubsub.subscribe(channel)
logger.info("event.subscribed", channel=channel)
async for message in pubsub.listen():
if message["type"] == "message":
try:
event = Event(**json.loads(message["data"]))
logger.info(
"event.received",
channel=channel,
event_type=event.event_type
)
await handler(event)
except Exception as e:
logger.error(
"event.handler.error",
channel=channel,
error=str(e)
)
async def close(self):
"""Close connections."""
if self.pub_client:
await self.pub_client.close()
if self.sub_client:
await self.sub_client.close()
# Example: Coder Arm publishes events
class CoderArmWithEvents:
"""Coder Arm that publishes events."""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
async def generate_code(self, task: TaskContract) -> dict:
"""Generate code and publish event."""
code = await self._generate(task.goal)
result = {
"task_id": task.task_id,
"code": code,
"language": "python"
}
# Publish event
await self.event_bus.publish(
channel="code.generated",
event=Event(
event_type="code.generated",
source_arm="coder",
timestamp=datetime.utcnow().isoformat(),
data=result
)
)
return result
# Example: Judge Arm subscribes to events
class JudgeArmWithEvents:
"""Judge Arm that reacts to code generation events."""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
async def start_listening(self):
"""Start listening for code generation events."""
await self.event_bus.subscribe(
channel="code.generated",
handler=self.handle_code_generated
)
async def handle_code_generated(self, event: Event):
"""
React to code generation event.
Automatically validates newly generated code.
"""
logger.info(
"judge.event.received",
task_id=event.data.get("task_id")
)
# Validate code
code = event.data.get("code")
is_valid = await self._validate(code)
# Publish validation result
await self.event_bus.publish(
channel="validation.complete",
event=Event(
event_type="validation.complete",
source_arm="judge",
timestamp=datetime.utcnow().isoformat(),
data={
"task_id": event.data.get("task_id"),
"is_valid": is_valid,
"original_event": event.data
}
)
)
# Usage
import asyncio

async def run_event_driven_system():
    """Run event-driven arm system."""
    event_bus = EventBus(redis_url="redis://localhost:6379")
await event_bus.connect()
# Start Judge Arm listening
judge = JudgeArmWithEvents(event_bus)
asyncio.create_task(judge.start_listening())
# Coder Arm generates code (triggers event)
coder = CoderArmWithEvents(event_bus)
await coder.generate_code(
TaskContract(
task_id="task-123",
goal="Write a function to sort a list"
)
)
# Event flows automatically:
# Coder --[code.generated]--> Judge --[validation.complete]--> Orchestrator
Best Practices:
- Use structured event schemas (Pydantic models)
- Include timestamp and source in all events
- Handle failures gracefully (dead-letter queue; see the sketch after this list)
- Log all published and received events
- Consider event ordering guarantees
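For the dead-letter bullet above, a minimal sketch: wrap the handler so failed events land on a Redis list for later inspection or replay. The wrapper and the `events:dlq` queue name are assumptions, not part of EventBus; `Event` is the model defined earlier:

```python
# Hypothetical dead-letter wrapper around an event handler.
import json
from typing import Awaitable, Callable

import redis.asyncio as redis

async def handle_with_dlq(
    event: Event,
    handler: Callable[[Event], Awaitable[None]],
    dlq: redis.Redis,
) -> None:
    try:
        await handler(event)
    except Exception as exc:
        # Park the failed event (plus the error) on a dead-letter list.
        await dlq.lpush("events:dlq", json.dumps({
            "event": event.dict(),
            "error": str(exc),
        }))
```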
Orchestrator Integration
Patterns for integrating with the central orchestrator.
Task Submission Pattern
Use Case: Submit tasks to orchestrator for processing.
Implementation:
# client/orchestrator_client.py
import asyncio
import time
from typing import List

import httpx

class OrchestratorClient:
    """Client for submitting tasks to orchestrator."""
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient()
async def submit_task(
self,
goal: str,
constraints: List[str] = None,
priority: str = "medium",
budget: dict = None
) -> dict:
"""
Submit task to orchestrator.
Args:
goal: Natural language task description
constraints: Hard constraints
priority: Task priority (low, medium, high, critical)
budget: Resource limits
Returns:
Task ID and estimated completion time
"""
payload = {
"goal": goal,
"constraints": constraints or [],
"priority": priority,
"budget": budget or {
"max_tokens": 4000,
"max_time_seconds": 30
},
"acceptance_criteria": []
}
response = await self.client.post(
f"{self.base_url}/api/v1/tasks",
json=payload
)
response.raise_for_status()
return response.json()
async def get_task_status(self, task_id: str) -> dict:
"""Get task status and results."""
response = await self.client.get(
f"{self.base_url}/api/v1/tasks/{task_id}"
)
response.raise_for_status()
return response.json()
async def wait_for_completion(
self,
task_id: str,
timeout: int = 300,
poll_interval: float = 2.0
) -> dict:
"""
Wait for task to complete.
Args:
task_id: Task ID to wait for
timeout: Maximum wait time in seconds
poll_interval: Time between status checks
Returns:
Final task result
"""
start_time = time.time()
while True:
if time.time() - start_time > timeout:
raise TimeoutError(f"Task {task_id} did not complete within {timeout}s")
status = await self.get_task_status(task_id)
if status["status"] in ["completed", "failed"]:
return status
await asyncio.sleep(poll_interval)
# Usage
async def main():
client = OrchestratorClient(base_url="http://localhost:8001")
# Submit task
task = await client.submit_task(
goal="Find and fix bugs in auth/login.py",
constraints=["No database schema changes"],
priority="high"
)
print(f"Task submitted: {task['task_id']}")
# Wait for completion
result = await client.wait_for_completion(task["task_id"])
print(f"Task complete: {result['result']}")
Arm Registration Pattern
Use Case: Register new arms with orchestrator dynamically.
Implementation:
# arm/registration.py
from dataclasses import dataclass
from datetime import datetime
from typing import List

import httpx
import structlog

logger = structlog.get_logger()
@dataclass
class ArmCapability:
"""Capability definition for arm registration."""
capability_name: str
description: str
input_schema: dict
output_schema: dict
cost_tier: int # 1-5, higher = more expensive
avg_latency_ms: int
class ArmRegistry:
"""Arm registry client for dynamic registration."""
def __init__(self, registry_url: str):
self.registry_url = registry_url
async def register_arm(
self,
arm_id: str,
arm_type: str,
endpoint: str,
capabilities: List[ArmCapability],
health_check_endpoint: str = "/health"
):
"""
Register arm with orchestrator.
Args:
arm_id: Unique arm identifier
arm_type: Arm type (planner, coder, executor, etc.)
endpoint: HTTP endpoint for task execution
capabilities: List of arm capabilities
health_check_endpoint: Health check endpoint
"""
payload = {
"arm_id": arm_id,
"arm_type": arm_type,
"endpoint": endpoint,
"health_check_endpoint": health_check_endpoint,
"capabilities": [
{
"capability_name": cap.capability_name,
"description": cap.description,
"input_schema": cap.input_schema,
"output_schema": cap.output_schema,
"cost_tier": cap.cost_tier,
"avg_latency_ms": cap.avg_latency_ms
}
for cap in capabilities
],
"metadata": {
"version": "1.0.0",
"registered_at": datetime.utcnow().isoformat()
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.registry_url}/registry/arms",
json=payload
)
response.raise_for_status()
logger.info("arm.registered", arm_id=arm_id, arm_type=arm_type)
# Usage in arm startup
async def startup_arm():
"""Register arm on startup."""
registry = ArmRegistry(registry_url="http://orchestrator:8000")
await registry.register_arm(
arm_id="coder-001",
arm_type="coder",
endpoint="http://coder-arm:8080/execute",
capabilities=[
ArmCapability(
capability_name="code_generation",
description="Generate code from natural language",
input_schema={"goal": "string", "language": "string"},
output_schema={"code": "string", "confidence": "float"},
cost_tier=4,
avg_latency_ms=5000
),
ArmCapability(
capability_name="code_refactoring",
description="Refactor existing code",
input_schema={"code": "string", "style": "string"},
output_schema={"refactored_code": "string"},
cost_tier=3,
avg_latency_ms=3000
)
]
)
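Registration above advertises a /health endpoint, so each arm must actually serve one. A minimal FastAPI sketch; the response shape is an assumption, so match whatever your orchestrator's health checker expects:

```python
# Minimal health endpoint so the orchestrator's health checks succeed.
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health() -> dict:
    return {"status": "ok", "arm_id": "coder-001"}
```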
External API Integration
Patterns for integrating with external APIs (OpenAI, GitHub, etc.).
HTTP Client Pattern
Implementation:
# external/api_client.py
from typing import List

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential

logger = structlog.get_logger()
class ExternalAPIClient:
"""Base client for external API integration."""
def __init__(
self,
base_url: str,
api_key: str,
timeout: int = 60,
max_retries: int = 3
):
self.base_url = base_url
self.api_key = api_key
self.client = httpx.AsyncClient(
base_url=base_url,
timeout=httpx.Timeout(timeout),
headers={"Authorization": f"Bearer {api_key}"}
)
self.max_retries = max_retries
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def request(
self,
method: str,
endpoint: str,
**kwargs
) -> dict:
"""
Make HTTP request with automatic retries.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint
**kwargs: Additional request parameters
Returns:
Parsed JSON response
"""
logger.info(
"external_api.request",
method=method,
endpoint=endpoint
)
response = await self.client.request(
method=method,
url=endpoint,
**kwargs
)
response.raise_for_status()
logger.info(
"external_api.success",
method=method,
endpoint=endpoint,
status=response.status_code
)
return response.json()
# Example: OpenAI API Client
class OpenAIClient(ExternalAPIClient):
"""Client for OpenAI API."""
def __init__(self, api_key: str):
super().__init__(
base_url="https://api.openai.com/v1",
api_key=api_key
)
async def chat_completion(
self,
messages: List[dict],
model: str = "gpt-4",
temperature: float = 0.7
) -> dict:
"""Request chat completion."""
return await self.request(
method="POST",
endpoint="/chat/completions",
json={
"model": model,
"messages": messages,
"temperature": temperature
}
)
# Example: GitHub API Client
class GitHubClient(ExternalAPIClient):
"""Client for GitHub API."""
def __init__(self, token: str):
super().__init__(
base_url="https://api.github.com",
api_key=token
)
self.client.headers["Accept"] = "application/vnd.github.v3+json"
async def get_repository(self, owner: str, repo: str) -> dict:
"""Get repository information."""
return await self.request(
method="GET",
endpoint=f"/repos/{owner}/{repo}"
)
async def list_issues(
self,
owner: str,
repo: str,
state: str = "open"
) -> List[dict]:
"""List repository issues."""
return await self.request(
method="GET",
endpoint=f"/repos/{owner}/{repo}/issues",
params={"state": state}
)
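Most third-party APIs also enforce rate limits. A client-side token-bucket sketch can smooth bursts before retries ever trigger; the class name and the rate/burst values are illustrative and should match the provider's documented limits:

```python
# Hypothetical token-bucket limiter; call `await limiter.acquire()` before
# each outbound request.
import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int):
        self.rate = rate              # tokens refilled per second
        self.burst = burst            # bucket capacity
        self.tokens = float(burst)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            # Refill based on elapsed time, capped at the bucket size.
            self.tokens = min(self.burst, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens < 1:
                # Sleep just long enough for one token to accrue.
                await asyncio.sleep((1 - self.tokens) / self.rate)
                self.tokens = 0.0
                self.updated = time.monotonic()
            else:
                self.tokens -= 1
```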
Circuit Breaker Pattern
Use Case: Prevent cascading failures from external service outages.
Implementation:
# resilience/circuit_breaker.py
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable

import structlog

logger = structlog.get_logger()

class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""
    pass
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
"""
Circuit breaker for external service calls.
Prevents cascading failures by stopping requests to failing services.
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func: Callable, *args, **kwargs):
"""
Execute function with circuit breaker protection.
Args:
func: Async function to execute
*args, **kwargs: Function arguments
Returns:
Function result
Raises:
CircuitBreakerOpenError: If circuit is open
"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
logger.info("circuit_breaker.half_open")
else:
logger.warning("circuit_breaker.open")
raise CircuitBreakerOpenError("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt reset."""
return (
self.last_failure_time and
datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
)
def _on_success(self):
"""Handle successful call."""
if self.state == CircuitState.HALF_OPEN:
logger.info("circuit_breaker.closed")
self.state = CircuitState.CLOSED
self.failure_count = 0
def _on_failure(self):
"""Handle failed call."""
self.failure_count += 1
self.last_failure_time = datetime.now()
logger.warning(
"circuit_breaker.failure",
failure_count=self.failure_count,
threshold=self.failure_threshold
)
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.error("circuit_breaker.open")
# Usage
async def call_external_api_with_circuit_breaker():
"""Example: Protect external API call."""
circuit_breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=60,
expected_exception=httpx.HTTPError
)
try:
result = await circuit_breaker.call(
external_api_call,
param1="value1"
)
return result
except CircuitBreakerOpenError:
# Circuit is open, use fallback
return fallback_response()
Database Integration
Patterns for working with PostgreSQL, Qdrant, and Redis.
PostgreSQL Knowledge Graph
Implementation: the SharedMemoryClient from the Shared Memory Pattern section earlier in this document already covers entity and relationship storage; a usage sketch follows.
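A short usage sketch of those knowledge-graph helpers; the connection URLs and entity names are illustrative:

```python
# Usage sketch of the SharedMemoryClient knowledge-graph helpers.
async def demo_graph() -> dict:
    memory = SharedMemoryClient(
        redis_url="redis://localhost:6379",
        qdrant_url="http://localhost:6333",
        postgres_url="postgresql://octollm@localhost/octollm",  # illustrative DSN
    )
    await memory.connect()
    fn_id = await memory.entity_create("function", "parse_config", {"file": "config.py"})
    helper_id = await memory.entity_create("function", "read_file", {"file": "io.py"})
    await memory.relationship_create(fn_id, helper_id, "calls")
    # Traverse outward from parse_config, following "calls" edges.
    subgraph = await memory.graph_query(fn_id, relationship_type="calls", max_depth=2)
    await memory.close()
    return subgraph
```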
Transaction Patterns
Use Case: Atomic operations across multiple tables.
Implementation:
# database/transactions.py
async def atomic_knowledge_update(
pool: asyncpg.Pool,
entities: List[dict],
relationships: List[dict]
):
"""
Atomically update knowledge graph.
All entities and relationships are inserted within a transaction.
If any operation fails, all changes are rolled back.
"""
async with pool.acquire() as conn:
async with conn.transaction():
# Insert entities
entity_ids = []
for entity in entities:
entity_id = await conn.fetchval(
"""
INSERT INTO entities (entity_type, name, properties)
VALUES ($1, $2, $3)
RETURNING id
""",
entity["type"],
entity["name"],
json.dumps(entity["properties"])
)
entity_ids.append(entity_id)
# Insert relationships
for rel in relationships:
await conn.execute(
"""
INSERT INTO relationships (from_entity_id, to_entity_id, relationship_type)
VALUES ($1, $2, $3)
""",
entity_ids[rel["from_index"]],
entity_ids[rel["to_index"]],
rel["type"]
)
logger.info(
"knowledge_graph.updated",
entities_count=len(entities),
relationships_count=len(relationships)
)
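A usage sketch: relationships reference entities by their position in the entities list, matching the from_index/to_index fields the function reads. The entity names below are illustrative:

```python
# Hypothetical call: link a bug entity to the function it affects.
import asyncpg

async def record_bug(pool: asyncpg.Pool) -> None:
    await atomic_knowledge_update(
        pool,
        entities=[
            {"type": "function", "name": "login", "properties": {"file": "auth/login.py"}},
            {"type": "bug", "name": "BUG-42", "properties": {"severity": "high"}},
        ],
        relationships=[
            {"from_index": 1, "to_index": 0, "type": "affects"},  # BUG-42 --affects--> login
        ],
    )
```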
Message Queue Patterns
Async Task Processing
Use Case: Offload long-running tasks to background workers.
Architecture:
flowchart LR
API[API Server] -->|Enqueue Task| REDIS[(Redis Queue)]
REDIS -->|Dequeue| WORKER1[Worker 1]
REDIS -->|Dequeue| WORKER2[Worker 2]
REDIS -->|Dequeue| WORKER3[Worker 3]
WORKER1 -->|Store Result| DB[(Database)]
WORKER2 -->|Store Result| DB
WORKER3 -->|Store Result| DB
Implementation:
# queue/task_queue.py
from typing import Callable

import structlog
from redis import Redis
from rq import Queue

logger = structlog.get_logger()

# Connect to Redis
redis_conn = Redis(host='localhost', port=6379, db=0)
task_queue = Queue('octollm_tasks', connection=redis_conn)
def enqueue_task(func: Callable, *args, **kwargs) -> str:
"""
Enqueue task for background processing.
Args:
func: Function to execute
*args, **kwargs: Function arguments
Returns:
Job ID
"""
job = task_queue.enqueue(func, *args, **kwargs)
logger.info("task.enqueued", job_id=job.id, func=func.__name__)
return job.id
def get_task_result(job_id: str):
"""Get result of completed task."""
from rq.job import Job
job = Job.fetch(job_id, connection=redis_conn)
if job.is_finished:
return job.result
elif job.is_failed:
raise Exception(f"Task failed: {job.exc_info}")
else:
return None # Still processing
# Example: Long-running code generation
def generate_code_background(goal: str, constraints: list) -> dict:
"""Background task for code generation."""
# This runs in a separate worker process
logger.info("background_task.start", goal=goal)
# Expensive operation
code = generate_code(goal, constraints)
logger.info("background_task.complete")
return {"code": code, "status": "complete"}
# Usage
async def handle_code_generation_request(request: dict):
"""API endpoint handler."""
# Enqueue task (returns immediately)
job_id = enqueue_task(
generate_code_background,
goal=request["goal"],
constraints=request.get("constraints", [])
)
return {
"job_id": job_id,
"status": "queued",
"message": "Code generation started"
}
async def check_code_generation_status(job_id: str):
"""Check status of background task."""
result = get_task_result(job_id)
if result is None:
return {"status": "processing"}
else:
return {"status": "complete", "result": result}
Priority Queue Pattern
Use Case: Process high-priority tasks first.
Implementation:
# queue/priority_queue.py
from rq import Queue
# Create priority queues
high_priority_queue = Queue('high', connection=redis_conn)
default_queue = Queue('default', connection=redis_conn)
low_priority_queue = Queue('low', connection=redis_conn)
def enqueue_with_priority(func: Callable, priority: str, *args, **kwargs):
"""Enqueue task with priority."""
queue_map = {
"high": high_priority_queue,
"medium": default_queue,
"low": low_priority_queue
}
queue = queue_map.get(priority, default_queue)
job = queue.enqueue(func, *args, **kwargs)
logger.info(
"task.enqueued",
job_id=job.id,
priority=priority,
func=func.__name__
)
return job.id
# Worker startup (prioritize high queue)
# $ rq worker high default low
Webhook Integration
Callback Registration
Use Case: Notify external systems when tasks complete.
Implementation:
# webhook/client.py
import hashlib
import hmac
import json
from datetime import datetime
from typing import Optional

import httpx
import structlog

logger = structlog.get_logger()

class WebhookClient:
    """Client for sending webhook notifications."""
    def __init__(self):
        self.client = httpx.AsyncClient(timeout=10)
async def send_webhook(
self,
url: str,
event_type: str,
payload: dict,
secret: Optional[str] = None
):
"""
Send webhook notification.
Args:
url: Webhook URL
event_type: Event type (e.g., "task.completed")
payload: Event payload
secret: Optional HMAC secret for signature
"""
headers = {
"Content-Type": "application/json",
"X-Event-Type": event_type,
"X-Timestamp": datetime.utcnow().isoformat()
}
# Add HMAC signature if secret provided
if secret:
signature = self._compute_signature(payload, secret)
headers["X-Signature"] = signature
try:
response = await self.client.post(
url,
json=payload,
headers=headers
)
response.raise_for_status()
logger.info(
"webhook.sent",
url=url,
event_type=event_type,
status=response.status_code
)
except httpx.HTTPError as e:
logger.error(
"webhook.failed",
url=url,
error=str(e)
)
# Queue for retry
await self._queue_retry(url, event_type, payload, secret)
    def _compute_signature(self, payload: dict, secret: str) -> str:
        """Compute HMAC-SHA256 signature for webhook payload."""
        message = json.dumps(payload, sort_keys=True).encode()
        signature = hmac.new(
            secret.encode(),
            message,
            hashlib.sha256
        ).hexdigest()
        return f"sha256={signature}"
async def _queue_retry(
self,
url: str,
event_type: str,
payload: dict,
secret: Optional[str]
):
"""Queue webhook for retry."""
# Store in Redis for background retry
retry_data = {
"url": url,
"event_type": event_type,
"payload": payload,
"secret": secret,
"retry_count": 0,
"queued_at": datetime.utcnow().isoformat()
}
await redis_client.lpush(
"webhook:retry_queue",
json.dumps(retry_data)
)
# Usage in orchestrator
async def notify_task_completion(task_id: str, result: dict):
"""Notify registered webhooks of task completion."""
# Get registered webhooks for this task
webhooks = await get_task_webhooks(task_id)
webhook_client = WebhookClient()
for webhook in webhooks:
await webhook_client.send_webhook(
url=webhook["url"],
event_type="task.completed",
payload={
"task_id": task_id,
"status": "completed",
"result": result,
"completed_at": datetime.utcnow().isoformat()
},
secret=webhook.get("secret")
)
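On the receiving side, consumers should verify the signature before trusting the payload. A sketch mirroring the sender's HMAC scheme above; the function name is illustrative:

```python
# Receiver-side verification of the X-Signature header.
import hashlib
import hmac
import json

def verify_signature(payload: dict, secret: str, received_signature: str) -> bool:
    message = json.dumps(payload, sort_keys=True).encode()
    expected = "sha256=" + hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    # Constant-time comparison guards against timing attacks.
    return hmac.compare_digest(expected, received_signature)
```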
Batch Processing
Bulk Operation Pattern
Use Case: Process large datasets efficiently.
Implementation:
# batch/processor.py
import asyncio
from typing import Awaitable, Callable, Generic, List, TypeVar

import structlog

logger = structlog.get_logger()

T = TypeVar('T')
R = TypeVar('R')
class BatchProcessor(Generic[T, R]):
"""
Process items in batches for efficiency.
Useful for bulk database operations, API calls with rate limits, etc.
"""
def __init__(
self,
batch_size: int = 100,
max_concurrent: int = 5
):
self.batch_size = batch_size
self.max_concurrent = max_concurrent
async def process_batches(
self,
items: List[T],
processor: Callable[[List[T]], Awaitable[List[R]]]
) -> List[R]:
"""
Process items in batches.
Args:
items: List of items to process
processor: Async function that processes a batch
Returns:
List of all results
"""
logger.info(
"batch.start",
total_items=len(items),
batch_size=self.batch_size
)
# Split into batches
batches = [
items[i:i + self.batch_size]
for i in range(0, len(items), self.batch_size)
]
logger.info("batch.created", batch_count=len(batches))
# Process batches with concurrency limit
semaphore = asyncio.Semaphore(self.max_concurrent)
async def process_batch_with_semaphore(batch):
async with semaphore:
return await processor(batch)
# Execute all batches
results = await asyncio.gather(*[
process_batch_with_semaphore(batch)
for batch in batches
])
# Flatten results
flattened = [item for batch_result in results for item in batch_result]
logger.info("batch.complete", results_count=len(flattened))
return flattened
# Example: Bulk embedding generation
async def generate_embeddings_batch(texts: List[str]) -> List[List[float]]:
    """Generate embeddings for a batch of texts."""
    # Illustrative client call; adapt to your embedding provider's actual batch API.
    response = await openai_client.create_embeddings(
        input=texts,
        model="text-embedding-ada-002"
    )
    return [item.embedding for item in response.data]
# Usage
async def embed_large_dataset(texts: List[str]):
"""Embed 10,000 texts efficiently."""
processor = BatchProcessor(batch_size=100, max_concurrent=5)
embeddings = await processor.process_batches(
items=texts,
processor=generate_embeddings_batch
)
# Store in vector database
await store_embeddings(embeddings)
Real-Time Streaming
WebSocket Pattern
Use Case: Real-time bidirectional communication.
Implementation:
# streaming/websocket.py
from typing import Dict

import structlog
from fastapi import WebSocket, WebSocketDisconnect

logger = structlog.get_logger()
class ConnectionManager:
"""Manage WebSocket connections."""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, client_id: str, websocket: WebSocket):
"""Accept new WebSocket connection."""
await websocket.accept()
self.active_connections[client_id] = websocket
logger.info("websocket.connected", client_id=client_id)
def disconnect(self, client_id: str):
"""Remove disconnected client."""
if client_id in self.active_connections:
del self.active_connections[client_id]
logger.info("websocket.disconnected", client_id=client_id)
async def send_message(self, client_id: str, message: dict):
"""Send message to specific client."""
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
await websocket.send_json(message)
async def broadcast(self, message: dict):
"""Broadcast message to all connected clients."""
for client_id, websocket in self.active_connections.items():
try:
await websocket.send_json(message)
except Exception as e:
logger.error(
"websocket.broadcast.error",
client_id=client_id,
error=str(e)
)
# FastAPI WebSocket endpoint
from fastapi import FastAPI
app = FastAPI()
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
"""WebSocket endpoint for real-time updates."""
await manager.connect(client_id, websocket)
try:
while True:
# Receive message from client
data = await websocket.receive_json()
logger.info(
"websocket.message.received",
client_id=client_id,
message_type=data.get("type")
)
# Handle message
if data["type"] == "subscribe":
# Subscribe to task updates
task_id = data["task_id"]
await subscribe_to_task_updates(client_id, task_id)
elif data["type"] == "ping":
# Respond with pong
await manager.send_message(client_id, {"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(client_id)
# Send updates to subscribed clients
async def notify_task_progress(task_id: str, progress: dict):
"""Send task progress update via WebSocket."""
# Get subscribed clients
subscribers = await get_task_subscribers(task_id)
message = {
"type": "task.progress",
"task_id": task_id,
"progress": progress,
"timestamp": datetime.utcnow().isoformat()
}
for client_id in subscribers:
await manager.send_message(client_id, message)
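For completeness, a client-side sketch using the third-party websockets package (an assumption; any WebSocket client works against this endpoint):

```python
# Hypothetical client: subscribe to a task and print progress updates.
import asyncio
import json

import websockets

async def watch_task(task_id: str) -> None:
    async with websockets.connect("ws://localhost:8000/ws/client-1") as ws:
        # Subscribe using the message shape the endpoint above expects.
        await ws.send(json.dumps({"type": "subscribe", "task_id": task_id}))
        async for raw in ws:
            message = json.loads(raw)
            if message.get("type") == "task.progress":
                print(message["progress"])

# asyncio.run(watch_task("task-123"))
```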
Server-Sent Events (SSE)
Use Case: One-way streaming from server to client.
Implementation:
# streaming/sse.py
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
@app.get("/stream/tasks/{task_id}")
async def stream_task_updates(task_id: str):
"""Stream task updates using Server-Sent Events."""
async def event_generator():
"""Generate SSE events."""
while True:
# Get current task status
status = await get_task_status(task_id)
# Format as SSE
yield f"data: {json.dumps(status)}\n\n"
# Stop if task complete
if status["status"] in ["completed", "failed"]:
break
# Wait before next update
await asyncio.sleep(1)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
# Client-side usage (JavaScript)
"""
const eventSource = new EventSource('/stream/tasks/task-123');
eventSource.onmessage = (event) => {
const status = JSON.parse(event.data);
console.log('Task progress:', status.progress);
if (status.status === 'completed') {
eventSource.close();
}
};
"""
Testing Integration
Mocking External Services
Implementation:
# tests/conftest.py
import pytest
from unittest.mock import AsyncMock, Mock
import httpx
@pytest.fixture
def mock_openai_client():
"""Mock OpenAI API client."""
client = AsyncMock()
client.chat_completion.return_value = {
"choices": [{
"message": {
"content": "Mocked response"
}
}]
}
return client
@pytest.fixture
def mock_arm_client():
"""Mock arm client for testing."""
client = AsyncMock()
client.execute.return_value = {
"result": "Mocked arm result",
"confidence": 0.95
}
return client
# Test using mocks
@pytest.mark.asyncio
async def test_orchestrator_with_mocked_arms(mock_arm_client):
"""Test orchestrator using mocked arms."""
orchestrator = Orchestrator(arm_registry={
"coder": mock_arm_client
})
result = await orchestrator.execute_task(
TaskContract(
task_id="test-123",
goal="Test goal"
)
)
# Verify arm was called
mock_arm_client.execute.assert_called_once()
# Verify result
assert result["status"] == "completed"
Contract Testing
Use Case: Verify API contracts between components.
Implementation:
# tests/contract_tests.py
import pytest
from pydantic import ValidationError
def test_task_contract_validation():
"""Test TaskContract schema validation."""
# Valid contract
valid_task = TaskContract(
task_id="task-123e4567-e89b-12d3-a456-426614174000",
goal="Write a function to sort a list",
constraints=["No external libraries"],
priority="medium"
)
assert valid_task.task_id.startswith("task-")
# Invalid contract (missing required field)
with pytest.raises(ValidationError):
TaskContract(
task_id="task-123",
# Missing 'goal' field
constraints=[]
)
# Invalid contract (wrong format)
with pytest.raises(ValidationError):
TaskContract(
task_id="invalid-id-format", # Should start with 'task-'
goal="Test"
)
def test_arm_response_contract():
"""Test arm response matches expected contract."""
response = ArmResponse(
result={"code": "print('hello')"},
confidence=0.95,
provenance=ProvenanceMetadata(
arm_id="coder",
timestamp=datetime.utcnow().isoformat(),
action_type="code_generation",
command_hash="abc123"
)
)
assert 0.0 <= response.confidence <= 1.0
assert response.provenance.arm_id == "coder"
Summary
This guide covered the major integration pattern categories for OctoLLM:
| Pattern Category | Key Takeaways |
|---|---|
| Arm-to-Arm | Use direct HTTP for low latency, orchestrator-mediated for visibility, shared memory for async |
| Orchestrator | Submit tasks via REST API, register arms dynamically, poll or wait for completion |
| External API | Use circuit breakers, implement retries, respect rate limits |
| Database | PostgreSQL for knowledge graph, Qdrant for vectors, Redis for cache |
| Message Queue | Use priority queues, implement dead letter queues, track progress |
| Webhook | Sign payloads with HMAC, implement retry logic, validate endpoints |
| Batch | Process in chunks, limit concurrency, track progress |
| Streaming | Use WebSocket for bidirectional, SSE for server-to-client, handle backpressure |
| Testing | Mock external services, test contracts, integration test patterns |
Best Practices Summary
- Always use structured logging with context
- Implement retries with exponential backoff
- Use circuit breakers for external services
- Validate all inputs with Pydantic schemas
- Set appropriate timeouts (typically 30-60s)
- Include request IDs for tracing
- Handle errors gracefully with fallbacks
- Test integrations with mocks and contracts
- Monitor all integrations with metrics
- Document API contracts with OpenAPI
Next Steps
- Orchestrator Implementation - Build the orchestrator
- Custom Arms Guide - Create specialized arms
- Memory Systems - Implement distributed memory
- Testing Guide - Test your integrations
- Deployment Guide - Deploy to production
Document Maintainers: OctoLLM Core Team | Last Updated: 2025-11-10 | Next Review: 2025-12-10