Integration Patterns for OctoLLM

Document: Implementation Guide | Version: 1.0 | Last Updated: 2025-11-10 | Estimated Time: 60-90 minutes



Table of Contents

  1. Overview
  2. Arm-to-Arm Communication
  3. Orchestrator Integration
  4. External API Integration
  5. Database Integration
  6. Message Queue Patterns
  7. Webhook Integration
  8. Batch Processing
  9. Real-Time Streaming
  10. Testing Integration

Overview

This guide provides comprehensive integration patterns for building and connecting OctoLLM components. Each pattern includes concrete code examples, architectural diagrams, error handling strategies, and best practices.

Integration Philosophy

OctoLLM follows these integration principles:

  1. Loose Coupling: Components communicate through well-defined contracts
  2. Resilience: Graceful degradation and automatic recovery
  3. Observability: All integrations are traceable and measurable
  4. Security: Defense-in-depth with capability-based access control (see the token-check sketch after the diagram below)
  5. Performance: Async-first with intelligent caching

Design Principles

graph TD
    subgraph "Integration Principles"
        A[Contract-First<br/>API Design]
        B[Fail Fast<br/>with Retries]
        C[Observable<br/>by Default]
        D[Capability-Based<br/>Security]
    end

    subgraph "Implementation"
        E[Pydantic Schemas]
        F[Tenacity Retries]
        G[Structlog Logging]
        H[JWT Tokens]
    end

    A --> E
    B --> F
    C --> G
    D --> H
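
The mapping above pairs each principle with a concrete library. Contracts, retries, and logging are demonstrated throughout this guide; capability-based security is not shown elsewhere, so the sketch below illustrates one possible token check for principle 4. It is a minimal sketch assuming the PyJWT library and an HS256 shared secret; the "capabilities" claim name is a hypothetical convention, not a fixed OctoLLM contract.

# security/capability_check.py (illustrative sketch)
import jwt  # PyJWT

SECRET = "replace-with-shared-secret"  # hypothetical; load from configuration in practice

def require_capability(token: str, capability: str) -> dict:
    """
    Decode a JWT and confirm it grants the requested capability.

    Raises jwt.InvalidTokenError on a bad signature or expiry,
    PermissionError if the capability claim is absent.
    """
    claims = jwt.decode(token, SECRET, algorithms=["HS256"])
    if capability not in claims.get("capabilities", []):
        raise PermissionError(f"Token lacks capability: {capability}")
    return claims

# Usage: an arm endpoint rejects callers without the right capability
# claims = require_capability(bearer_token, "code.validate")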

Pattern Categories

| Category | Use Case | Complexity | Examples |
|----------|----------|------------|----------|
| Arm-to-Arm | Direct collaboration | Medium | Coder → Judge validation |
| Orchestrator | Central coordination | High | Task routing, aggregation |
| External API | Third-party services | Medium | OpenAI API, GitHub API |
| Database | Data persistence | Medium | PostgreSQL, Qdrant, Redis |
| Message Queue | Async processing | High | Task queues, events |
| Webhook | Event notifications | Low | Status updates, callbacks |
| Batch | Bulk operations | Medium | Mass data processing |
| Streaming | Real-time updates | High | WebSocket, SSE |

Arm-to-Arm Communication

Arms can communicate directly or through the orchestrator. The choice depends on coupling requirements, security constraints, and performance needs.

Direct HTTP Communication

Use Case: Fast, direct collaboration between arms when orchestrator mediation is unnecessary.

When to Use:

  • Low-latency requirements
  • Arm trust established
  • Simple request/response pattern
  • No complex orchestration needed

Architecture:

sequenceDiagram
    participant Coder as Coder Arm
    participant Judge as Judge Arm
    participant Memory as Shared Memory

    Coder->>Coder: Generate code
    Coder->>Judge: POST /validate
    Note over Judge: Validate code quality,<br/>security, style
    Judge->>Memory: Store validation report
    Judge-->>Coder: ValidationResult
    Coder->>Coder: Apply fixes if needed

Implementation:

# coder_arm/client.py
import asyncio
from uuid import uuid4

import httpx
import structlog
from pydantic import BaseModel, HttpUrl

logger = structlog.get_logger()

class ValidationRequest(BaseModel):
    """Request schema for code validation."""
    code: str
    language: str
    context: dict
    validation_rules: list[str] = []

class ValidationResult(BaseModel):
    """Response from Judge Arm."""
    is_valid: bool
    confidence: float
    issues: list[dict]
    suggestions: list[str]
    execution_time_ms: int

class JudgeArmClient:
    """Client for direct Judge Arm communication."""

    def __init__(
        self,
        base_url: HttpUrl,
        timeout: int = 30,
        retries: int = 3
    ):
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(timeout),
            limits=httpx.Limits(max_connections=10)
        )
        self.retries = retries

    async def validate_code(
        self,
        request: ValidationRequest
    ) -> ValidationResult:
        """
        Send code to Judge Arm for validation.

        Args:
            request: Validation request with code and context

        Returns:
            ValidationResult with issues and suggestions

        Raises:
            httpx.HTTPError: On communication failure
        """
        logger.info(
            "judge.validate.request",
            language=request.language,
            code_length=len(request.code)
        )

        for attempt in range(self.retries):
            try:
                response = await self.client.post(
                    f"{self.base_url}/validate",
                    json=request.dict(),
                    headers={
                        "Content-Type": "application/json",
                        "X-Arm-ID": "coder-001",
                        "X-Request-ID": str(uuid4())
                    }
                )
                response.raise_for_status()

                result = ValidationResult(**response.json())
                logger.info(
                    "judge.validate.success",
                    is_valid=result.is_valid,
                    confidence=result.confidence,
                    issues_count=len(result.issues)
                )
                return result

            except httpx.HTTPError as e:
                logger.warning(
                    "judge.validate.retry",
                    attempt=attempt + 1,
                    error=str(e)
                )
                if attempt == self.retries - 1:
                    logger.error(
                        "judge.validate.failed",
                        error=str(e)
                    )
                    raise

                await asyncio.sleep(2 ** attempt)  # Exponential backoff

    async def close(self):
        """Close HTTP client."""
        await self.client.aclose()

# Usage in Coder Arm
async def generate_and_validate(task: TaskContract) -> dict:
    """Generate code and validate it."""
    # Step 1: Generate code
    code = await generate_code(task.goal)

    # Step 2: Validate with Judge Arm
    judge_client = JudgeArmClient(base_url="http://judge-arm:8080")
    try:
        validation = await judge_client.validate_code(
            ValidationRequest(
                code=code,
                language="python",
                context=task.context,
                validation_rules=["security", "style", "complexity"]
            )
        )

        # Step 3: Apply fixes if needed
        if not validation.is_valid:
            code = await apply_fixes(code, validation.suggestions)
            # Re-validate
            validation = await judge_client.validate_code(...)

        return {
            "code": code,
            "validation": validation.dict(),
            "confidence": validation.confidence
        }

    finally:
        await judge_client.close()

Error Handling:

# Error handling wrapper
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

class ArmCommunicationError(Exception):
    """Base exception for arm communication errors."""
    pass

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(httpx.NetworkError)
)
async def resilient_arm_call(client, endpoint, payload):
    """
    Make resilient HTTP call to another arm.

    Automatically retries on network errors with exponential backoff.
    """
    try:
        response = await client.post(endpoint, json=payload)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code >= 500:
            # Retry on server errors
            raise
        else:
            # Don't retry on client errors
            raise ArmCommunicationError(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.NetworkError as e:
        logger.error("arm.communication.network_error", error=str(e))
        raise

Best Practices:

  • Use connection pooling for frequent communication (see the sketch after this list)
  • Implement circuit breaker for failing arms
  • Always include request IDs for tracing
  • Set appropriate timeouts (typically 30s)
  • Log all communication attempts
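
The first and third bullets can be combined in a single shared client. The sketch below is one possible arrangement, not part of JudgeArmClient above: a module-level pooled httpx client whose request hook stamps every outgoing call with the X-Request-ID header used earlier.

# arm_http.py (illustrative sketch)
from uuid import uuid4

import httpx

async def _add_request_id(request: httpx.Request) -> None:
    # Attach a unique ID so each call can be traced end to end
    if "X-Request-ID" not in request.headers:
        request.headers["X-Request-ID"] = str(uuid4())

# One shared client per process: connections are pooled and reused
shared_client = httpx.AsyncClient(
    timeout=httpx.Timeout(30.0),
    limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
    event_hooks={"request": [_add_request_id]},
)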

Orchestrator-Mediated Pattern

Use Case: When orchestrator needs full visibility and control over arm collaboration.

When to Use:

  • Complex multi-step workflows
  • Need for result aggregation
  • Security isolation requirements
  • Orchestrator needs to track dependencies

Architecture:

sequenceDiagram
    participant Orch as Orchestrator
    participant Planner as Planner Arm
    participant Retriever as Retriever Arm
    participant Coder as Coder Arm
    participant Judge as Judge Arm

    Orch->>Planner: Decompose task
    Planner-->>Orch: Plan with 3 steps

    Note over Orch: Step 1: Research

    Orch->>Retriever: Search documentation
    Retriever-->>Orch: Search results

    Note over Orch: Step 2: Code generation

    Orch->>Coder: Generate code<br/>(with retrieval context)
    Coder-->>Orch: Generated code

    Note over Orch: Step 3: Validation

    Orch->>Judge: Validate code
    Judge-->>Orch: Validation result

    Orch->>Orch: Aggregate results
    Orch-->>Orch: Complete task

Implementation:

# orchestrator/workflow.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import httpx
import structlog

logger = structlog.get_logger()

@dataclass
class WorkflowStep:
    """Single step in orchestrated workflow."""
    step_id: str
    arm_type: str
    input_data: dict
    dependencies: Optional[List[str]] = None
    status: str = "pending"  # pending, running, complete, failed
    result: Any = None
    error: Optional[str] = None

class OrchestratedWorkflow:
    """
    Orchestrator-mediated workflow execution.

    The orchestrator maintains full control and visibility.
    """

    def __init__(self, arm_registry: dict):
        self.arm_registry = arm_registry
        self.step_results = {}

    async def execute_workflow(
        self,
        steps: List[WorkflowStep],
        task_context: dict
    ) -> Dict[str, Any]:
        """
        Execute multi-step workflow with dependency resolution.

        Args:
            steps: List of workflow steps
            task_context: Shared context across steps

        Returns:
            Aggregated workflow result
        """
        logger.info(
            "workflow.start",
            total_steps=len(steps),
            task_id=task_context.get("task_id")
        )

        # Build dependency graph
        dep_graph = self._build_dependency_graph(steps)

        # Execute in topological order
        execution_order = self._topological_sort(dep_graph)

        for step_id in execution_order:
            step = next(s for s in steps if s.step_id == step_id)

            # Wait for dependencies
            await self._wait_for_dependencies(step, steps)

            # Enrich input with dependency results
            enriched_input = self._enrich_with_dependencies(
                step,
                task_context
            )

            # Execute step
            try:
                logger.info("workflow.step.start", step_id=step_id, arm=step.arm_type)
                step.status = "running"

                result = await self._execute_arm(
                    arm_type=step.arm_type,
                    input_data=enriched_input
                )

                step.result = result
                step.status = "complete"
                self.step_results[step_id] = result

                logger.info("workflow.step.complete", step_id=step_id)

            except Exception as e:
                step.status = "failed"
                step.error = str(e)
                logger.error(
                    "workflow.step.failed",
                    step_id=step_id,
                    error=str(e)
                )

                # Abort if a later step depends on this one; otherwise
                # the failure is isolated and the workflow can continue
                if any(step_id in (s.dependencies or []) for s in steps):
                    raise

        # Aggregate results
        final_result = self._aggregate_results(steps, task_context)

        logger.info("workflow.complete", task_id=task_context.get("task_id"))
        return final_result

    async def _execute_arm(
        self,
        arm_type: str,
        input_data: dict
    ) -> dict:
        """
        Execute a single arm with input data.

        Args:
            arm_type: Type of arm (e.g., "retriever", "coder")
            input_data: Input payload for the arm

        Returns:
            Arm execution result
        """
        arm_config = self.arm_registry[arm_type]
        endpoint = arm_config["endpoint"]

        async with httpx.AsyncClient() as client:
            response = await client.post(
                endpoint,
                json=input_data,
                timeout=arm_config.get("timeout", 60)
            )
            response.raise_for_status()
            return response.json()

    def _enrich_with_dependencies(
        self,
        step: WorkflowStep,
        context: dict
    ) -> dict:
        """
        Enrich step input with results from dependencies.

        Example:
            Step 2 (code generation) gets results from Step 1 (research).
        """
        enriched = step.input_data.copy()
        enriched["context"] = context.copy()

        if step.dependencies:
            enriched["dependency_results"] = {
                dep_id: self.step_results[dep_id]
                for dep_id in step.dependencies
                if dep_id in self.step_results
            }

        return enriched

    def _aggregate_results(
        self,
        steps: List[WorkflowStep],
        context: dict
    ) -> dict:
        """
        Combine results from all steps into final output.

        Strategies:
        - Sequential: Last step result
        - Accumulative: Merge all step results
        - Hierarchical: Nested structure
        """
        return {
            "task_id": context.get("task_id"),
            "success": all(s.status == "complete" for s in steps),
            "steps": [
                {
                    "step_id": s.step_id,
                    "arm": s.arm_type,
                    "status": s.status,
                    "result": s.result
                }
                for s in steps
            ],
            "final_result": steps[-1].result if steps else None
        }

    def _build_dependency_graph(self, steps: List[WorkflowStep]) -> dict:
        """Build directed graph of step dependencies."""
        graph = {step.step_id: step.dependencies or [] for step in steps}
        return graph

    def _topological_sort(self, graph: dict) -> List[str]:
        """Sort steps so dependencies run before the steps that need them."""
        from collections import deque

        # graph maps step_id -> list of dependency step_ids
        in_degree = {node: len(deps) for node, deps in graph.items()}

        # Invert the edges: dependency -> steps that depend on it
        dependents = {node: [] for node in graph}
        for node, deps in graph.items():
            for dep in deps:
                dependents[dep].append(node)

        queue = deque(node for node, degree in in_degree.items() if degree == 0)
        result = []

        while queue:
            node = queue.popleft()
            result.append(node)
            for dependent in dependents[node]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)

        if len(result) != len(graph):
            raise ValueError("Dependency cycle detected among workflow steps")

        return result

    async def _wait_for_dependencies(
        self,
        step: WorkflowStep,
        all_steps: List[WorkflowStep]
    ):
        """Wait for all dependencies to complete."""
        if not step.dependencies:
            return

        while True:
            deps_complete = all(
                next(s for s in all_steps if s.step_id == dep_id).status == "complete"
                for dep_id in step.dependencies
            )
            if deps_complete:
                break
            await asyncio.sleep(0.1)


# Usage example
async def handle_complex_task(task: TaskContract):
    """Example: Research → Code → Validate workflow."""

    workflow = OrchestratedWorkflow(arm_registry={
        "retriever": {"endpoint": "http://retriever-arm:8080/search"},
        "coder": {"endpoint": "http://coder-arm:8080/generate"},
        "judge": {"endpoint": "http://judge-arm:8080/validate"}
    })

    steps = [
        WorkflowStep(
            step_id="research",
            arm_type="retriever",
            input_data={
                "query": task.goal,
                "max_results": 10
            },
            dependencies=None
        ),
        WorkflowStep(
            step_id="code_generation",
            arm_type="coder",
            input_data={
                "goal": task.goal,
                "language": "python"
            },
            dependencies=["research"]  # Depends on research step
        ),
        WorkflowStep(
            step_id="validation",
            arm_type="judge",
            input_data={
                "validation_rules": ["security", "style"]
            },
            dependencies=["code_generation"]  # Depends on code step
        )
    ]

    result = await workflow.execute_workflow(
        steps=steps,
        task_context={"task_id": task.task_id}
    )

    return result

Shared Memory Pattern

Use Case: Arms coordinate through shared memory instead of direct communication.

When to Use:

  • Asynchronous collaboration
  • Decoupled communication
  • Need for persistent context
  • Multiple readers/writers

Architecture:

flowchart TD
    subgraph "Shared Memory Layer"
        Redis[(Redis Cache)]
        Qdrant[(Qdrant Vector DB)]
        Postgres[(PostgreSQL KG)]
    end

    ARM1[Arm 1: Coder] -->|Write| Redis
    ARM1 -->|Write Vector| Qdrant
    ARM1 -->|Write Entity| Postgres

    ARM2[Arm 2: Judge] -->|Read| Redis
    ARM2 -->|Query Vector| Qdrant
    ARM2 -->|Query Graph| Postgres

    ARM3[Arm 3: Retriever] -->|Read| Redis
    ARM3 -->|Query Vector| Qdrant

Implementation:

# shared_memory/client.py
import hashlib
import json
from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import uuid4

import asyncpg
import redis.asyncio as redis
import structlog
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

logger = structlog.get_logger()

class SharedMemoryClient:
    """
    Unified client for shared memory access across arms.

    Provides abstraction over Redis, Qdrant, and PostgreSQL.
    """

    def __init__(
        self,
        redis_url: str,
        qdrant_url: str,
        postgres_url: str
    ):
        self.redis_client = None
        self.qdrant_client = AsyncQdrantClient(url=qdrant_url)
        self.pg_pool = None
        self.redis_url = redis_url
        self.postgres_url = postgres_url

    async def connect(self):
        """Initialize connections to all backends."""
        self.redis_client = redis.from_url(self.redis_url)  # from_url is sync; connections open lazily
        self.pg_pool = await asyncpg.create_pool(self.postgres_url)
        logger.info("shared_memory.connected")

    # ===== Redis Operations (L1 Cache) =====

    async def cache_set(
        self,
        key: str,
        value: Any,
        ttl_seconds: int = 300
    ):
        """
        Store value in Redis cache with TTL.

        Args:
            key: Cache key (use namespaced keys, e.g., "arm:coder:result:123")
            value: Value to cache (will be JSON serialized)
            ttl_seconds: Time to live (default 5 minutes)
        """
        await self.redis_client.setex(
            key,
            ttl_seconds,
            json.dumps(value)
        )
        logger.debug("cache.set", key=key, ttl=ttl_seconds)

    async def cache_get(self, key: str) -> Optional[Any]:
        """Get value from Redis cache."""
        value = await self.redis_client.get(key)
        if value:
            logger.debug("cache.hit", key=key)
            return json.loads(value)
        logger.debug("cache.miss", key=key)
        return None

    async def cache_delete(self, pattern: str):
        """Delete keys matching pattern."""
        keys = []
        async for key in self.redis_client.scan_iter(match=pattern):
            keys.append(key)
        if keys:
            await self.redis_client.delete(*keys)
            logger.info("cache.delete", count=len(keys), pattern=pattern)

    # ===== Qdrant Operations (Vector Search) =====

    async def vector_store(
        self,
        collection_name: str,
        text: str,
        vector: List[float],
        metadata: Dict[str, Any],
        point_id: Optional[str] = None
    ):
        """
        Store text with embedding in Qdrant.

        Args:
            collection_name: Collection name (e.g., "coder_context")
            text: Original text
            vector: Embedding vector
            metadata: Additional metadata (author, timestamp, etc.)
            point_id: Optional point ID (auto-generated if not provided)
        """
        # Ensure collection exists
        collections = await self.qdrant_client.get_collections()
        if collection_name not in [c.name for c in collections.collections]:
            await self.qdrant_client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=len(vector),
                    distance=Distance.COSINE
                )
            )

        point_id = point_id or str(uuid4())
        await self.qdrant_client.upsert(
            collection_name=collection_name,
            points=[
                PointStruct(
                    id=point_id,
                    vector=vector,
                    payload={"text": text, **metadata}
                )
            ]
        )
        logger.info(
            "vector.store",
            collection=collection_name,
            point_id=point_id
        )

    async def vector_search(
        self,
        collection_name: str,
        query_vector: List[float],
        limit: int = 10,
        filter_conditions: Optional[dict] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for similar vectors in Qdrant.

        Args:
            collection_name: Collection to search
            query_vector: Query embedding
            limit: Maximum number of results
            filter_conditions: Optional metadata filters

        Returns:
            List of search results with text and metadata
        """
        results = await self.qdrant_client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=filter_conditions
        )

        logger.info(
            "vector.search",
            collection=collection_name,
            results_count=len(results)
        )

        return [
            {
                "id": hit.id,
                "score": hit.score,
                "text": hit.payload.get("text"),
                "metadata": {k: v for k, v in hit.payload.items() if k != "text"}
            }
            for hit in results
        ]

    # ===== PostgreSQL Operations (Knowledge Graph) =====

    async def entity_create(
        self,
        entity_type: str,
        name: str,
        properties: dict
    ) -> str:
        """
        Create entity in knowledge graph.

        Args:
            entity_type: Type (e.g., "function", "file", "bug")
            name: Entity name
            properties: Additional properties as JSONB

        Returns:
            UUID of created entity
        """
        async with self.pg_pool.acquire() as conn:
            entity_id = await conn.fetchval(
                """
                INSERT INTO entities (entity_type, name, properties)
                VALUES ($1, $2, $3)
                RETURNING id
                """,
                entity_type,
                name,
                json.dumps(properties)
            )
            logger.info(
                "entity.create",
                entity_id=str(entity_id),
                entity_type=entity_type
            )
            return str(entity_id)

    async def relationship_create(
        self,
        from_entity_id: str,
        to_entity_id: str,
        relationship_type: str,
        properties: dict = None
    ):
        """
        Create relationship between entities.

        Example: "function_A" --calls--> "function_B"
        """
        async with self.pg_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO relationships (from_entity_id, to_entity_id, relationship_type, properties)
                VALUES ($1, $2, $3, $4)
                """,
                from_entity_id,
                to_entity_id,
                relationship_type,
                json.dumps(properties or {})
            )
            logger.info(
                "relationship.create",
                relationship_type=relationship_type
            )

    async def graph_query(
        self,
        entity_id: str,
        relationship_type: Optional[str] = None,
        max_depth: int = 2
    ) -> Dict[str, Any]:
        """
        Query knowledge graph from starting entity.

        Args:
            entity_id: Starting entity UUID
            relationship_type: Optional filter by relationship type
            max_depth: Maximum traversal depth

        Returns:
            Subgraph as nested dict
        """
        async with self.pg_pool.acquire() as conn:
            # Recursive CTE for graph traversal
            query = """
            WITH RECURSIVE graph_traversal AS (
                -- Base case: starting entity
                SELECT e.id, e.entity_type, e.name, e.properties, 0 as depth
                FROM entities e
                WHERE e.id = $1

                UNION ALL

                -- Recursive case: follow relationships
                SELECT e.id, e.entity_type, e.name, e.properties, gt.depth + 1
                FROM entities e
                INNER JOIN relationships r ON e.id = r.to_entity_id
                INNER JOIN graph_traversal gt ON r.from_entity_id = gt.id
                WHERE gt.depth < $2
                  AND ($3::text IS NULL OR r.relationship_type = $3)
            )
            SELECT * FROM graph_traversal
            """

            rows = await conn.fetch(query, entity_id, max_depth, relationship_type)

            # Build nested structure
            nodes = {str(row["id"]): dict(row) for row in rows}

            logger.info(
                "graph.query",
                start_entity=entity_id,
                nodes_found=len(nodes)
            )

            return nodes

    async def close(self):
        """Close all connections."""
        if self.redis_client:
            await self.redis_client.close()
        if self.pg_pool:
            await self.pg_pool.close()
        logger.info("shared_memory.closed")


# Usage in Arms
class CoderArm:
    """Example: Coder Arm using shared memory."""

    def __init__(self, memory: SharedMemoryClient):
        self.memory = memory

    async def generate_code(self, task: TaskContract) -> dict:
        """Generate code and store in shared memory."""

        # 1. Check cache first (stable digest: builtin hash() is salted per process)
        cache_key = f"arm:coder:result:{hashlib.sha256(task.goal.encode()).hexdigest()}"
        cached = await self.memory.cache_get(cache_key)
        if cached:
            return cached

        # 2. Query relevant context from vector DB
        query_embedding = await self.embed_text(task.goal)
        context = await self.memory.vector_search(
            collection_name="code_context",
            query_vector=query_embedding,
            limit=5
        )

        # 3. Generate code
        code = await self._generate(task.goal, context)

        # 4. Store in shared memory for other arms
        result = {
            "code": code,
            "language": "python",
            "timestamp": datetime.utcnow().isoformat()
        }

        # Cache in Redis (5 minutes)
        await self.memory.cache_set(cache_key, result, ttl_seconds=300)

        # Store code embedding in Qdrant
        code_embedding = await self.embed_text(code)
        await self.memory.vector_store(
            collection_name="generated_code",
            text=code,
            vector=code_embedding,
            metadata={
                "task_id": task.task_id,
                "language": "python",
                "timestamp": datetime.utcnow().isoformat()
            }
        )

        # Store entity in knowledge graph
        entity_id = await self.memory.entity_create(
            entity_type="code",
            name=f"generated_{task.task_id}",
            properties={
                "code": code,
                "task_id": task.task_id
            }
        )

        return result


class JudgeArm:
    """Example: Judge Arm reading from shared memory."""

    def __init__(self, memory: SharedMemoryClient):
        self.memory = memory

    async def validate_code(self, task: TaskContract) -> dict:
        """Validate code from shared memory."""

        # 1. Get code from cache (written by Coder Arm, same stable key scheme)
        cache_key = f"arm:coder:result:{hashlib.sha256(task.goal.encode()).hexdigest()}"
        code_result = await self.memory.cache_get(cache_key)

        if not code_result:
            raise ValueError("No code found in shared memory")

        # 2. Query similar code for comparison
        code_embedding = await self.embed_text(code_result["code"])
        similar_code = await self.memory.vector_search(
            collection_name="generated_code",
            query_vector=code_embedding,
            limit=10
        )

        # 3. Validate
        is_valid = await self._validate(code_result["code"], similar_code)

        # 4. Store validation result
        validation_result = {
            "is_valid": is_valid,
            "code_hash": hash(code_result["code"]),
            "timestamp": datetime.utcnow().isoformat()
        }

        await self.memory.cache_set(
            f"arm:judge:validation:{hash(task.goal)}",
            validation_result,
            ttl_seconds=300
        )

        return validation_result

Best Practices:

  • Use namespaced keys: arm:{arm_name}:{data_type}:{id}
  • Set appropriate TTLs for cache entries
  • Clean up expired entries periodically (see the sketch after this list)
  • Use transactions for related operations
  • Index frequently queried fields
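
Redis honors the TTLs supplied by cache_set on its own, but knowledge-graph rows accumulate indefinitely. A minimal cleanup sketch follows, assuming entities has an indexed created_at column and that relationships are declared ON DELETE CASCADE; both assumptions and the 30-day window are illustrative, not part of the schema shown above.

# shared_memory/cleanup.py (illustrative sketch)
import asyncio

import structlog

logger = structlog.get_logger()

RETENTION_DAYS = 30  # arbitrary retention window

async def cleanup_loop(memory: "SharedMemoryClient", interval_seconds: int = 3600):
    """Periodically delete stale knowledge-graph entities."""
    while True:
        async with memory.pg_pool.acquire() as conn:
            # Assumes created_at is indexed and relationships cascade on delete
            status = await conn.execute(
                "DELETE FROM entities WHERE created_at < now() - $1::interval",
                f"{RETENTION_DAYS} days",
            )
        logger.info("cleanup.entities", status=status)
        await asyncio.sleep(interval_seconds)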

Event-Driven Pattern

Use Case: Arms react to events published by other arms.

When to Use:

  • Loose coupling required
  • Fan-out notifications
  • Asynchronous processing
  • Event sourcing architecture

Architecture:

flowchart TD
    subgraph "Event Bus (Redis Pub/Sub)"
        CHANNEL1[code.generated]
        CHANNEL2[validation.complete]
        CHANNEL3[task.complete]
    end

    ARM1[Coder Arm] -->|Publish| CHANNEL1
    ARM2[Judge Arm] -->|Subscribe| CHANNEL1
    ARM2 -->|Publish| CHANNEL2
    ARM3[Orchestrator] -->|Subscribe| CHANNEL2
    ARM3 -->|Publish| CHANNEL3
    ARM4[Webhook Service] -->|Subscribe| CHANNEL3

Implementation:

# event_bus/client.py
import asyncio
import json
from datetime import datetime
from typing import Callable, Awaitable

import redis.asyncio as redis
import structlog
from pydantic import BaseModel

logger = structlog.get_logger()

class Event(BaseModel):
    """Base event model."""
    event_type: str
    source_arm: str
    timestamp: str
    data: dict

class EventBus:
    """
    Redis-based event bus for arm-to-arm communication.

    Uses pub/sub for loose coupling between arms.
    """

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.pub_client = None
        self.sub_client = None
        self.handlers = {}

    async def connect(self):
        """Create Redis clients (from_url is sync; connections open lazily)."""
        self.pub_client = redis.from_url(self.redis_url)
        self.sub_client = redis.from_url(self.redis_url)
        logger.info("event_bus.connected")

    async def publish(self, channel: str, event: Event):
        """
        Publish event to channel.

        Args:
            channel: Channel name (e.g., "code.generated")
            event: Event to publish
        """
        await self.pub_client.publish(
            channel,
            event.json()
        )
        logger.info(
            "event.published",
            channel=channel,
            event_type=event.event_type,
            source=event.source_arm
        )

    async def subscribe(
        self,
        channel: str,
        handler: Callable[[Event], Awaitable[None]]
    ):
        """
        Subscribe to channel and process events.

        Args:
            channel: Channel to subscribe to
            handler: Async function to process events
        """
        pubsub = self.sub_client.pubsub()
        await pubsub.subscribe(channel)

        logger.info("event.subscribed", channel=channel)

        async for message in pubsub.listen():
            if message["type"] == "message":
                try:
                    event = Event(**json.loads(message["data"]))
                    logger.info(
                        "event.received",
                        channel=channel,
                        event_type=event.event_type
                    )
                    await handler(event)
                except Exception as e:
                    logger.error(
                        "event.handler.error",
                        channel=channel,
                        error=str(e)
                    )

    async def close(self):
        """Close connections."""
        if self.pub_client:
            await self.pub_client.close()
        if self.sub_client:
            await self.sub_client.close()


# Example: Coder Arm publishes events
class CoderArmWithEvents:
    """Coder Arm that publishes events."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

    async def generate_code(self, task: TaskContract) -> dict:
        """Generate code and publish event."""
        code = await self._generate(task.goal)

        result = {
            "task_id": task.task_id,
            "code": code,
            "language": "python"
        }

        # Publish event
        await self.event_bus.publish(
            channel="code.generated",
            event=Event(
                event_type="code.generated",
                source_arm="coder",
                timestamp=datetime.utcnow().isoformat(),
                data=result
            )
        )

        return result


# Example: Judge Arm subscribes to events
class JudgeArmWithEvents:
    """Judge Arm that reacts to code generation events."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

    async def start_listening(self):
        """Start listening for code generation events."""
        await self.event_bus.subscribe(
            channel="code.generated",
            handler=self.handle_code_generated
        )

    async def handle_code_generated(self, event: Event):
        """
        React to code generation event.

        Automatically validates newly generated code.
        """
        logger.info(
            "judge.event.received",
            task_id=event.data.get("task_id")
        )

        # Validate code
        code = event.data.get("code")
        is_valid = await self._validate(code)

        # Publish validation result
        await self.event_bus.publish(
            channel="validation.complete",
            event=Event(
                event_type="validation.complete",
                source_arm="judge",
                timestamp=datetime.utcnow().isoformat(),
                data={
                    "task_id": event.data.get("task_id"),
                    "is_valid": is_valid,
                    "original_event": event.data
                }
            )
        )


# Usage
async def run_event_driven_system():
    """Run event-driven arm system."""
    event_bus = EventBus(redis_url="redis://localhost:6379")
    await event_bus.connect()

    # Start Judge Arm listening in the background, then give the
    # subscription a moment to attach before publishing
    judge = JudgeArmWithEvents(event_bus)
    asyncio.create_task(judge.start_listening())
    await asyncio.sleep(0.1)

    # Coder Arm generates code (triggers event)
    coder = CoderArmWithEvents(event_bus)
    await coder.generate_code(
        TaskContract(
            task_id="task-123",
            goal="Write a function to sort a list"
        )
    )

    # Event flows automatically:
    # Coder --[code.generated]--> Judge --[validation.complete]--> Orchestrator

Best Practices:

  • Use structured event schemas (Pydantic models)
  • Include timestamp and source in all events
  • Handle failures gracefully with a dead-letter queue (see the sketch after this list)
  • Log all published and received events
  • Consider event ordering guarantees
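
For the dead-letter bullet, one lightweight approach is to wrap a handler so that events it cannot process are parked in a Redis list instead of being lost. The sketch below builds on the EventBus above; the events:dlq:{channel} key name is a hypothetical convention.

# event_bus/dead_letter.py (illustrative sketch)
import json
from typing import Awaitable, Callable

def with_dead_letter(
    bus: "EventBus",
    channel: str,
    handler: Callable[["Event"], Awaitable[None]],
) -> Callable[["Event"], Awaitable[None]]:
    """Wrap a handler so failed events land in a dead-letter list."""

    async def wrapped(event: "Event") -> None:
        try:
            await handler(event)
        except Exception as exc:
            # Park the event plus the error for later inspection/replay
            await bus.pub_client.lpush(
                f"events:dlq:{channel}",
                json.dumps({"event": event.dict(), "error": str(exc)}),
            )

    return wrapped

# Usage:
# await event_bus.subscribe(
#     channel="code.generated",
#     handler=with_dead_letter(event_bus, "code.generated", judge.handle_code_generated)
# )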

Orchestrator Integration

Patterns for integrating with the central orchestrator.

Task Submission Pattern

Use Case: Submit tasks to orchestrator for processing.

Implementation:

# client/orchestrator_client.py
import asyncio
import time
from typing import List

import httpx

class OrchestratorClient:
    """Client for submitting tasks to orchestrator."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient()

    async def submit_task(
        self,
        goal: str,
        constraints: List[str] = None,
        priority: str = "medium",
        budget: dict = None
    ) -> dict:
        """
        Submit task to orchestrator.

        Args:
            goal: Natural language task description
            constraints: Hard constraints
            priority: Task priority (low, medium, high, critical)
            budget: Resource limits

        Returns:
            Task ID and estimated completion time
        """
        payload = {
            "goal": goal,
            "constraints": constraints or [],
            "priority": priority,
            "budget": budget or {
                "max_tokens": 4000,
                "max_time_seconds": 30
            },
            "acceptance_criteria": []
        }

        response = await self.client.post(
            f"{self.base_url}/api/v1/tasks",
            json=payload
        )
        response.raise_for_status()

        return response.json()

    async def get_task_status(self, task_id: str) -> dict:
        """Get task status and results."""
        response = await self.client.get(
            f"{self.base_url}/api/v1/tasks/{task_id}"
        )
        response.raise_for_status()
        return response.json()

    async def wait_for_completion(
        self,
        task_id: str,
        timeout: int = 300,
        poll_interval: float = 2.0
    ) -> dict:
        """
        Wait for task to complete.

        Args:
            task_id: Task ID to wait for
            timeout: Maximum wait time in seconds
            poll_interval: Time between status checks

        Returns:
            Final task result
        """
        start_time = time.time()

        while True:
            if time.time() - start_time > timeout:
                raise TimeoutError(f"Task {task_id} did not complete within {timeout}s")

            status = await self.get_task_status(task_id)

            if status["status"] in ["completed", "failed"]:
                return status

            await asyncio.sleep(poll_interval)


# Usage
async def main():
    client = OrchestratorClient(base_url="http://localhost:8001")

    # Submit task
    task = await client.submit_task(
        goal="Find and fix bugs in auth/login.py",
        constraints=["No database schema changes"],
        priority="high"
    )

    print(f"Task submitted: {task['task_id']}")

    # Wait for completion
    result = await client.wait_for_completion(task["task_id"])
    print(f"Task complete: {result['result']}")

Arm Registration Pattern

Use Case: Register new arms with orchestrator dynamically.

Implementation:

# arm/registration.py
from dataclasses import dataclass
from datetime import datetime
from typing import List

import httpx
import structlog

logger = structlog.get_logger()

@dataclass
class ArmCapability:
    """Capability definition for arm registration."""
    capability_name: str
    description: str
    input_schema: dict
    output_schema: dict
    cost_tier: int  # 1-5, higher = more expensive
    avg_latency_ms: int

class ArmRegistry:
    """Arm registry client for dynamic registration."""

    def __init__(self, registry_url: str):
        self.registry_url = registry_url

    async def register_arm(
        self,
        arm_id: str,
        arm_type: str,
        endpoint: str,
        capabilities: List[ArmCapability],
        health_check_endpoint: str = "/health"
    ):
        """
        Register arm with orchestrator.

        Args:
            arm_id: Unique arm identifier
            arm_type: Arm type (planner, coder, executor, etc.)
            endpoint: HTTP endpoint for task execution
            capabilities: List of arm capabilities
            health_check_endpoint: Health check endpoint
        """
        payload = {
            "arm_id": arm_id,
            "arm_type": arm_type,
            "endpoint": endpoint,
            "health_check_endpoint": health_check_endpoint,
            "capabilities": [
                {
                    "capability_name": cap.capability_name,
                    "description": cap.description,
                    "input_schema": cap.input_schema,
                    "output_schema": cap.output_schema,
                    "cost_tier": cap.cost_tier,
                    "avg_latency_ms": cap.avg_latency_ms
                }
                for cap in capabilities
            ],
            "metadata": {
                "version": "1.0.0",
                "registered_at": datetime.utcnow().isoformat()
            }
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.registry_url}/registry/arms",
                json=payload
            )
            response.raise_for_status()

        logger.info("arm.registered", arm_id=arm_id, arm_type=arm_type)


# Usage in arm startup
async def startup_arm():
    """Register arm on startup."""
    registry = ArmRegistry(registry_url="http://orchestrator:8000")

    await registry.register_arm(
        arm_id="coder-001",
        arm_type="coder",
        endpoint="http://coder-arm:8080/execute",
        capabilities=[
            ArmCapability(
                capability_name="code_generation",
                description="Generate code from natural language",
                input_schema={"goal": "string", "language": "string"},
                output_schema={"code": "string", "confidence": "float"},
                cost_tier=4,
                avg_latency_ms=5000
            ),
            ArmCapability(
                capability_name="code_refactoring",
                description="Refactor existing code",
                input_schema={"code": "string", "style": "string"},
                output_schema={"refactored_code": "string"},
                cost_tier=3,
                avg_latency_ms=3000
            )
        ]
    )
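
The registration payload advertises a health_check_endpoint, which the orchestrator is expected to poll. A minimal sketch of serving it is shown below; it assumes FastAPI, which this guide does not mandate, and reuses the startup_arm() helper above.

# arm/health.py (illustrative sketch, assuming FastAPI)
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health() -> dict:
    """Liveness probe the orchestrator polls after registration."""
    return {"status": "ok", "arm_id": "coder-001"}

@app.on_event("startup")
async def register_on_startup():
    # Register this arm once the HTTP server is up
    await startup_arm()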

External API Integration

Patterns for integrating with external APIs (OpenAI, GitHub, etc.).

HTTP Client Pattern

Implementation:

# external/api_client.py
from typing import List

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential

logger = structlog.get_logger()

class ExternalAPIClient:
    """Base client for external API integration."""

    def __init__(
        self,
        base_url: str,
        api_key: str,
        timeout: int = 60,
        max_retries: int = 3
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            base_url=base_url,
            timeout=httpx.Timeout(timeout),
            headers={"Authorization": f"Bearer {api_key}"}
        )
        self.max_retries = max_retries

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> dict:
        """
        Make HTTP request with automatic retries.

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint
            **kwargs: Additional request parameters

        Returns:
            Parsed JSON response
        """
        logger.info(
            "external_api.request",
            method=method,
            endpoint=endpoint
        )

        response = await self.client.request(
            method=method,
            url=endpoint,
            **kwargs
        )

        response.raise_for_status()

        logger.info(
            "external_api.success",
            method=method,
            endpoint=endpoint,
            status=response.status_code
        )

        return response.json()


# Example: OpenAI API Client
class OpenAIClient(ExternalAPIClient):
    """Client for OpenAI API."""

    def __init__(self, api_key: str):
        super().__init__(
            base_url="https://api.openai.com/v1",
            api_key=api_key
        )

    async def chat_completion(
        self,
        messages: List[dict],
        model: str = "gpt-4",
        temperature: float = 0.7
    ) -> dict:
        """Request chat completion."""
        return await self.request(
            method="POST",
            endpoint="/chat/completions",
            json={
                "model": model,
                "messages": messages,
                "temperature": temperature
            }
        )


# Example: GitHub API Client
class GitHubClient(ExternalAPIClient):
    """Client for GitHub API."""

    def __init__(self, token: str):
        super().__init__(
            base_url="https://api.github.com",
            api_key=token
        )
        self.client.headers["Accept"] = "application/vnd.github.v3+json"

    async def get_repository(self, owner: str, repo: str) -> dict:
        """Get repository information."""
        return await self.request(
            method="GET",
            endpoint=f"/repos/{owner}/{repo}"
        )

    async def list_issues(
        self,
        owner: str,
        repo: str,
        state: str = "open"
    ) -> List[dict]:
        """List repository issues."""
        return await self.request(
            method="GET",
            endpoint=f"/repos/{owner}/{repo}/issues",
            params={"state": state}
        )

Circuit Breaker Pattern

Use Case: Prevent cascading failures from external service outages.

Implementation:

# resilience/circuit_breaker.py
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable

import httpx
import structlog

logger = structlog.get_logger()

class CircuitBreakerOpenError(Exception):
    """Raised when the circuit is open and calls are short-circuited."""
    pass

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Blocking requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    """
    Circuit breaker for external service calls.

    Prevents cascading failures by stopping requests to failing services.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs):
        """
        Execute function with circuit breaker protection.

        Args:
            func: Async function to execute
            *args, **kwargs: Function arguments

        Returns:
            Function result

        Raises:
            CircuitBreakerOpenError: If circuit is open
        """
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                logger.info("circuit_breaker.half_open")
            else:
                logger.warning("circuit_breaker.open")
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except self.expected_exception as e:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset."""
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Handle successful call."""
        if self.state == CircuitState.HALF_OPEN:
            logger.info("circuit_breaker.closed")
            self.state = CircuitState.CLOSED

        self.failure_count = 0

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        logger.warning(
            "circuit_breaker.failure",
            failure_count=self.failure_count,
            threshold=self.failure_threshold
        )

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error("circuit_breaker.open")


# Usage
async def call_external_api_with_circuit_breaker():
    """Example: Protect external API call."""
    circuit_breaker = CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=60,
        expected_exception=httpx.HTTPError
    )

    try:
        result = await circuit_breaker.call(
            external_api_call,
            param1="value1"
        )
        return result
    except CircuitBreakerOpenError:
        # Circuit is open, use fallback
        return fallback_response()

Database Integration

Patterns for working with PostgreSQL, Qdrant, and Redis.

PostgreSQL Knowledge Graph

Implementation: see the Shared Memory Pattern section earlier in this document.

Transaction Patterns

Use Case: Atomic operations across multiple tables.

Implementation:

# database/transactions.py
async def atomic_knowledge_update(
    pool: asyncpg.Pool,
    entities: List[dict],
    relationships: List[dict]
):
    """
    Atomically update knowledge graph.

    All entities and relationships are inserted within a transaction.
    If any operation fails, all changes are rolled back.
    """
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Insert entities
            entity_ids = []
            for entity in entities:
                entity_id = await conn.fetchval(
                    """
                    INSERT INTO entities (entity_type, name, properties)
                    VALUES ($1, $2, $3)
                    RETURNING id
                    """,
                    entity["type"],
                    entity["name"],
                    json.dumps(entity["properties"])
                )
                entity_ids.append(entity_id)

            # Insert relationships
            for rel in relationships:
                await conn.execute(
                    """
                    INSERT INTO relationships (from_entity_id, to_entity_id, relationship_type)
                    VALUES ($1, $2, $3)
                    """,
                    entity_ids[rel["from_index"]],
                    entity_ids[rel["to_index"]],
                    rel["type"]
                )

            logger.info(
                "knowledge_graph.updated",
                entities_count=len(entities),
                relationships_count=len(relationships)
            )

Message Queue Patterns

Async Task Processing

Use Case: Offload long-running tasks to background workers.

Architecture:

flowchart LR
    API[API Server] -->|Enqueue Task| REDIS[(Redis Queue)]
    REDIS -->|Dequeue| WORKER1[Worker 1]
    REDIS -->|Dequeue| WORKER2[Worker 2]
    REDIS -->|Dequeue| WORKER3[Worker 3]

    WORKER1 -->|Store Result| DB[(Database)]
    WORKER2 -->|Store Result| DB
    WORKER3 -->|Store Result| DB

Implementation:

# queue/task_queue.py
from typing import Callable

import structlog
from redis import Redis
from rq import Queue

logger = structlog.get_logger()

# Connect to Redis
redis_conn = Redis(host='localhost', port=6379, db=0)
task_queue = Queue('octollm_tasks', connection=redis_conn)

def enqueue_task(func: Callable, *args, **kwargs) -> str:
    """
    Enqueue task for background processing.

    Args:
        func: Function to execute
        *args, **kwargs: Function arguments

    Returns:
        Job ID
    """
    job = task_queue.enqueue(func, *args, **kwargs)
    logger.info("task.enqueued", job_id=job.id, func=func.__name__)
    return job.id

def get_task_result(job_id: str):
    """Get result of completed task."""
    from rq.job import Job
    job = Job.fetch(job_id, connection=redis_conn)

    if job.is_finished:
        return job.result
    elif job.is_failed:
        raise Exception(f"Task failed: {job.exc_info}")
    else:
        return None  # Still processing


# Example: Long-running code generation
def generate_code_background(goal: str, constraints: list) -> dict:
    """Background task for code generation."""
    # This runs in a separate worker process
    logger.info("background_task.start", goal=goal)

    # Expensive operation
    code = generate_code(goal, constraints)

    logger.info("background_task.complete")
    return {"code": code, "status": "complete"}


# Usage
async def handle_code_generation_request(request: dict):
    """API endpoint handler."""
    # Enqueue task (returns immediately)
    job_id = enqueue_task(
        generate_code_background,
        goal=request["goal"],
        constraints=request.get("constraints", [])
    )

    return {
        "job_id": job_id,
        "status": "queued",
        "message": "Code generation started"
    }

async def check_code_generation_status(job_id: str):
    """Check status of background task."""
    result = get_task_result(job_id)

    if result is None:
        return {"status": "processing"}
    else:
        return {"status": "complete", "result": result}

Priority Queue Pattern

Use Case: Process high-priority tasks first.

Implementation:

# queue/priority_queue.py
from typing import Callable

from rq import Queue

# Reuse the connection and logger defined in queue/task_queue.py
from .task_queue import logger, redis_conn

# Create priority queues
high_priority_queue = Queue('high', connection=redis_conn)
default_queue = Queue('default', connection=redis_conn)
low_priority_queue = Queue('low', connection=redis_conn)

def enqueue_with_priority(func: Callable, priority: str, *args, **kwargs):
    """Enqueue task with priority."""
    queue_map = {
        "high": high_priority_queue,
        "medium": default_queue,
        "low": low_priority_queue
    }

    queue = queue_map.get(priority, default_queue)
    job = queue.enqueue(func, *args, **kwargs)

    logger.info(
        "task.enqueued",
        job_id=job.id,
        priority=priority,
        func=func.__name__
    )

    return job.id


# Worker startup (prioritize high queue)
# $ rq worker high default low

Webhook Integration

Callback Registration

Use Case: Notify external systems when tasks complete.

Implementation:

# webhook/client.py
import json
from datetime import datetime
from typing import Optional

import httpx
import redis.asyncio as redis
import structlog

logger = structlog.get_logger()

# Retry queue backend (used by _queue_retry below)
redis_client = redis.from_url("redis://localhost:6379")

class WebhookClient:
    """Client for sending webhook notifications."""

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=10)

    async def send_webhook(
        self,
        url: str,
        event_type: str,
        payload: dict,
        secret: Optional[str] = None
    ):
        """
        Send webhook notification.

        Args:
            url: Webhook URL
            event_type: Event type (e.g., "task.completed")
            payload: Event payload
            secret: Optional HMAC secret for signature
        """
        headers = {
            "Content-Type": "application/json",
            "X-Event-Type": event_type,
            "X-Timestamp": datetime.utcnow().isoformat()
        }

        # Add HMAC signature if secret provided
        if secret:
            signature = self._compute_signature(payload, secret)
            headers["X-Signature"] = signature

        try:
            response = await self.client.post(
                url,
                json=payload,
                headers=headers
            )
            response.raise_for_status()

            logger.info(
                "webhook.sent",
                url=url,
                event_type=event_type,
                status=response.status_code
            )

        except httpx.HTTPError as e:
            logger.error(
                "webhook.failed",
                url=url,
                error=str(e)
            )
            # Queue for retry
            await self._queue_retry(url, event_type, payload, secret)

    def _compute_signature(self, body: bytes, secret: str) -> str:
        """Compute HMAC-SHA256 signature over the serialized request body."""
        import hashlib
        import hmac

        signature = hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()

        return f"sha256={signature}"

    async def _queue_retry(
        self,
        url: str,
        event_type: str,
        payload: dict,
        secret: Optional[str]
    ):
        """Queue webhook for retry."""
        # Store in Redis for background retry
        retry_data = {
            "url": url,
            "event_type": event_type,
            "payload": payload,
            "secret": secret,
            "retry_count": 0,
            "queued_at": datetime.utcnow().isoformat()
        }

        await redis_client.lpush(
            "webhook:retry_queue",
            json.dumps(retry_data)
        )
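
The retry queue above needs a consumer. A minimal background worker sketch, assuming the same shared async redis_client and logger; MAX_RETRIES and the backoff schedule are illustrative, and a production version would also re-sign the body via _compute_signature:

# webhook/retry_worker.py — illustrative consumer for webhook:retry_queue
import asyncio
import json

import httpx

MAX_RETRIES = 5  # illustrative cap

async def process_retry_queue():
    """Re-deliver failed webhooks with exponential backoff between attempts."""
    async with httpx.AsyncClient(timeout=10) as client:
        while True:
            raw = await redis_client.rpop("webhook:retry_queue")
            if raw is None:
                await asyncio.sleep(5)  # queue empty; poll again shortly
                continue

            retry = json.loads(raw)
            if retry["retry_count"] >= MAX_RETRIES:
                logger.error("webhook.retry.exhausted", url=retry["url"])
                continue  # give up (or move to a dead-letter list)

            await asyncio.sleep(2 ** retry["retry_count"])  # 1s, 2s, 4s, ...
            try:
                response = await client.post(retry["url"], json=retry["payload"])
                response.raise_for_status()
            except httpx.HTTPError:
                retry["retry_count"] += 1
                await redis_client.lpush("webhook:retry_queue", json.dumps(retry))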


# Usage in orchestrator
async def notify_task_completion(task_id: str, result: dict):
    """Notify registered webhooks of task completion."""
    # Get registered webhooks for this task
    webhooks = await get_task_webhooks(task_id)

    webhook_client = WebhookClient()

    for webhook in webhooks:
        await webhook_client.send_webhook(
            url=webhook["url"],
            event_type="task.completed",
            payload={
                "task_id": task_id,
                "status": "completed",
                "result": result,
                "completed_at": datetime.utcnow().isoformat()
            },
            secret=webhook.get("secret")
        )
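
On the receiving side, the endpoint should recompute the HMAC over the raw request body before trusting the payload. A minimal FastAPI sketch, assuming the shared secret was provisioned out of band (the route and secret handling are illustrative):

# webhook/receiver.py — illustrative verification endpoint
import hashlib
import hmac

from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()
WEBHOOK_SECRET = "shared-secret"  # illustrative; load from secure config in practice

@app.post("/hooks/octollm")
async def receive_webhook(
    request: Request,
    x_signature: str = Header(...),
    x_event_type: str = Header(...)
):
    """Reject any delivery whose signature does not match the raw body."""
    body = await request.body()
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()

    # Constant-time comparison avoids leaking signature bytes via timing
    if not hmac.compare_digest(expected, x_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    return {"received": x_event_type}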

Batch Processing

Bulk Operation Pattern

Use Case: Process large datasets efficiently.

Implementation:

# batch/processor.py
import asyncio
from typing import Awaitable, Callable, Generic, List, TypeVar

import structlog

logger = structlog.get_logger()

T = TypeVar('T')
R = TypeVar('R')

class BatchProcessor(Generic[T, R]):
    """
    Process items in batches for efficiency.

    Useful for bulk database operations, API calls with rate limits, etc.
    """

    def __init__(
        self,
        batch_size: int = 100,
        max_concurrent: int = 5
    ):
        self.batch_size = batch_size
        self.max_concurrent = max_concurrent

    async def process_batches(
        self,
        items: List[T],
        processor: Callable[[List[T]], Awaitable[List[R]]]
    ) -> List[R]:
        """
        Process items in batches.

        Args:
            items: List of items to process
            processor: Async function that processes a batch

        Returns:
            List of all results
        """
        logger.info(
            "batch.start",
            total_items=len(items),
            batch_size=self.batch_size
        )

        # Split into batches
        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        logger.info("batch.created", batch_count=len(batches))

        # Process batches with concurrency limit
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def process_batch_with_semaphore(batch):
            async with semaphore:
                return await processor(batch)

        # Execute all batches; a failure in any batch propagates immediately
        # (pass return_exceptions=True to gather to collect partial results instead)
        results = await asyncio.gather(*[
            process_batch_with_semaphore(batch)
            for batch in batches
        ])

        # Flatten results
        flattened = [item for batch_result in results for item in batch_result]

        logger.info("batch.complete", results_count=len(flattened))

        return flattened


# Example: Bulk embedding generation
async def generate_embeddings_batch(texts: List[str]) -> List[List[float]]:
    """Generate embeddings for a batch of texts."""
    # Call OpenAI API with batch
    response = await openai_client.create_embeddings(
        input=texts,
        model="text-embedding-ada-002"
    )
    return [item.embedding for item in response.data]


# Usage
async def embed_large_dataset(texts: List[str]):
    """Embed 10,000 texts efficiently."""
    processor = BatchProcessor(batch_size=100, max_concurrent=5)

    embeddings = await processor.process_batches(
        items=texts,
        processor=generate_embeddings_batch
    )

    # Store in vector database
    await store_embeddings(embeddings)
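
Individual batches can still fail transiently (rate limits, network blips). Wrapping the batch function with tenacity — the retry library named in the design principles — keeps one bad batch from aborting the whole run; the attempt count and backoff bounds below are illustrative:

# Retry a single batch with exponential backoff before giving up
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=30))
async def generate_embeddings_with_retry(texts: List[str]) -> List[List[float]]:
    """Same as generate_embeddings_batch, but retried on transient failure."""
    return await generate_embeddings_batch(texts)

# Drop-in replacement for the processor argument:
# embeddings = await processor.process_batches(
#     items=texts, processor=generate_embeddings_with_retry
# )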

Real-Time Streaming

WebSocket Pattern

Use Case: Real-time bidirectional communication.

Implementation:

# streaming/websocket.py
from datetime import datetime

from fastapi import WebSocket, WebSocketDisconnect
import structlog

logger = structlog.get_logger()

class ConnectionManager:
    """Manage WebSocket connections."""

    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, client_id: str, websocket: WebSocket):
        """Accept new WebSocket connection."""
        await websocket.accept()
        self.active_connections[client_id] = websocket
        logger.info("websocket.connected", client_id=client_id)

    def disconnect(self, client_id: str):
        """Remove disconnected client."""
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            logger.info("websocket.disconnected", client_id=client_id)

    async def send_message(self, client_id: str, message: dict):
        """Send message to specific client."""
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]
            await websocket.send_json(message)

    async def broadcast(self, message: dict):
        """Broadcast message to all connected clients."""
        # Iterate over a copy so a disconnect during send cannot mutate the dict mid-loop
        for client_id, websocket in list(self.active_connections.items()):
            try:
                await websocket.send_json(message)
            except Exception as e:
                logger.error(
                    "websocket.broadcast.error",
                    client_id=client_id,
                    error=str(e)
                )


# FastAPI WebSocket endpoint
from fastapi import FastAPI

app = FastAPI()
manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """WebSocket endpoint for real-time updates."""
    await manager.connect(client_id, websocket)

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()

            logger.info(
                "websocket.message.received",
                client_id=client_id,
                message_type=data.get("type")
            )

            # Handle message (unknown or missing types are ignored)
            message_type = data.get("type")
            if message_type == "subscribe":
                # Subscribe to task updates
                task_id = data["task_id"]
                await subscribe_to_task_updates(client_id, task_id)

            elif message_type == "ping":
                # Respond with pong
                await manager.send_message(client_id, {"type": "pong"})

    except WebSocketDisconnect:
        manager.disconnect(client_id)


# Send updates to subscribed clients
async def notify_task_progress(task_id: str, progress: dict):
    """Send task progress update via WebSocket."""
    # Get subscribed clients
    subscribers = await get_task_subscribers(task_id)

    message = {
        "type": "task.progress",
        "task_id": task_id,
        "progress": progress,
        "timestamp": datetime.utcnow().isoformat()
    }

    for client_id in subscribers:
        await manager.send_message(client_id, message)
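
The endpoint references subscribe_to_task_updates and get_task_subscribers without defining them. A minimal in-memory sketch (assumes a single-process deployment; a production version would keep subscriptions in Redis so they survive restarts and scale across workers):

# streaming/subscriptions.py — illustrative in-memory subscription registry
from collections import defaultdict

# task_id -> set of subscribed client_ids
_subscriptions: dict[str, set[str]] = defaultdict(set)

async def subscribe_to_task_updates(client_id: str, task_id: str):
    """Register a client for progress updates on a task."""
    _subscriptions[task_id].add(client_id)

async def get_task_subscribers(task_id: str) -> set[str]:
    """Return all clients currently subscribed to a task."""
    return _subscriptions[task_id]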

Server-Sent Events (SSE)

Use Case: One-way streaming from server to client.

Implementation:

# streaming/sse.py
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream/tasks/{task_id}")
async def stream_task_updates(task_id: str):
    """Stream task updates using Server-Sent Events."""

    async def event_generator():
        """Generate SSE events."""
        while True:
            # Get current task status
            status = await get_task_status(task_id)

            # Format as SSE
            yield f"data: {json.dumps(status)}\n\n"

            # Stop if task complete
            if status["status"] in ["completed", "failed"]:
                break

            # Wait before next update
            await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )


# Client-side usage (JavaScript)
"""
const eventSource = new EventSource('/stream/tasks/task-123');

eventSource.onmessage = (event) => {
    const status = JSON.parse(event.data);
    console.log('Task progress:', status.progress);

    if (status.status === 'completed') {
        eventSource.close();
    }
};
"""

Testing Integration

Mocking External Services

Implementation:

# tests/conftest.py
import pytest
from unittest.mock import AsyncMock

@pytest.fixture
def mock_openai_client():
    """Mock OpenAI API client."""
    client = AsyncMock()
    client.chat_completion.return_value = {
        "choices": [{
            "message": {
                "content": "Mocked response"
            }
        }]
    }
    return client

@pytest.fixture
def mock_arm_client():
    """Mock arm client for testing."""
    client = AsyncMock()
    client.execute.return_value = {
        "result": "Mocked arm result",
        "confidence": 0.95
    }
    return client


# Test using mocks
@pytest.mark.asyncio
async def test_orchestrator_with_mocked_arms(mock_arm_client):
    """Test orchestrator using mocked arms."""
    orchestrator = Orchestrator(arm_registry={
        "coder": mock_arm_client
    })

    result = await orchestrator.execute_task(
        TaskContract(
            task_id="test-123",
            goal="Test goal"
        )
    )

    # Verify arm was called
    mock_arm_client.execute.assert_called_once()

    # Verify result
    assert result["status"] == "completed"

Contract Testing

Use Case: Verify API contracts between components.

Implementation:

# tests/contract_tests.py
import pytest
from pydantic import ValidationError

def test_task_contract_validation():
    """Test TaskContract schema validation."""

    # Valid contract
    valid_task = TaskContract(
        task_id="task-123e4567-e89b-12d3-a456-426614174000",
        goal="Write a function to sort a list",
        constraints=["No external libraries"],
        priority="medium"
    )
    assert valid_task.task_id.startswith("task-")

    # Invalid contract (missing required field)
    with pytest.raises(ValidationError):
        TaskContract(
            task_id="task-123",
            # Missing 'goal' field
            constraints=[]
        )

    # Invalid contract (wrong format)
    with pytest.raises(ValidationError):
        TaskContract(
            task_id="invalid-id-format",  # Should start with 'task-'
            goal="Test"
        )


def test_arm_response_contract():
    """Test arm response matches expected contract."""

    response = ArmResponse(
        result={"code": "print('hello')"},
        confidence=0.95,
        provenance=ProvenanceMetadata(
            arm_id="coder",
            timestamp=datetime.utcnow().isoformat(),
            action_type="code_generation",
            command_hash="abc123"
        )
    )

    assert 0.0 <= response.confidence <= 1.0
    assert response.provenance.arm_id == "coder"
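
For reference, the rules these tests exercise can be declared directly on the schema. An illustrative sketch of such a validator, assuming Pydantic v2 (the canonical TaskContract lives in the shared schemas package):

# shared/schemas.py — illustrative sketch, not the canonical definition
from pydantic import BaseModel, field_validator

class TaskContract(BaseModel):
    task_id: str
    goal: str
    constraints: list[str] = []
    priority: str = "medium"

    @field_validator("task_id")
    @classmethod
    def task_id_must_have_prefix(cls, v: str) -> str:
        if not v.startswith("task-"):
            raise ValueError("task_id must start with 'task-'")
        return v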

Summary

This guide covered nine categories of integration patterns for OctoLLM:

| Pattern Category | Key Takeaways |
| --- | --- |
| Arm-to-Arm | Use direct HTTP for low latency, orchestrator-mediated for visibility, shared memory for async |
| Orchestrator | Submit tasks via REST API, register arms dynamically, use swarm for parallel execution |
| External API | Use circuit breakers, implement retries, respect rate limits |
| Database | PostgreSQL for knowledge graph, Qdrant for vectors, Redis for cache |
| Message Queue | Use priority queues, implement dead letter queues, track progress |
| Webhook | Sign payloads with HMAC, implement retry logic, validate endpoints |
| Batch | Process in chunks, limit concurrency, track progress |
| Streaming | Use WebSocket for bidirectional, SSE for server-to-client, handle backpressure |
| Testing | Mock external services, test contracts, integration test patterns |

Best Practices Summary

  1. Always use structured logging with context
  2. Implement retries with exponential backoff
  3. Use circuit breakers for external services
  4. Validate all inputs with Pydantic schemas
  5. Set appropriate timeouts (typically 30-60s)
  6. Include request IDs for tracing
  7. Handle errors gracefully with fallbacks
  8. Test integrations with mocks and contracts
  9. Monitor all integrations with metrics
  10. Document API contracts with OpenAPI

Next Steps


Document Maintainers: OctoLLM Core Team | Last Updated: 2025-11-10 | Next Review: 2025-12-10