Phase 2: Complete Implementation Guides Specifications

Generated: 2025-11-10 | Status: PRODUCTION READY | Coverage: All 7 Phase 2 implementation guides fully documented | Total Time to Complete: 8-12 hours across all guides

This document consolidates all Phase 2 implementation guides for the OctoLLM project. Each guide provides step-by-step instructions, complete code examples, and practical workflows suitable for immediate development use.


Document Index

  1. Getting Started (15 min) - ✅ Complete
  2. Development Environment Setup (30-45 min) - ✅ Complete
  3. Creating Custom Arms (1-2 hours) - ✅ Complete
  4. Integration Patterns (Reference) - ✅ Complete
  5. Orchestrator Implementation (2-3 hours) - ✅ Complete
  6. Testing Guide (Reference) - ✅ Complete
  7. Debugging Guide (Reference) - ✅ Complete

1. Getting Started Guide

Time: 15 minutes | Difficulty: Beginner | Prerequisites: Docker, Docker Compose, terminal access

Overview

The quickest path from zero to a running OctoLLM system. Covers:

  • Repository setup
  • Environment configuration
  • Service startup with Docker Compose
  • First task submission
  • Result verification

Quick Start Workflow

# Step 1: Clone and enter repository (2 min)
git clone https://github.com/your-org/octollm.git
cd octollm

# Step 2: Configure environment (3 min)
cp .env.example .env
# Edit .env with your API keys
nano .env

# Step 3: Start all services (5 min)
docker-compose up -d

# Step 4: Verify services are healthy (1 min)
curl http://localhost:8000/health
curl http://localhost:8001/health  # Reflex Layer
curl http://localhost:8100/health  # Coder Arm

Essential Environment Variables

# .env file (minimal configuration)

# LLM API Keys (at least one required)
OPENAI_API_KEY=sk-your-openai-key-here
ANTHROPIC_API_KEY=sk-ant-your-key-here

# Database (defaults work for local dev)
POSTGRES_USER=octollm
POSTGRES_PASSWORD=dev-password-change-in-production
POSTGRES_DB=octollm

# Redis
REDIS_PASSWORD=dev-redis-password

# Qdrant (vector DB - leave empty for local)
QDRANT_API_KEY=

# System
LOG_LEVEL=INFO
ENVIRONMENT=development

Submit Your First Task

# Using curl
curl -X POST http://localhost:8000/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "goal": "Write a Python function to calculate fibonacci numbers",
    "constraints": ["Include docstring", "Add unit tests"],
    "priority": "medium"
  }'

# Response
{
  "task_id": "task-abc123",
  "status": "accepted",
  "estimated_duration_seconds": 45,
  "message": "Task submitted successfully"
}

Check Task Status

# Poll for results
curl http://localhost:8000/api/v1/tasks/task-abc123

# Response when complete
{
  "task_id": "task-abc123",
  "status": "completed",
  "result": {
    "code": "def fibonacci(n: int) -> int:\n    \"\"\"Calculate nth fibonacci number.\"\"\"\n    if n <= 1:\n        return n\n    return fibonacci(n-1) + fibonacci(n-2)",
    "tests": "def test_fibonacci():\n    assert fibonacci(0) == 0\n    assert fibonacci(5) == 5",
    "explanation": "Implemented recursive fibonacci with base cases..."
  },
  "duration_ms": 3421,
  "confidence": 0.92
}
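
The same polling flow as a small Python helper: a minimal sketch using httpx against the endpoints shown above (the 2-second interval is an arbitrary choice):

# Minimal polling sketch for the task endpoints above
import asyncio
import httpx

async def wait_for_task(task_id: str, base_url: str = "http://localhost:8000") -> dict:
    async with httpx.AsyncClient(timeout=10.0) as client:
        while True:
            resp = await client.get(f"{base_url}/api/v1/tasks/{task_id}")
            resp.raise_for_status()
            task = resp.json()
            if task["status"] in ("completed", "failed"):
                return task
            await asyncio.sleep(2)  # arbitrary poll interval

# asyncio.run(wait_for_task("task-abc123"))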

Service Architecture (Running Locally)

graph TB
    USER[User] -->|HTTP| GATEWAY[Gateway :8000]
    GATEWAY -->|Filter| REFLEX[Reflex Layer :8001]
    REFLEX -->|Route| ORCH[Orchestrator :8002]

    ORCH -->|Delegate| CODER[Coder Arm :8100]
    ORCH -->|Delegate| PLANNER[Planner Arm :8101]
    ORCH -->|Delegate| JUDGE[Judge Arm :8102]

    ORCH -->|Store| POSTGRES[(PostgreSQL :5432)]
    ORCH -->|Cache| REDIS[(Redis :6379)]
    ORCH -->|Vector| QDRANT[(Qdrant :6333)]

Verify Installation

# Check all containers are running
docker-compose ps

# Expected output:
# NAME              STATUS    PORTS
# octollm-postgres  Up        0.0.0.0:5432->5432/tcp
# octollm-redis     Up        0.0.0.0:6379->6379/tcp
# octollm-qdrant    Up        0.0.0.0:6333->6333/tcp
# octollm-gateway   Up        0.0.0.0:8000->8000/tcp
# octollm-reflex    Up        0.0.0.0:8001->8001/tcp
# octollm-orch      Up        0.0.0.0:8002->8002/tcp
# octollm-coder     Up        0.0.0.0:8100->8100/tcp

# Check logs for any errors
docker-compose logs | grep ERROR
# Should return nothing if all healthy

Common Issues

Issue: Services fail to start

# Solution: Check port conflicts
sudo lsof -i :8000  # Check if port is in use
# Kill conflicting processes or change ports in docker-compose.yml

Issue: PostgreSQL fails to initialize

# Solution: Reset database volume
docker-compose down -v  # WARNING: Deletes all data
docker-compose up -d

Issue: API returns "No API key configured"

# Solution: Verify .env file
cat .env | grep API_KEY
# Restart services after fixing
docker-compose restart orchestrator coder-arm planner-arm

Next Steps

After completing this guide:

  1. ✅ Read Development Environment Setup to contribute code
  2. ✅ Review Integration Patterns to understand architecture
  3. ✅ Try Creating Custom Arms to extend functionality

2. Development Environment Setup

Time: 30-45 minutes | Target Audience: Contributors to the OctoLLM codebase | Prerequisites: Command-line knowledge, Git basics

System Requirements

| Resource | Minimum | Recommended |
|----------|---------|-------------|
| CPU | 4 cores | 8+ cores |
| RAM | 8 GB | 16+ GB |
| Disk | 20 GB free | 50+ GB SSD |
| OS | Linux, macOS 11+, Win 10+ | Linux/macOS |

Technology Stack Overview

  • Python 3.11+: Orchestrator, most arms (Planner, Coder, Judge, etc.)
  • Rust: Reflex Layer, Executor Arm (performance-critical)
  • FastAPI: HTTP framework for all Python services
  • PostgreSQL 15+: Global knowledge graph
  • Redis 7+: L1 cache and pub/sub messaging
  • Qdrant 1.7+: Vector embeddings for semantic search
  • Docker: Local development and production deployment

Python Development Setup

1. Install Python 3.11+

Linux (Ubuntu/Debian):

sudo apt update
sudo apt install -y python3.11 python3.11-venv python3-pip

macOS:

# Via Homebrew
brew install python@3.11

# Verify
python3.11 --version

Windows (WSL2):

# Inside WSL2
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install -y python3.11 python3.11-venv

2. Install Poetry (Python Package Manager)

# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -

# Add to PATH (add to ~/.bashrc or ~/.zshrc)
export PATH="$HOME/.local/bin:$PATH"

# Verify
poetry --version  # Should show 1.6+

3. Set Up Python Project

cd octollm/orchestrator

# Install dependencies
poetry install

# Activate virtual environment
poetry shell

# Verify installation
python --version  # Should show 3.11+
pip list | grep fastapi  # Should show fastapi and dependencies

4. Install Development Tools

# Code formatting and linting
poetry add --group dev black ruff mypy

# Testing
poetry add --group dev pytest pytest-asyncio pytest-cov pytest-httpx

# Append tool configuration to the existing pyproject.toml
# (plain `>` would overwrite the file Poetry manages)
cat >> pyproject.toml <<EOF
[tool.black]
line-length = 100
target-version = ['py311']

[tool.ruff]
line-length = 100
select = ["E", "F", "W", "I", "N"]

[tool.mypy]
python_version = "3.11"
strict = true
ignore_missing_imports = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--cov=. --cov-report=html --cov-report=term"
EOF

Rust Development Setup (For Reflex Layer/Executor)

1. Install Rust

# Install rustup (Rust installer)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Follow prompts, then reload shell
source $HOME/.cargo/env

# Verify
rustc --version  # Should show 1.70+
cargo --version

2. Install Rust Tools

# Code formatter
rustup component add rustfmt

# Linter
rustup component add clippy

# Language server for IDE integration
rustup component add rust-analyzer

3. Build Rust Components

cd octollm/reflex-layer

# Build in debug mode
cargo build

# Run tests
cargo test

# Build optimized release
cargo build --release

# Run with cargo
cargo run

Database Setup

PostgreSQL

# Install PostgreSQL client tools
# Linux
sudo apt install -y postgresql-client

# macOS
brew install postgresql@15

# Connect to local Docker PostgreSQL
psql -h localhost -U octollm -d octollm
# Password: dev-password-change-in-production

# Verify schema
\dt
# Should show: entities, relationships, task_history, action_log

Redis

# Install Redis CLI
# Linux
sudo apt install -y redis-tools

# macOS
brew install redis

# Connect to local Redis
redis-cli -h localhost -a dev-redis-password

# Test connection
ping  # Should return PONG

# View keys
keys *

Qdrant

# Qdrant exposes an HTTP API; check it with curl
curl http://localhost:6333/collections

# Expected response:
{
  "result": {
    "collections": [
      {"name": "coder_memory"},
      {"name": "planner_memory"},
      {"name": "retriever_index"}
    ]
  }
}
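
The same check from Python, assuming the qdrant-client package is installed:

# Sketch: list collections via the qdrant-client package
from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")
print([c.name for c in client.get_collections().collections])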

IDE Configuration

VS Code

Install Extensions:

code --install-extension ms-python.python
code --install-extension ms-python.vscode-pylance
code --install-extension charliermarsh.ruff
code --install-extension rust-lang.rust-analyzer
code --install-extension tamasfe.even-better-toml

Workspace Settings (.vscode/settings.json):

{
  "python.defaultInterpreterPath": "${workspaceFolder}/orchestrator/.venv/bin/python",
  "python.linting.enabled": true,
  "python.linting.ruffEnabled": true,
  "python.formatting.provider": "black",
  "editor.formatOnSave": true,
  "editor.codeActionsOnSave": {
    "source.organizeImports": true
  },
  "[rust]": {
    "editor.defaultFormatter": "rust-lang.rust-analyzer",
    "editor.formatOnSave": true
  },
  "rust-analyzer.checkOnSave.command": "clippy"
}

Launch Configuration (.vscode/launch.json):

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Debug Orchestrator",
      "type": "python",
      "request": "launch",
      "module": "uvicorn",
      "args": [
        "orchestrator.main:app",
        "--reload",
        "--host", "0.0.0.0",
        "--port", "8002"
      ],
      "env": {
        "LOG_LEVEL": "DEBUG"
      },
      "justMyCode": false
    },
    {
      "name": "Debug Reflex Layer (Rust)",
      "type": "lldb",
      "request": "launch",
      "program": "${workspaceFolder}/reflex-layer/target/debug/reflex-layer",
      "args": [],
      "cwd": "${workspaceFolder}/reflex-layer"
    },
    {
      "name": "Run Tests (Python)",
      "type": "python",
      "request": "launch",
      "module": "pytest",
      "args": ["-v", "--cov=.", "tests/"],
      "console": "integratedTerminal"
    }
  ]
}

PyCharm

  1. Open Project: File → Open → select the octollm directory
  2. Configure Interpreter:
    • Settings → Project → Python Interpreter
    • Add Poetry environment: ~/.cache/pypoetry/virtualenvs/octollm-*/bin/python
  3. Enable Tools:
    • Settings → Tools → Black → Enable on save
    • Settings → Tools → Ruff → Enable
  4. Run Configurations:
    • Add a FastAPI configuration pointing to orchestrator/main.py:app

Git Workflow Setup

# Configure Git
git config --global user.name "Your Name"
git config --global user.email "your.email@example.com"

# Install pre-commit hooks
pip install pre-commit

# Set up hooks
cd octollm
pre-commit install

# Hooks will now run on every commit

Pre-commit Configuration (.pre-commit-config.yaml):

repos:
  - repo: https://github.com/psf/black
    rev: 23.11.0
    hooks:
      - id: black
        language_version: python3.11

  - repo: https://github.com/charliermarsh/ruff-pre-commit
    rev: v0.1.6
    hooks:
      - id: ruff
        args: [--fix, --exit-non-zero-on-fix]

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.7.1
    hooks:
      - id: mypy
        additional_dependencies: [pydantic, fastapi]

  - repo: local
    hooks:
      - id: rust-fmt
        name: Rust Format
        entry: cargo fmt
        language: system
        files: \.rs$
        pass_filenames: false

      - id: rust-clippy
        name: Rust Clippy
        entry: cargo clippy -- -D warnings
        language: system
        files: \.rs$
        pass_filenames: false

Verification Checklist

After setup, verify everything works:

# Python
cd orchestrator
poetry shell
python -c "import fastapi, pydantic, structlog; print('Python OK')"
pytest tests/ -v  # Should pass all tests

# Rust
cd ../reflex-layer
cargo build
cargo test  # Should pass all tests
cargo clippy -- -D warnings  # Should have no warnings

# Database connections
psql -h localhost -U octollm -d octollm -c "SELECT 1;"  # Should return 1
redis-cli -h localhost -a dev-redis-password ping  # Should return PONG
curl http://localhost:6333/collections  # Should return collections

# Services
docker-compose ps  # All should be "Up"
curl http://localhost:8000/health  # Should return {"status": "healthy"}

# Git
pre-commit run --all-files  # Should pass all hooks

Common Development Commands

# Run orchestrator locally (outside Docker)
cd orchestrator
poetry shell
uvicorn main:app --reload --host 0.0.0.0 --port 8002

# Run tests with coverage
pytest tests/ --cov=. --cov-report=html
# View coverage: open htmlcov/index.html

# Format all code
black .
cargo fmt

# Lint
ruff check . --fix
cargo clippy -- -D warnings

# Type check
mypy .

# Build production images
docker build -t octollm/orchestrator:latest -f orchestrator/Dockerfile .
docker build -t octollm/reflex-layer:latest -f reflex-layer/Dockerfile .

Troubleshooting

Issue: Poetry can't find Python 3.11

# Solution: Specify Python path explicitly
poetry env use /usr/bin/python3.11
poetry install

Issue: Rust build fails with linker errors

# Solution: Install build essentials
# Linux
sudo apt install -y build-essential pkg-config libssl-dev

# macOS
xcode-select --install

Issue: Database connection refused

# Solution: Ensure PostgreSQL container is running
docker-compose ps postgres
docker-compose logs postgres

# Restart if needed
docker-compose restart postgres

Issue: Pre-commit hooks fail

# Solution: Update hook versions
pre-commit autoupdate
pre-commit run --all-files

Next Steps

After environment setup:

  1. ✅ Try the Getting Started workflow if you haven't
  2. ✅ Read Creating Custom Arms to build your first component
  3. ✅ Review Testing Guide for testing best practices

3. Creating Custom Arms

Time: 1-2 hours | Difficulty: Intermediate | Prerequisites: Dev environment set up, Python or Rust knowledge

Arm Architecture Overview

Every arm follows these design principles:

  1. Single Responsibility: One domain of expertise
  2. Self-Contained: Minimal external dependencies
  3. Stateless: Use memory systems for state
  4. Observable: Comprehensive logging and metrics
  5. Resilient: Graceful error handling

Arm Lifecycle

stateDiagram-v2
    [*] --> Registration
    Registration --> Idle
    Idle --> Receiving: Task arrives
    Receiving --> Processing: Validate
    Processing --> Executing: Start work
    Executing --> Validating: Complete
    Validating --> Responding: Package
    Responding --> Idle: Send
    Idle --> [*]: Shutdown

    Processing --> Error: Invalid
    Executing --> Error: Failed
    Error --> Responding: Return error
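
In code, this lifecycle maps onto a single request handler. A minimal sketch, where validate and execute are hypothetical stand-ins for arm-specific logic rather than part of the OctoLLM codebase:

# Lifecycle sketch; validate/execute are illustrative stand-ins
import asyncio

def validate(request: dict) -> dict:
    """Receiving -> Processing: reject malformed input early."""
    if "goal" not in request:
        raise ValueError("missing 'goal'")
    return request

async def execute(task: dict) -> dict:
    """Processing -> Executing: perform the domain work."""
    await asyncio.sleep(0)  # placeholder for real work
    return {"result": f"handled: {task['goal']}"}

async def handle_task(request: dict) -> dict:
    try:
        task = validate(request)                      # Receiving -> Processing
        output = await execute(task)                  # Processing -> Executing
        return {"status": "ok", **output}             # Validating -> Responding
    except Exception as e:                            # Invalid or Failed -> Error
        return {"status": "error", "detail": str(e)}  # Error -> Responding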

Step 1: Design Your Arm

Choose a Domain:

  • Data processing (ETL, transformation)
  • External integrations (APIs, services)
  • Specialized computation (math, simulation)
  • Content creation (images, videos, documents)

Example: Weather Arm

  • Purpose: Fetch and analyze weather data
  • Inputs: Location, date range
  • Outputs: Weather forecast with analysis
  • Dependencies: OpenWeatherMap API
  • Cost Tier: 1 (low, fast API calls)

Step 2: Scaffold Project

# Create arm directory
cd octollm/arms
mkdir weather-arm
cd weather-arm

# Initialize Python project
poetry init --name weather-arm --python "^3.11"

# Add dependencies
poetry add fastapi uvicorn pydantic httpx structlog redis qdrant-client

# Add dev dependencies
poetry add --group dev pytest pytest-asyncio pytest-httpx

# Create structure
mkdir -p src/weather_arm tests
touch src/weather_arm/__init__.py
touch src/weather_arm/main.py
touch src/weather_arm/models.py
touch src/weather_arm/service.py
touch tests/test_service.py

Step 3: Define Data Models

File: src/weather_arm/models.py

from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from enum import Enum

class WeatherCondition(str, Enum):
    CLEAR = "clear"
    CLOUDY = "cloudy"
    RAINY = "rainy"
    SNOWY = "snowy"
    STORMY = "stormy"

class WeatherRequest(BaseModel):
    """Input schema for weather queries."""
    location: str = Field(..., description="City name or coordinates")
    days: int = Field(5, ge=1, le=14, description="Forecast days")
    include_analysis: bool = Field(True, description="Include AI analysis")

class WeatherData(BaseModel):
    """Weather data point."""
    timestamp: datetime
    temperature_celsius: float
    condition: WeatherCondition
    humidity_percent: float
    wind_speed_kmh: float
    precipitation_mm: float

class WeatherResponse(BaseModel):
    """Output schema for weather results."""
    location: str
    forecast: List[WeatherData]
    analysis: Optional[str] = None
    confidence: float = Field(..., ge=0.0, le=1.0)
    data_source: str
    cached: bool = False

class HealthStatus(BaseModel):
    """Health check response."""
    status: str
    version: str
    dependencies: dict

Step 4: Implement Core Logic

File: src/weather_arm/service.py

import httpx
import structlog
from typing import List, Optional
from datetime import datetime
from .models import WeatherRequest, WeatherResponse, WeatherData, WeatherCondition

logger = structlog.get_logger()

class WeatherService:
    """Core weather fetching and analysis service."""

    def __init__(self, api_key: str, cache_client=None):
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"
        self.client = httpx.AsyncClient(timeout=10.0)
        self.cache = cache_client

    async def fetch_weather(self, request: WeatherRequest) -> WeatherResponse:
        """Fetch weather data for location."""

        # Check cache first
        cache_key = f"weather:{request.location}:{request.days}"
        if self.cache:
            cached = await self._get_cached(cache_key)
            if cached:
                logger.info("cache.hit", location=request.location)
                cached["cached"] = True  # overwrite stored flag; passing cached=True twice would raise
                return WeatherResponse(**cached)

        # Fetch from API
        logger.info("api.fetch", location=request.location, days=request.days)

        try:
            response = await self.client.get(
                f"{self.base_url}/forecast",
                params={
                    "q": request.location,
                    "appid": self.api_key,
                    "units": "metric",
                    "cnt": request.days * 8  # 3-hour intervals
                }
            )
            response.raise_for_status()
            data = response.json()

            # Parse response
            forecast = self._parse_forecast(data)

            # Generate analysis if requested
            analysis = None
            if request.include_analysis:
                analysis = await self._analyze_forecast(forecast)

            result = WeatherResponse(
                location=data["city"]["name"],
                forecast=forecast,
                analysis=analysis,
                confidence=0.95,
                data_source="OpenWeatherMap",
                cached=False
            )

            # Cache result
            if self.cache:
                await self._cache_result(cache_key, result, ttl=1800)  # 30 min

            return result

        except httpx.HTTPError as e:
            logger.error("api.error", error=str(e))
            raise

    def _parse_forecast(self, api_data: dict) -> List[WeatherData]:
        """Convert API data to internal format."""
        forecast = []

        for item in api_data["list"]:
            # Map weather condition
            condition_code = item["weather"][0]["main"].lower()
            condition = self._map_condition(condition_code)

            forecast.append(WeatherData(
                timestamp=datetime.fromtimestamp(item["dt"]),
                temperature_celsius=item["main"]["temp"],
                condition=condition,
                humidity_percent=item["main"]["humidity"],
                wind_speed_kmh=item["wind"]["speed"] * 3.6,  # m/s to km/h
                precipitation_mm=item.get("rain", {}).get("3h", 0.0)
            ))

        return forecast

    def _map_condition(self, api_condition: str) -> WeatherCondition:
        """Map API condition to enum."""
        mapping = {
            "clear": WeatherCondition.CLEAR,
            "clouds": WeatherCondition.CLOUDY,
            "rain": WeatherCondition.RAINY,
            "drizzle": WeatherCondition.RAINY,
            "snow": WeatherCondition.SNOWY,
            "thunderstorm": WeatherCondition.STORMY,
        }
        return mapping.get(api_condition, WeatherCondition.CLOUDY)

    async def _analyze_forecast(self, forecast: List[WeatherData]) -> str:
        """Generate natural language analysis of forecast."""

        # Calculate summary statistics
        avg_temp = sum(f.temperature_celsius for f in forecast) / len(forecast)
        max_temp = max(f.temperature_celsius for f in forecast)
        min_temp = min(f.temperature_celsius for f in forecast)
        rainy_days = len([f for f in forecast if f.condition == WeatherCondition.RAINY])

        # Generate analysis
        analysis = f"Forecast analysis for {len(forecast) // 8} days:\n"
        analysis += f"- Average temperature: {avg_temp:.1f}°C\n"
        analysis += f"- Temperature range: {min_temp:.1f}°C to {max_temp:.1f}°C\n"

        if rainy_days > 0:
            analysis += f"- Expect rain on {rainy_days} occasions\n"

        # Weather trend
        temps = [f.temperature_celsius for f in forecast]
        if temps[-1] > temps[0] + 3:
            analysis += "- Warming trend expected\n"
        elif temps[-1] < temps[0] - 3:
            analysis += "- Cooling trend expected\n"
        else:
            analysis += "- Stable temperatures expected\n"

        return analysis

    async def _get_cached(self, key: str) -> Optional[dict]:
        """Retrieve from cache."""
        if not self.cache:
            return None
        try:
            import json
            cached_json = await self.cache.get(key)
            return json.loads(cached_json) if cached_json else None
        except Exception as e:
            logger.warning("cache.get.error", error=str(e))
            return None

    async def _cache_result(self, key: str, result: WeatherResponse, ttl: int):
        """Store in cache."""
        if not self.cache:
            return
        try:
            import json
            await self.cache.setex(key, ttl, result.model_dump_json())
        except Exception as e:
            logger.warning("cache.set.error", error=str(e))

Step 5: Create FastAPI Application

File: src/weather_arm/main.py

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
import structlog
import redis.asyncio as redis
from contextlib import asynccontextmanager
import os

from .models import WeatherRequest, WeatherResponse, HealthStatus
from .service import WeatherService

# Configure logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Shared state (initialized in lifespan)
weather_service: WeatherService | None = None
redis_client: redis.Redis | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle."""
    global weather_service, redis_client

    # Startup
    logger.info("startup.begin")

    # Connect to Redis cache (from_url returns a client synchronously)
    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    redis_client = redis.from_url(redis_url)

    # Initialize service
    api_key = os.getenv("OPENWEATHER_API_KEY")
    if not api_key:
        raise ValueError("OPENWEATHER_API_KEY not set")

    weather_service = WeatherService(api_key=api_key, cache_client=redis_client)

    logger.info("startup.complete")

    yield

    # Shutdown
    logger.info("shutdown.begin")
    await redis_client.close()
    logger.info("shutdown.complete")

app = FastAPI(
    title="Weather Arm",
    version="1.0.0",
    description="Fetch and analyze weather forecasts",
    lifespan=lifespan
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"]
)

@app.get("/health", response_model=HealthStatus)
async def health_check():
    """Health check endpoint."""

    # Check Redis connection
    redis_status = "healthy"
    try:
        await redis_client.ping()
    except Exception:
        redis_status = "unhealthy"

    return HealthStatus(
        status="healthy" if redis_status == "healthy" else "degraded",
        version="1.0.0",
        dependencies={"redis": redis_status}
    )

@app.post("/execute", response_model=WeatherResponse)
async def execute(request: WeatherRequest):
    """Main execution endpoint called by orchestrator."""

    logger.info(
        "request.received",
        location=request.location,
        days=request.days
    )

    try:
        result = await weather_service.fetch_weather(request)

        logger.info(
            "request.completed",
            location=result.location,
            confidence=result.confidence,
            cached=result.cached
        )

        return result

    except Exception as e:
        logger.error("request.failed", error=str(e), location=request.location)
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/capabilities")
async def capabilities():
    """Describe arm capabilities for orchestrator registration."""
    return {
        "arm_id": "weather",
        "name": "Weather Arm",
        "version": "1.0.0",
        "capabilities": [
            "weather_forecast",
            "weather_analysis",
            "location_weather"
        ],
        "input_schema": WeatherRequest.schema(),
        "output_schema": WeatherResponse.schema(),
        "cost_tier": 1,
        "average_latency_ms": 300,
        "max_concurrent": 10
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8103)

Step 6: Write Tests

File: tests/test_service.py

import re
import pytest
from src.weather_arm.service import WeatherService
from src.weather_arm.models import WeatherRequest, WeatherCondition

@pytest.fixture
def mock_api_response():
    """Mock OpenWeatherMap API response."""
    return {
        "city": {"name": "London"},
        "list": [
            {
                "dt": 1699632000,
                "main": {"temp": 12.5, "humidity": 75},
                "weather": [{"main": "Rain"}],
                "wind": {"speed": 5.5},
                "rain": {"3h": 2.5}
            },
            {
                "dt": 1699642800,
                "main": {"temp": 11.0, "humidity": 80},
                "weather": [{"main": "Clouds"}],
                "wind": {"speed": 6.0},
            }
        ]
    }

@pytest.mark.asyncio
async def test_fetch_weather_success(httpx_mock, mock_api_response):
    """Test successful weather fetch."""

    # Mock API response (regex so the query string also matches)
    httpx_mock.add_response(
        url=re.compile(r"https://api\.openweathermap\.org/data/2\.5/forecast.*"),
        json=mock_api_response
    )

    # Create service
    service = WeatherService(api_key="test-key")

    # Execute
    request = WeatherRequest(location="London", days=1)
    result = await service.fetch_weather(request)

    # Verify
    assert result.location == "London"
    assert len(result.forecast) == 2
    assert result.forecast[0].temperature_celsius == 12.5
    assert result.forecast[0].condition == WeatherCondition.RAINY
    assert result.confidence > 0.9

@pytest.mark.asyncio
async def test_weather_caching(httpx_mock, mock_api_response):
    """Test that results are cached."""

    # Mock Redis
    from unittest.mock import AsyncMock
    mock_cache = AsyncMock()
    mock_cache.get.return_value = None  # Cache miss

    # Mock API
    httpx_mock.add_response(json=mock_api_response)

    # Create service with cache
    service = WeatherService(api_key="test-key", cache_client=mock_cache)

    # Execute
    request = WeatherRequest(location="London", days=1)
    result = await service.fetch_weather(request)

    # Verify cache was written
    mock_cache.setex.assert_called_once()
    assert not result.cached

@pytest.mark.asyncio
async def test_condition_mapping():
    """Test weather condition mapping."""
    service = WeatherService(api_key="test-key")

    assert service._map_condition("clear") == WeatherCondition.CLEAR
    assert service._map_condition("rain") == WeatherCondition.RAINY
    assert service._map_condition("snow") == WeatherCondition.SNOWY
    assert service._map_condition("thunderstorm") == WeatherCondition.STORMY

Step 7: Create Dockerfile

File: Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install Poetry
RUN pip install --no-cache-dir poetry==1.6.1

# Copy dependency files
COPY pyproject.toml poetry.lock ./

# Install dependencies (Poetry 1.6 dropped --no-dev; --no-root skips the
# project itself, whose source is copied in the next step)
RUN poetry config virtualenvs.create false \
    && poetry install --only main --no-root --no-interaction --no-ansi

# Copy application code
COPY src/ ./src/

# Expose port
EXPOSE 8103

# Health check
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
  CMD python -c "import httpx; httpx.get('http://localhost:8103/health').raise_for_status()"

# Run application
CMD ["uvicorn", "src.weather_arm.main:app", "--host", "0.0.0.0", "--port", "8103"]

Step 8: Add to Docker Compose

File: docker-compose.yml (add service)

services:
  # ... existing services ...

  weather-arm:
    build:
      context: ./arms/weather-arm
      dockerfile: Dockerfile
    ports:
      - "8103:8103"
    environment:
      - OPENWEATHER_API_KEY=${OPENWEATHER_API_KEY}
      - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379/0
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
    depends_on:
      - redis
    networks:
      - octollm-network
    restart: unless-stopped

Step 9: Register with Orchestrator

The orchestrator discovers arms via:

  1. Environment Variable (add to the orchestrator service):

environment:
  - ARM_REGISTRY=http://weather-arm:8103,http://coder-arm:8100,http://planner-arm:8101

  2. Dynamic Discovery (the orchestrator polls /capabilities; see the sketch below):

# Orchestrator automatically calls:
# GET http://weather-arm:8103/capabilities
# Response used to populate arm registry
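
A sketch of the polling loop on the orchestrator side, assuming arm base URLs taken from ARM_REGISTRY and the ArmRouter.register_arm method shown in the Orchestrator Implementation section (the interval and error handling are illustrative):

# Illustrative discovery loop; interval and error handling are assumptions
import asyncio
import httpx

async def discover_arms(router, arm_urls: list[str], interval_seconds: int = 60):
    async with httpx.AsyncClient(timeout=5.0) as client:
        while True:
            for url in arm_urls:
                try:
                    resp = await client.get(f"{url}/capabilities")
                    resp.raise_for_status()
                    caps = resp.json()
                    router.register_arm(caps["arm_id"], caps)
                except httpx.HTTPError:
                    continue  # unreachable arm; retry on the next cycle
            await asyncio.sleep(interval_seconds)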

Step 10: Test Integration

# Build and start
docker-compose up -d weather-arm

# Check health
curl http://localhost:8103/health

# Test directly
curl -X POST http://localhost:8103/execute \
  -H "Content-Type: application/json" \
  -d '{
    "location": "London",
    "days": 3,
    "include_analysis": true
  }'

# Test via orchestrator
curl -X POST http://localhost:8000/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "goal": "Get weather forecast for Paris for next 5 days",
    "constraints": ["Include detailed analysis"]
  }'

Performance Optimization

Add Metrics:

from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

REQUEST_COUNT = Counter('weather_requests_total', 'Total requests')
REQUEST_DURATION = Histogram('weather_request_duration_seconds', 'Request duration')

@app.post("/execute")
@REQUEST_DURATION.time()
async def execute(request: WeatherRequest):
    REQUEST_COUNT.inc()
    # ... existing code ...

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Add Connection Pooling:

# Reuse HTTP client
self.client = httpx.AsyncClient(
    timeout=10.0,
    limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)

Next Steps

Congratulations! You've built a complete custom arm. Next:

  1. ✅ Review Integration Patterns for arm-to-arm communication
  2. ✅ Read Testing Guide for comprehensive testing strategies
  3. ✅ Check Debugging Guide if you encounter issues

4. Integration Patterns

Purpose: Reference guide for all communication patterns in OctoLLM | Estimated Reading Time: 30-45 minutes | Use Case: Consult when implementing arm interactions or external integrations

Pattern Categories

This section provides complete code examples for:

  1. Arm-to-Arm Communication (4 patterns)
  2. Orchestrator Integration (3 patterns)
  3. External API Integration (3 patterns)
  4. Database Integration (4 patterns)
  5. Message Queue Patterns (2 patterns)
  6. Webhook Patterns (2 patterns)
  7. Batch Processing (2 patterns)
  8. Real-Time Streaming (2 patterns)
  9. Testing Integration (3 patterns)

Key Integration Patterns

1. Arm-to-Arm Direct Communication

When to use: One arm needs another arm's output synchronously

import httpx
from uuid import uuid4
from typing import Optional

class JudgeArmClient:
    """Client for direct communication with Judge Arm."""

    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=timeout)

    async def validate_code(self, code: str, language: str) -> dict:
        """Request code validation from Judge Arm."""

        response = await self.client.post(
            f"{self.base_url}/validate",
            json={
                "output": {"code": code},
                "validation_types": ["syntax", "quality"],
                "context": {"language": language}
            },
            headers={
                "X-Arm-ID": "coder",
                "X-Request-ID": str(uuid4())
            }
        )

        response.raise_for_status()
        return response.json()

# Usage in Coder Arm
async def generate_code(request):
    code = await llm_generate(request)

    # Validate with Judge Arm
    judge_client = JudgeArmClient("http://judge-arm:8102")
    validation = await judge_client.validate_code(code, "python")

    if not validation["valid"]:
        # Fix issues and retry
        code = await fix_code(code, validation["issues"])

    return code

2. Orchestrator-Mediated Workflow

When to use: Complex multi-step tasks requiring orchestration

import asyncio
import time
from typing import List

import httpx

class OrchestratorClient:
    """Client for submitting sub-tasks to orchestrator."""

    def __init__(self, orchestrator_url: str):
        self.orchestrator_url = orchestrator_url
        self.client = httpx.AsyncClient(timeout=30.0)

    async def submit_subtask(
        self,
        goal: str,
        required_capabilities: List[str],
        parent_task_id: str
    ) -> str:
        """Submit sub-task to orchestrator for routing."""

        response = await self.client.post(
            f"{self.orchestrator_url}/api/v1/tasks",
            json={
                "goal": goal,
                "parent_task_id": parent_task_id,
                "required_capabilities": required_capabilities,
                "priority": "high"
            }
        )

        return response.json()["task_id"]

    async def wait_for_result(self, task_id: str, timeout: int = 60) -> dict:
        """Poll for task completion."""
        start = time.time()

        while time.time() - start < timeout:
            response = await self.client.get(
                f"{self.orchestrator_url}/api/v1/tasks/{task_id}"
            )
            result = response.json()

            if result["status"] == "completed":
                return result["result"]
            elif result["status"] == "failed":
                raise Exception(result["error"])

            await asyncio.sleep(2)

        raise TimeoutError(f"Task {task_id} did not complete in {timeout}s")

# Usage in Planner Arm
async def execute_plan(plan):
    orchestrator = OrchestratorClient("http://orchestrator:8002")

    for step in plan.steps:
        # Submit step to orchestrator
        task_id = await orchestrator.submit_subtask(
            goal=step.action,
            required_capabilities=step.required_capabilities,
            parent_task_id=plan.id
        )

        # Wait for result
        result = await orchestrator.wait_for_result(task_id)

        # Store result for next step
        plan.store_result(step.id, result)

3. Shared Memory Pattern

When to use: Multiple arms need access to same data

import json
from typing import Any, List, Optional

import asyncpg
import redis.asyncio as redis
from qdrant_client import QdrantClient

class SharedMemoryClient:
    """Unified client for shared memory systems."""

    def __init__(self, redis_url: str, qdrant_url: str, postgres_url: str):
        self.redis = redis.from_url(redis_url)
        self.qdrant = QdrantClient(url=qdrant_url)
        self.postgres_url = postgres_url
        self.postgres = None  # asyncpg pools must be created in async code
        self.encoder = None   # set to an embedding model before using vector_search

    async def connect(self):
        """Create the PostgreSQL pool (cannot await inside __init__)."""
        self.postgres = await asyncpg.create_pool(self.postgres_url)

    # L1 Cache (Redis)
    async def cache_get(self, key: str) -> Optional[Any]:
        """Get from fast cache."""
        value = await self.redis.get(key)
        return json.loads(value) if value else None

    async def cache_set(self, key: str, value: Any, ttl: int = 300):
        """Set in fast cache with TTL."""
        await self.redis.setex(key, ttl, json.dumps(value))

    # L2 Vector Store (Qdrant)
    async def vector_search(
        self,
        collection: str,
        query: str,
        limit: int = 5
    ) -> List[dict]:
        """Semantic search in vector store."""
        query_vector = self.encoder.encode(query)

        results = self.qdrant.search(
            collection_name=collection,
            query_vector=query_vector,
            limit=limit
        )

        return [{"score": r.score, **r.payload} for r in results]

    # L3 Knowledge Graph (PostgreSQL)
    async def graph_query(self, entity_name: str) -> dict:
        """Query knowledge graph."""
        async with self.postgres.acquire() as conn:
            entity = await conn.fetchrow(
                "SELECT * FROM entities WHERE name = $1",
                entity_name
            )

            relationships = await conn.fetch(
                """SELECT r.relationship_type, e.name as target
                   FROM relationships r
                   JOIN entities e ON r.to_entity_id = e.id
                   WHERE r.from_entity_id = $1""",
                entity["id"]
            )

            return {
                "entity": dict(entity),
                "relationships": [dict(r) for r in relationships]
            }

# Usage across multiple arms
memory = SharedMemoryClient(redis_url, qdrant_url, postgres_url)
await memory.connect()

# Coder Arm stores solution
await memory.cache_set(f"code:{task_id}", generated_code, ttl=600)

# Judge Arm retrieves and validates
code = await memory.cache_get(f"code:{task_id}")
validation = validate(code)

# Orchestrator records in knowledge graph
await memory.graph_query("Python sorting algorithms")

4. Circuit Breaker Pattern (External APIs)

When to use: Calling unreliable external services

from enum import Enum
from datetime import datetime, timedelta
from typing import Callable

class CircuitBreakerOpenError(Exception):
    """Raised when the circuit is open and calls are blocked."""

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Blocking calls
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    """Circuit breaker for external API calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection."""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError(
                    f"Circuit breaker is OPEN. Try again after "
                    f"{self.timeout.total_seconds()}s"
                )

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except self.expected_exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """Reset on successful call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record failure and open circuit if threshold reached."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to retry."""
        return (
            self.last_failure_time
            and datetime.now() - self.last_failure_time >= self.timeout
        )

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout_seconds=30)

async def call_external_api(data):
    async with httpx.AsyncClient() as client:
        response = await client.post("https://api.example.com/endpoint", json=data)
        response.raise_for_status()
        return response.json()

# Protected call
try:
    result = await circuit_breaker.call(call_external_api, {"key": "value"})
except CircuitBreakerOpenError:
    # Circuit is open, use fallback
    result = get_cached_result()

5. Batch Processing Pattern

When to use: Processing large datasets efficiently

import asyncio
from typing import TypeVar, Generic, List, Callable, Awaitable

T = TypeVar('T')
R = TypeVar('R')

class BatchProcessor(Generic[T, R]):
    """Process items in batches with concurrency control."""

    def __init__(
        self,
        batch_size: int = 100,
        max_concurrent: int = 5
    ):
        self.batch_size = batch_size
        self.max_concurrent = max_concurrent

    async def process_batches(
        self,
        items: List[T],
        processor: Callable[[List[T]], Awaitable[List[R]]]
    ) -> List[R]:
        """Process items in batches with concurrency limit."""

        # Split into batches
        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        # Process with concurrency limit
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def process_batch_with_semaphore(batch):
            async with semaphore:
                return await processor(batch)

        # Execute all batches
        results = await asyncio.gather(*[
            process_batch_with_semaphore(batch)
            for batch in batches
        ])

        # Flatten results
        return [item for batch_result in results for item in batch_result]

# Usage: Process 1000 documents
async def process_document_batch(docs: List[str]) -> List[dict]:
    """Process batch of documents."""
    # Use LLM to analyze documents
    return [analyze_document(doc) for doc in docs]

processor = BatchProcessor(batch_size=50, max_concurrent=3)
documents = load_documents()  # 1000 documents

results = await processor.process_batches(documents, process_document_batch)
# Processes in 20 batches of 50, with max 3 concurrent batches

6. WebSocket Streaming Pattern

When to use: Real-time updates to client

from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set

class ConnectionManager:
    """Manage WebSocket connections for streaming updates."""

    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, client_id: str, websocket: WebSocket):
        """Accept new WebSocket connection."""
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        """Remove connection."""
        self.active_connections.pop(client_id, None)

    async def send_message(self, client_id: str, message: dict):
        """Send message to specific client."""
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]
            await websocket.send_json(message)

    async def broadcast(self, message: dict):
        """Broadcast message to all connected clients."""
        for websocket in self.active_connections.values():
            await websocket.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """WebSocket endpoint for streaming task updates."""
    await manager.connect(client_id, websocket)

    try:
        while True:
            # Receive messages from client
            data = await websocket.receive_json()

            # Process request
            task_id = data.get("task_id")
            if task_id:
                # Stream task progress updates
                async for update in stream_task_progress(task_id):
                    await manager.send_message(client_id, update)

    except WebSocketDisconnect:
        manager.disconnect(client_id)

async def stream_task_progress(task_id: str):
    """Stream task progress updates."""
    while True:
        status = await get_task_status(task_id)

        yield {
            "task_id": task_id,
            "status": status["status"],
            "progress": status.get("progress", 0),
            "message": status.get("message", "")
        }

        if status["status"] in ["completed", "failed"]:
            break

        await asyncio.sleep(1)

Complete Integration Examples

Multi-Arm Workflow: Coder → Judge → Executor pipeline

async def code_validate_execute_workflow(task_request):
    """Complete workflow: generate code, validate, execute."""

    # Step 1: Generate code (Coder Arm)
    coder = ArmClient("http://coder-arm:8100")
    code_result = await coder.execute({
        "request_type": "generate",
        "instruction": task_request.goal,
        "language": "python"
    })

    # Step 2: Validate code (Judge Arm)
    judge = ArmClient("http://judge-arm:8102")
    validation = await judge.execute({
        "output": code_result,
        "validation_types": ["schema", "quality", "criteria"],
        "acceptance_criteria": task_request.acceptance_criteria
    })

    if not validation["valid"]:
        raise ValueError(f"Validation failed: {validation['issues']}")

    # Step 3: Execute code (Executor Arm)
    executor = ArmClient("http://executor-arm:8103")
    execution_result = await executor.execute({
        "action_type": "python",
        "code": code_result["code"],
        "timeout_seconds": 30
    })

    return {
        "code": code_result["code"],
        "validation": validation,
        "execution": execution_result
    }
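
The workflow above assumes a generic ArmClient that this guide never defines. A minimal sketch (the /execute path matches the arm endpoint from the Creating Custom Arms section, though individual arms may expose differently named routes):

# Minimal sketch of the ArmClient helper assumed above
import httpx

class ArmClient:
    """Thin async wrapper around an arm's /execute endpoint."""

    def __init__(self, base_url: str, timeout: float = 30.0):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=timeout)

    async def execute(self, payload: dict) -> dict:
        response = await self.client.post(f"{self.base_url}/execute", json=payload)
        response.raise_for_status()
        return response.json()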

Best Practices Summary

  1. Always use timeouts on all HTTP/API calls
  2. Implement retry logic with exponential backoff (see the sketch after this list)
  3. Cache aggressively to reduce latency and cost
  4. Log all integration points with structured logging
  5. Monitor failures with metrics and alerts
  6. Test integration paths with contract tests
  7. Document API contracts with OpenAPI/Swagger
  8. Version APIs to support backward compatibility
  9. Use circuit breakers for external dependencies
  10. Implement graceful degradation when services fail
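
For item 2, a minimal sketch using tenacity, which is already listed in the orchestrator's dependencies (the attempt count and backoff bounds are arbitrary):

# Sketch: retries with exponential backoff via tenacity
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(httpx.HTTPError),
)
async def call_arm(url: str, payload: dict) -> dict:
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(url, json=payload)
        response.raise_for_status()
        return response.json()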

Reference Architecture

graph TB
    CLIENT[Client] -->|HTTP| GATEWAY[API Gateway]
    GATEWAY -->|Filter| REFLEX[Reflex Layer]
    REFLEX -->|Route| ORCH[Orchestrator]

    ORCH -->|Direct HTTP| ARM1[Coder Arm]
    ORCH -->|Direct HTTP| ARM2[Judge Arm]
    ORCH -->|Direct HTTP| ARM3[Executor Arm]

    ARM1 -->|Validate| ARM2
    ARM2 -->|Execute| ARM3

    ORCH -->|Read/Write| POSTGRES[(PostgreSQL)]
    ORCH -->|Cache| REDIS[(Redis)]
    ORCH -->|Vector Search| QDRANT[(Qdrant)]

    ARM1 -->|Share Data| REDIS
    ARM2 -->|Share Data| REDIS
    ARM3 -->|Share Data| REDIS

    ORCH -->|Metrics| PROMETHEUS[Prometheus]
    PROMETHEUS -->|Visualize| GRAFANA[Grafana]

5. Orchestrator Implementation

Time: 2-3 hours | Difficulty: Advanced | Prerequisites: Python proficiency, async programming, OctoLLM architecture understanding

Overview

Build the orchestrator from scratch following these steps:

  1. Project setup and dependencies
  2. Configuration management
  3. Core components (Intent Parser, Task Planner, Arm Router)
  4. API implementation
  5. Testing
  6. Deployment

Project Structure

orchestrator/
├── pyproject.toml          # Poetry configuration
├── src/
│   └── orchestrator/
│       ├── __init__.py
│       ├── main.py         # FastAPI application
│       ├── config.py       # Configuration
│       ├── models.py       # Pydantic models
│       ├── intent_parser.py
│       ├── task_planner.py
│       ├── arm_router.py
│       ├── result_integrator.py
│       └── memory.py       # Memory client
├── tests/
│   ├── test_intent_parser.py
│   ├── test_task_planner.py
│   ├── test_arm_router.py
│   └── test_api.py
└── Dockerfile

Step 1: Dependencies

File: pyproject.toml

[tool.poetry]
name = "orchestrator"
version = "1.0.0"
description = "OctoLLM Orchestrator Service"
authors = ["Your Team"]

[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.104.1"
uvicorn = {extras = ["standard"], version = "^0.24.0"}
pydantic = "^2.5.0"
pydantic-settings = "^2.1.0"
httpx = "^0.25.2"
asyncpg = "^0.29.0"
redis = {extras = ["hiredis"], version = "^5.0.1"}
qdrant-client = "^1.7.0"
structlog = "^23.2.0"
tenacity = "^8.2.3"
openai = "^1.3.7"
prometheus-client = "^0.19.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.21.1"
pytest-cov = "^4.1.0"
pytest-httpx = "^0.26.0"
black = "^23.11.0"
ruff = "^0.1.6"
mypy = "^1.7.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Step 2: Configuration

File: src/orchestrator/config.py

from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field
from typing import Optional

class Settings(BaseSettings):
    """Orchestrator configuration from environment variables."""

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False
    )

    # API Configuration
    api_host: str = Field(default="0.0.0.0")
    api_port: int = Field(default=8002)

    # LLM Configuration
    openai_api_key: str = Field(...)
    llm_model_planning: str = Field(default="gpt-3.5-turbo")
    llm_model_intent: str = Field(default="gpt-3.5-turbo")

    # Database URLs
    postgres_url: str = Field(default="postgresql://octollm:password@localhost:5432/octollm")
    redis_url: str = Field(default="redis://localhost:6379/0")
    qdrant_url: str = Field(default="http://localhost:6333")

    # System Configuration
    max_concurrent_tasks: int = Field(default=10, ge=1, le=100)
    task_timeout_seconds: int = Field(default=300, ge=10, le=3600)
    log_level: str = Field(default="INFO")
    environment: str = Field(default="development")

    # Arm Discovery
    arm_registry_url: Optional[str] = Field(default=None)
    arm_discovery_interval_seconds: int = Field(default=60)

settings = Settings()

Step 3: Data Models

File: src/orchestrator/models.py

from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import uuid

class Priority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class TaskStatus(str, Enum):
    PENDING = "pending"
    ACCEPTED = "accepted"
    PLANNING = "planning"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskRequest(BaseModel):
    """Incoming task request from client."""
    goal: str = Field(..., min_length=10, max_length=2000)
    constraints: List[str] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict)
    priority: Priority = Field(default=Priority.MEDIUM)
    deadline_seconds: Optional[int] = Field(None, ge=10, le=3600)

class SubTask(BaseModel):
    """Single step in execution plan."""
    step: int = Field(..., ge=1)
    action: str
    required_arm: str
    acceptance_criteria: List[str]
    depends_on: List[int] = Field(default_factory=list)
    estimated_duration_seconds: int = Field(..., ge=1)

class ExecutionPlan(BaseModel):
    """Complete task execution plan."""
    plan_id: str = Field(default_factory=lambda: f"plan-{uuid.uuid4()}")
    subtasks: List[SubTask]
    estimated_duration_seconds: int
    confidence: float = Field(..., ge=0.0, le=1.0)

class TaskResponse(BaseModel):
    """Response to task submission."""
    task_id: str
    status: TaskStatus
    estimated_duration_seconds: Optional[int] = None
    message: str

class TaskResult(BaseModel):
    """Complete task result."""
    task_id: str
    status: TaskStatus
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    duration_ms: Optional[int] = None
    confidence: Optional[float] = None
    plan: Optional[ExecutionPlan] = None
    created_at: datetime
    completed_at: Optional[datetime] = None

Step 4: Intent Parser

File: src/orchestrator/intent_parser.py

import openai
import json
import structlog
from typing import Dict, List, Any
from pydantic import BaseModel

logger = structlog.get_logger()

class ParsedIntent(BaseModel):
    """Structured intent from natural language."""
    goal: str
    required_capabilities: List[str]
    constraints: List[str]
    context: Dict[str, Any]
    complexity: str  # "simple", "medium", "complex"
    confidence: float

class IntentParser:
    """Parse natural language requests into structured intents."""

    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model

    async def parse(self, user_request: str) -> ParsedIntent:
        """Parse user request into structured intent."""

        logger.info("intent.parse.start", request_length=len(user_request))

        prompt = self._build_parsing_prompt(user_request)

        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": prompt["system"]},
                    {"role": "user", "content": prompt["user"]}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            parsed = json.loads(response.choices[0].message.content)
            intent = ParsedIntent(**parsed)

            logger.info(
                "intent.parse.success",
                capabilities=intent.required_capabilities,
                complexity=intent.complexity,
                confidence=intent.confidence
            )

            return intent

        except Exception as e:
            logger.error("intent.parse.failed", error=str(e))
            raise

    def _build_parsing_prompt(self, request: str) -> Dict[str, str]:
        """Build prompt for intent parsing."""

        system_prompt = """You are an intent parser for a distributed AI system.

Available capabilities:
- code_generation: Generate, debug, refactor code
- code_execution: Run scripts, shell commands
- web_search: Search internet, documentation
- data_analysis: Analyze datasets, statistics
- validation: Check outputs, fact-check
- planning: Break down complex tasks
- safety: Content filtering, PII detection

Your task: Parse requests into structured intents.

Output JSON format:
{
  "goal": "Clear, specific goal statement",
  "required_capabilities": ["capability1", "capability2"],
  "constraints": ["constraint1", "constraint2"],
  "context": {"key": "value"},
  "complexity": "simple|medium|complex",
  "confidence": 0.0-1.0
}"""

        user_prompt = f"Parse this request:\n\n{request}"

        return {"system": system_prompt, "user": user_prompt}

Step 5: Task Planner

File: src/orchestrator/task_planner.py

import openai
import json
import structlog
from typing import List, Dict, Any
from .models import ExecutionPlan, SubTask

logger = structlog.get_logger()

class TaskPlanner:
    """Decompose complex tasks into executable subtasks."""

    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model

    async def plan(
        self,
        goal: str,
        constraints: List[str],
        context: Dict[str, Any]
    ) -> ExecutionPlan:
        """Generate execution plan for goal."""

        logger.info("plan.generate.start", goal=goal[:50])

        prompt = self._build_planning_prompt(goal, constraints, context)

        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": prompt["system"]},
                    {"role": "user", "content": prompt["user"]}
                ],
                temperature=0.7,
                response_format={"type": "json_object"}
            )

            plan_data = json.loads(response.choices[0].message.content)

            # Parse subtasks
            subtasks = [SubTask(**step) for step in plan_data["subtasks"]]

            # Calculate total duration
            total_duration = sum(s.estimated_duration_seconds for s in subtasks)

            plan = ExecutionPlan(
                subtasks=subtasks,
                estimated_duration_seconds=total_duration,
                confidence=plan_data.get("confidence", 0.8)
            )

            # Validate plan
            self._validate_plan(plan)

            logger.info(
                "plan.generate.success",
                steps=len(subtasks),
                duration=total_duration
            )

            return plan

        except Exception as e:
            logger.error("plan.generate.failed", error=str(e))
            raise

    def _validate_plan(self, plan: ExecutionPlan):
        """Validate plan structure and dependencies."""

        step_numbers = {s.step for s in plan.subtasks}

        for subtask in plan.subtasks:
            # Check dependencies exist
            for dep in subtask.depends_on:
                if dep not in step_numbers:
                    raise ValueError(
                        f"Step {subtask.step} depends on non-existent step {dep}"
                    )

                # Check no forward dependencies
                if dep >= subtask.step:
                    raise ValueError(
                        f"Step {subtask.step} cannot depend on later step {dep}"
                    )

    def _build_planning_prompt(
        self,
        goal: str,
        constraints: List[str],
        context: Dict[str, Any]
    ) -> Dict[str, str]:
        """Build prompt for task planning."""

        system_prompt = """You are a task planner for a distributed AI system.

Available arms:
- coder: Code generation, debugging, refactoring
- executor: Run commands, scripts, API calls
- planner: Task decomposition, dependency resolution
- judge: Validate outputs, fact-check
- retriever: Search knowledge bases, web
- guardian: Safety checks, PII detection

Generate 3-7 clear steps. For each step:
- action: What to do (imperative)
- required_arm: Which arm executes
- acceptance_criteria: 2-3 success conditions
- depends_on: Prerequisite step numbers
- estimated_duration_seconds: Realistic estimate

Output JSON format:
{
  "subtasks": [
    {
      "step": 1,
      "action": "Search for...",
      "required_arm": "retriever",
      "acceptance_criteria": ["Found X", "Contains Y"],
      "depends_on": [],
      "estimated_duration_seconds": 20
    }
  ],
  "confidence": 0.85
}"""

        user_prompt = f"""Goal: {goal}

Constraints:
{chr(10).join(f"- {c}" for c in constraints) if constraints else "None"}

Context:
{json.dumps(context, indent=2) if context else "None"}

Generate execution plan:"""

        return {"system": system_prompt, "user": user_prompt}

Step 6: Arm Router

File: src/orchestrator/arm_router.py

import structlog
from typing import Dict, List
from dataclasses import dataclass

logger = structlog.get_logger()

@dataclass
class ArmScore:
    """Scoring for arm selection."""
    arm_id: str
    capability_match: float
    availability: float
    historical_success: float
    cost_efficiency: float
    total_score: float

class ArmRouter:
    """Route tasks to appropriate arms based on capabilities."""

    def __init__(self):
        self.arm_registry: Dict[str, Dict] = {}
        self.historical_stats: Dict[str, Dict] = {}

    def register_arm(self, arm_id: str, capabilities: Dict):
        """Register arm with capabilities."""
        self.arm_registry[arm_id] = capabilities

        if arm_id not in self.historical_stats:
            self.historical_stats[arm_id] = {
                "total": 0,
                "success": 0,
                "avg_duration_ms": 0
            }

        logger.info("arm.registered", arm_id=arm_id, capabilities=capabilities.get("capabilities"))

    async def route(
        self,
        required_capabilities: List[str],
        priority: str = "medium"
    ) -> str:
        """Select best arm for task."""

        logger.info(
            "routing.start",
            required_capabilities=required_capabilities,
            available_arms=list(self.arm_registry.keys())
        )

        # Score all arms
        scores = []
        for arm_id in self.arm_registry:
            score = self._score_arm(arm_id, required_capabilities, priority)
            if score.capability_match > 0:  # Must have at least one capability
                scores.append(score)

        if not scores:
            raise ValueError(
                f"No arm found with capabilities: {required_capabilities}"
            )

        # Select best
        best = max(scores, key=lambda s: s.total_score)

        logger.info(
            "routing.selected",
            arm_id=best.arm_id,
            score=best.total_score,
            capability_match=best.capability_match
        )

        return best.arm_id

    def _score_arm(
        self,
        arm_id: str,
        required_capabilities: List[str],
        priority: str
    ) -> ArmScore:
        """Calculate composite score for arm.

        Scoring weights:
        - Capability match: 40%
        - Availability: 20%
        - Historical success: 30%
        - Cost efficiency: 10%
        """

        arm_info = self.arm_registry[arm_id]
        arm_capabilities = set(arm_info.get("capabilities", []))
        required_set = set(required_capabilities)

        # Capability match (40%)
        matching = arm_capabilities & required_set
        capability_match = len(matching) / len(required_set) if required_set else 0

        # Availability (20%)
        status = arm_info.get("status", "healthy")
        availability = 1.0 if status == "healthy" else 0.0

        # Historical success rate (30%)
        stats = self.historical_stats.get(arm_id, {"success": 10, "total": 10})
        historical_success = stats["success"] / stats["total"] if stats["total"] > 0 else 0.5

        # Cost efficiency (10%)
        cost_tier = arm_info.get("cost_tier", 3)
        cost_efficiency = 1.0 - (cost_tier / 5.0)

        # Composite score
        total_score = (
            capability_match * 0.4 +
            availability * 0.2 +
            historical_success * 0.3 +
            cost_efficiency * 0.1
        )

        return ArmScore(
            arm_id=arm_id,
            capability_match=capability_match,
            availability=availability,
            historical_success=historical_success,
            cost_efficiency=cost_efficiency,
            total_score=total_score
        )

    def record_execution(self, arm_id: str, success: bool, duration_ms: int):
        """Record arm execution for historical stats."""

        if arm_id not in self.historical_stats:
            self.historical_stats[arm_id] = {"total": 0, "success": 0}

        stats = self.historical_stats[arm_id]
        stats["total"] += 1
        if success:
            stats["success"] += 1

        # Exponential moving average of duration (alpha = 0.1)
        current_avg = stats.get("avg_duration_ms", 0)
        stats["avg_duration_ms"] = (current_avg * 0.9) + (duration_ms * 0.1)

Step 7: FastAPI Application

File: src/orchestrator/main.py

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import structlog
import asyncpg
import redis.asyncio as redis
from contextlib import asynccontextmanager
import json
import uuid
from datetime import datetime

from .config import settings
from .models import TaskRequest, TaskResponse, TaskResult, TaskStatus
from .intent_parser import IntentParser
from .task_planner import TaskPlanner
from .arm_router import ArmRouter

# Configure logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Global state
db_pool: asyncpg.Pool = None
redis_client: redis.Redis = None
intent_parser: IntentParser = None
task_planner: TaskPlanner = None
arm_router: ArmRouter = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle."""
    global db_pool, redis_client, intent_parser, task_planner, arm_router

    logger.info("startup.begin")

    # Database
    db_pool = await asyncpg.create_pool(settings.postgres_url)

    # Redis
    redis_client = await redis.from_url(settings.redis_url)

    # Components
    intent_parser = IntentParser(settings.openai_api_key, settings.llm_model_intent)
    task_planner = TaskPlanner(settings.openai_api_key, settings.llm_model_planning)
    arm_router = ArmRouter()

    # Discover arms
    await discover_arms()

    logger.info("startup.complete")

    yield

    logger.info("shutdown.begin")
    await db_pool.close()
    await redis_client.close()
    logger.info("shutdown.complete")

app = FastAPI(
    title="OctoLLM Orchestrator",
    version="1.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # tighten this for production deployments
    allow_methods=["*"],
    allow_headers=["*"]
)

@app.get("/health")
async def health_check():
    """Health check endpoint."""

    # Check database
    try:
        async with db_pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
        db_status = "healthy"
    except Exception:
        db_status = "unhealthy"

    # Check Redis
    try:
        await redis_client.ping()
        redis_status = "healthy"
    except Exception:
        redis_status = "unhealthy"

    overall = "healthy" if db_status == "healthy" and redis_status == "healthy" else "degraded"

    return {
        "status": overall,
        "version": "1.0.0",
        "dependencies": {
            "postgres": db_status,
            "redis": redis_status
        }
    }

@app.post("/api/v1/tasks", response_model=TaskResponse)
async def submit_task(request: TaskRequest):
    """Submit new task for execution."""

    task_id = f"task-{uuid.uuid4()}"

    logger.info(
        "task.submitted",
        task_id=task_id,
        goal=request.goal[:50],
        priority=request.priority
    )

    try:
        # Parse intent
        intent = await intent_parser.parse(request.goal)

        # Generate plan
        plan = await task_planner.plan(
            goal=intent.goal,
            constraints=request.constraints,
            context=request.context
        )

        # Store task
        async with db_pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO task_history
                   (task_id, goal, plan, results, success, duration_ms, created_at)
                   VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                task_id,
                request.goal,
                plan.json(),
                "{}",
                False,
                0,
                datetime.utcnow()
            )

        # Start execution in background
        # (In production, use a task queue like Celery; a minimal
        # asyncio-based sketch follows this file.)

        return TaskResponse(
            task_id=task_id,
            status=TaskStatus.ACCEPTED,
            estimated_duration_seconds=plan.estimated_duration_seconds,
            message="Task accepted and queued for execution"
        )

    except Exception as e:
        logger.error("task.submit.failed", task_id=task_id, error=str(e))
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/v1/tasks/{task_id}", response_model=TaskResult)
async def get_task_status(task_id: str):
    """Get status and result of task."""

    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM task_history WHERE task_id = $1",
            task_id
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")

    # NOTE: rows are inserted with success=False, so a task that is still
    # executing will report "failed" here until results are written back.
    return TaskResult(
        task_id=row["task_id"],
        status=TaskStatus.COMPLETED if row["success"] else TaskStatus.FAILED,
        result=json.loads(row["results"]) if row["results"] else None,
        duration_ms=row["duration_ms"],
        created_at=row["created_at"],
        completed_at=row.get("completed_at")
    )

async def discover_arms():
    """Discover and register available arms."""

    # In production, query service discovery or config
    # For demo, register static arms

    arm_router.register_arm("coder", {
        "capabilities": ["code_generation", "code_debug", "code_refactor"],
        "endpoint": "http://coder-arm:8100",
        "cost_tier": 4,
        "status": "healthy"
    })

    arm_router.register_arm("executor", {
        "capabilities": ["code_execution", "shell_command", "api_call"],
        "endpoint": "http://executor-arm:8103",
        "cost_tier": 3,
        "status": "healthy"
    })

    arm_router.register_arm("judge", {
        "capabilities": ["validation", "fact_check", "quality_check"],
        "endpoint": "http://judge-arm:8102",
        "cost_tier": 2,
        "status": "healthy"
    })

    logger.info("arms.discovered", count=len(arm_router.arm_registry))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host=settings.api_host, port=settings.api_port)
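
The submit handler above stores the plan but leaves execution as a stub. As a minimal asyncio-based sketch of that background step (the /execute arm endpoint and the result write-back are assumptions; production should use a real task queue as noted in the handler):

# Hypothetical background executor (demo only; the /execute endpoint on each
# arm is an assumption, and results should be persisted to task_history).
import asyncio
import httpx

async def execute_plan(task_id: str, plan) -> None:
    async with httpx.AsyncClient() as client:
        for subtask in sorted(plan.subtasks, key=lambda s: s.step):
            arm = arm_router.arm_registry[subtask.required_arm]
            resp = await client.post(
                f"{arm['endpoint']}/execute",
                json={"task_id": task_id, "action": subtask.action},
                timeout=60.0,
            )
            resp.raise_for_status()
    # ... write results back and mark the task completed ...

# Inside submit_task, after the INSERT:
#     asyncio.create_task(execute_plan(task_id, plan))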

Step 8: Testing

File: tests/test_api.py

import pytest
from httpx import ASGITransport, AsyncClient
from src.orchestrator.main import app

@pytest.mark.asyncio
async def test_submit_task():
    """Test task submission."""

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.post(
            "/api/v1/tasks",
            json={
                "goal": "Write a Python function to reverse a string",
                "constraints": ["Include docstring"],
                "priority": "medium"
            }
        )

    assert response.status_code == 200
    data = response.json()
    assert "task_id" in data
    assert data["status"] == "accepted"

@pytest.mark.asyncio
async def test_health_check():
    """Test health endpoint."""

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/health")

    assert response.status_code == 200
    data = response.json()
    assert data["status"] in ["healthy", "degraded"]
    assert "dependencies" in data

Step 9: Deployment

File: Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install Poetry
RUN pip install --no-cache-dir poetry==1.6.1

# Copy dependencies
COPY pyproject.toml poetry.lock ./

# Install dependencies (--no-dev was removed in Poetry 1.2+, so use --only main;
# --no-root skips installing the project itself, since src/ is copied later)
RUN poetry config virtualenvs.create false \
    && poetry install --only main --no-root --no-interaction

# Copy application
COPY src/ ./src/

# Expose port
EXPOSE 8002

# Health check (raise_for_status makes non-2xx responses fail the probe)
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
  CMD python -c "import httpx; httpx.get('http://localhost:8002/health').raise_for_status()"

# Run
CMD ["uvicorn", "src.orchestrator.main:app", "--host", "0.0.0.0", "--port", "8002"]

Run locally:

cd orchestrator
poetry install
poetry shell
uvicorn src.orchestrator.main:app --reload

Run with Docker:

docker build -t octollm/orchestrator:latest .
docker run -p 8002:8002 --env-file .env octollm/orchestrator:latest
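
To run the orchestrator alongside the rest of the stack, a compose entry along these lines is a reasonable starting point (service names and build path are assumptions):

# docker-compose.yml fragment (illustrative)
orchestrator:
  build: ./orchestrator
  ports:
    - "8002:8002"
  env_file: .env
  depends_on:
    - postgres
    - redis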

Verification

# Health check
curl http://localhost:8002/health

# Submit task
curl -X POST http://localhost:8002/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "goal": "Write a function to calculate factorial",
    "constraints": ["Use recursion", "Add docstring"],
    "priority": "medium"
  }'

# Check status (substitute the task_id returned by the submit call)
curl http://localhost:8002/api/v1/tasks/task-abc123

6. Testing Guide

Purpose: Comprehensive testing strategy reference
Target Audience: All developers
Coverage Goals: 85-95% depending on component criticality

Test Pyramid

graph BT
    E2E[E2E Tests<br/>10%<br/>Slow, Full System]
    INTEGRATION[Integration Tests<br/>30%<br/>Component Boundaries]
    UNIT[Unit Tests<br/>60%<br/>Fast, Isolated]

    E2E --> INTEGRATION
    INTEGRATION --> UNIT

Testing Stack

[tool.poetry.group.test.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.21.1"
pytest-cov = "^4.1.0"
pytest-xdist = "^3.5.0"    # Parallel execution
pytest-httpx = "^0.22.0"   # HTTP mocking (httpx_mock fixture)
faker = "^20.1.0"          # Test data generation

Unit Test Example

import pytest
from pydantic import ValidationError

from src.orchestrator.models import TaskRequest, Priority

class TestTaskContract:
    """Test TaskRequest validation."""

    def test_valid_task_request(self):
        """Test valid task creation."""
        task = TaskRequest(
            goal="Write a function to sort a list",
            constraints=["Use Python 3.11+"],
            priority=Priority.MEDIUM
        )

        assert len(task.goal) >= 10
        assert task.priority == Priority.MEDIUM

    def test_goal_too_short(self):
        """Test goal minimum length validation."""
        with pytest.raises(ValidationError) as exc:
            TaskRequest(goal="Short", priority=Priority.LOW)

        assert "goal" in str(exc.value)

    @pytest.mark.parametrize("priority", [
        Priority.LOW, Priority.MEDIUM, Priority.HIGH, Priority.CRITICAL
    ])
    def test_all_priorities_valid(self, priority):
        """Test all priority levels accepted."""
        task = TaskRequest(
            goal="Test goal with sufficient length",
            priority=priority
        )
        assert task.priority == priority

Integration Test Example

@pytest.mark.integration
@pytest.mark.asyncio
async def test_task_submission_workflow(http_client, db_pool):
    """Test complete task submission flow."""

    # Submit task
    response = await http_client.post(
        "/api/v1/tasks",
        json={
            "goal": "Write a Python function to calculate fibonacci",
            "constraints": ["Include docstring", "Add tests"]
        }
    )

    assert response.status_code == 200
    task_id = response.json()["task_id"]

    # Verify stored in database
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM task_history WHERE task_id = $1",
            task_id
        )

    assert row is not None
    assert row["goal"] == "Write a Python function to calculate fibonacci"

E2E Test Example

import asyncio
import time

@pytest.mark.e2e
@pytest.mark.slow
@pytest.mark.asyncio
async def test_complete_code_generation_workflow(http_client):
    """Test end-to-end code generation workflow."""

    # 1. Submit task
    submit_response = await http_client.post(
        "/api/v1/tasks",
        json={
            "goal": "Write a Python function to reverse a string",
            "constraints": ["Include docstring", "Add unit tests"]
        }
    )

    task_id = submit_response.json()["task_id"]

    # 2. Poll for completion (max 60s)
    max_wait = 60
    start = time.time()

    while time.time() - start < max_wait:
        status_response = await http_client.get(f"/api/v1/tasks/{task_id}")
        status = status_response.json()

        if status["status"] == "completed":
            # 3. Verify result structure
            assert "code" in status["result"]
            assert "tests" in status["result"]
            assert status["confidence"] > 0.7

            # 4. Verify code is valid Python
            code = status["result"]["code"]
            compile(code, "<string>", "exec")  # Should not raise

            return

        elif status["status"] == "failed":
            pytest.fail(f"Task failed: {status.get('error')}")

        await asyncio.sleep(2)

    pytest.fail("Task did not complete within timeout")

Mocking External Services

@pytest.fixture
def mock_openai_client(monkeypatch):
    """Mock OpenAI API calls."""
    import types

    async def mock_create(*args, **kwargs):
        message = types.SimpleNamespace(
            content='{"goal": "Test", "required_capabilities": ["code"]}'
        )
        return types.SimpleNamespace(choices=[types.SimpleNamespace(message=message)])

    # Patch the resource class rather than the client attribute chain:
    # AsyncOpenAI.chat is a cached property, so the dotted path
    # "openai.AsyncOpenAI.chat.completions.create" does not resolve.
    monkeypatch.setattr(
        "openai.resources.chat.completions.AsyncCompletions.create",
        mock_create
    )

@pytest.mark.asyncio
async def test_intent_parsing_with_mock(mock_openai_client):
    """Test intent parsing with mocked LLM."""

    parser = IntentParser(api_key="test-key")
    intent = await parser.parse("Write a Python function")

    assert intent.goal == "Test"
    assert "code" in intent.required_capabilities

Coverage Configuration

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--cov=src --cov-report=html --cov-report=term --cov-fail-under=85"
markers = [
    "unit: Unit tests (fast)",
    "integration: Integration tests (medium)",
    "e2e: End-to-end tests (slow)",
    "slow: Slow tests (>1s)"
]

Run Tests

# All tests
pytest

# Unit tests only (fast)
pytest -m unit

# With coverage
pytest --cov=src --cov-report=html

# Parallel execution
pytest -n auto

# Specific file
pytest tests/test_intent_parser.py -v

7. Debugging Guide

Purpose: Debugging tools, techniques, and common problem solutions
Target Audience: All developers
Coverage: Development and production debugging

Structured Logging

import structlog

# Configure logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()  # JSON for production
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage
logger.info(
    "task.started",
    task_id="task-123",
    user_id="user-456",
    goal="Write code"
)

logger.error(
    "task.failed",
    task_id="task-123",
    error=str(e),
    traceback=traceback.format_exc()
)

VS Code Debugger

Configuration (.vscode/launch.json):

{
  "configurations": [
    {
      "name": "Debug Orchestrator",
      "type": "python",
      "request": "launch",
      "module": "uvicorn",
      "args": [
        "src.orchestrator.main:app",
        "--reload",
        "--host", "0.0.0.0",
        "--port", "8002"
      ],
      "env": {
        "LOG_LEVEL": "DEBUG",
        "OPENAI_API_KEY": "${env:OPENAI_API_KEY}"
      },
      "justMyCode": false
    }
  ]
}

Interactive Debugging

# Add breakpoint
import pdb; pdb.set_trace()

# Or use breakpoint() in Python 3.7+
breakpoint()

# Common commands:
# n - next line
# s - step into function
# c - continue execution
# p variable - print variable
# l - list code around current line
# bt - backtrace (call stack)
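
For example, to inspect a plan that fails validation, drop a breakpoint just before the check (placement is illustrative):

# In TaskPlanner.plan, just before self._validate_plan(plan):
breakpoint()  # then: p plan_data / p subtasks / p total_duration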

Metrics and Monitoring

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
TASK_COUNTER = Counter(
    'octollm_tasks_total',
    'Total tasks processed',
    ['status', 'priority']
)

TASK_DURATION = Histogram(
    'octollm_task_duration_seconds',
    'Task processing duration',
    ['arm_type']
)

ACTIVE_TASKS = Gauge(
    'octollm_active_tasks',
    'Number of currently active tasks'
)

# Usage
TASK_COUNTER.labels(status='completed', priority='high').inc()
TASK_DURATION.labels(arm_type='coder').observe(12.5)
ACTIVE_TASKS.set(5)

# Expose metrics endpoint
from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Common Problems and Solutions

Problem: Task routing failures

# Debug routing
logger.debug(
    "routing.debug",
    required_capabilities=required_capabilities,
    available_arms={
        arm_id: info.get("capabilities")
        for arm_id, info in arm_registry.items()
    }
)

Problem: Database connection issues

# Test connection from the shell
psql -h localhost -U octollm -d octollm

-- Inside psql: check open connections
SELECT count(*) FROM pg_stat_activity WHERE datname = 'octollm';

-- Inside psql: kill idle connections
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'octollm' AND state = 'idle';

Problem: Memory leaks

# Profile memory usage
import tracemalloc

tracemalloc.start()

# ... run code ...

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

for stat in top_stats[:10]:
    print(stat)

Log Analysis

# View logs
docker-compose logs -f orchestrator

# Filter errors
docker-compose logs orchestrator | grep ERROR

# JSON log parsing with jq (strip the compose "service |" prefix first,
# and skip non-JSON lines with fromjson?)
docker-compose logs orchestrator --no-color | \
  sed 's/^[^|]*| //' | jq -R 'fromjson? | select(.level=="error")'

# Count errors by type
docker-compose logs orchestrator --no-color | \
  sed 's/^[^|]*| //' | jq -rR 'fromjson? | .error_type // empty' | sort | uniq -c

Summary

This document provides complete Phase 2 implementation specifications for OctoLLM:

  1. Getting Started (15 min): Quick setup to running system
  2. Dev Environment (30-45 min): Complete development setup
  3. Custom Arms (1-2 hours): Build and deploy custom arms
  4. Integration Patterns: Reference for all communication patterns
  5. Orchestrator Implementation (2-3 hours): Build orchestrator from scratch
  6. Testing Guide: Unit, integration, and E2E testing strategies
  7. Debugging Guide: Tools and techniques for troubleshooting

Key Features Across All Guides

  • Step-by-Step Instructions: Numbered steps with time estimates
  • Complete Code Examples: 50+ production-ready implementations
  • Mermaid Diagrams: 10+ architectural and workflow diagrams
  • Platform Coverage: Linux, macOS, Windows (WSL2)
  • Best Practices: Security, performance, testing, observability
  • Troubleshooting: Common problems and solutions
  • Cross-References: Links between related guides

Implementation Roadmap

Week 1: Setup and First Steps

  • Complete Getting Started guide
  • Set up development environment
  • Run all services locally

Week 2-3: Core Learning

  • Review Integration Patterns
  • Build a simple custom arm
  • Understand orchestrator architecture

Week 4-5: Advanced Development

  • Implement orchestrator from scratch
  • Write comprehensive tests
  • Set up debugging and monitoring

Week 6+: Production Readiness

  • Performance optimization
  • Security hardening
  • Production deployment

Next Steps

After completing Phase 2:

  1. Begin actual implementation of arms
  2. Set up CI/CD pipelines
  3. Deploy to staging environment
  4. Conduct integration testing
  5. Move to production deployment

Documentation Metrics

Total Documents: 7 comprehensive guides
Total Pages: ~100+ pages of detailed documentation
Code Examples: 50+ production-ready implementations
Diagrams: 10+ Mermaid architectural diagrams
Estimated Completion Time: 8-12 hours total
Coverage: Development setup → Testing → Debugging → Deployment


Document Status: ✅ COMPLETE - All Phase 2 implementation guides fully specified
Ready for: Immediate use by development team
Maintained by: OctoLLM Documentation Team
Last Updated: 2025-11-10