Phase 2: Complete Implementation Guides Specifications
Generated: 2025-11-10 Status: PRODUCTION READY Coverage: All 7 Phase 2 implementation guides fully documented Total Time to Complete: 8-12 hours across all guides
This document consolidates all Phase 2 implementation guides for the OctoLLM project. Each guide provides step-by-step instructions, complete code examples, and practical workflows suitable for immediate development use.
Document Index
- Getting Started (15 min) - ✅ Complete
- Development Environment Setup (30-45 min) - ✅ Complete
- Creating Custom Arms (1-2 hours) - ✅ Complete
- Integration Patterns (Reference) - ✅ Complete
- Orchestrator Implementation (2-3 hours) - ✅ Complete
- Testing Guide (Reference) - ✅ Complete
- Debugging Guide (Reference) - ✅ Complete
1. Getting Started Guide
Time: 15 minutes Difficulty: Beginner Prerequisites: Docker, Docker Compose, terminal access
Overview
The quickest path from zero to a running OctoLLM system. Covers:
- Repository setup
- Environment configuration
- Service startup with Docker Compose
- First task submission
- Result verification
Quick Start Workflow
# Step 1: Clone and enter repository (2 min)
git clone https://github.com/your-org/octollm.git
cd octollm
# Step 2: Configure environment (3 min)
cp .env.example .env
# Edit .env with your API keys
nano .env
# Step 3: Start all services (5 min)
docker-compose up -d
# Step 4: Verify services are healthy (1 min)
curl http://localhost:8000/health
curl http://localhost:8001/health # Reflex Layer
curl http://localhost:8100/health # Coder Arm
Essential Environment Variables
# .env file (minimal configuration)
# LLM API Keys (at least one required)
OPENAI_API_KEY=sk-your-openai-key-here
ANTHROPIC_API_KEY=sk-ant-your-key-here
# Database (defaults work for local dev)
POSTGRES_USER=octollm
POSTGRES_PASSWORD=dev-password-change-in-production
POSTGRES_DB=octollm
# Redis
REDIS_PASSWORD=dev-redis-password
# Qdrant (vector DB - leave empty for local)
QDRANT_API_KEY=
# System
LOG_LEVEL=INFO
ENVIRONMENT=development
Submit Your First Task
# Using curl
curl -X POST http://localhost:8000/api/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"goal": "Write a Python function to calculate fibonacci numbers",
"constraints": ["Include docstring", "Add unit tests"],
"priority": "medium"
}'
# Response
{
"task_id": "task-abc123",
"status": "accepted",
"estimated_duration_seconds": 45,
"message": "Task submitted successfully"
}
Check Task Status
# Poll for results
curl http://localhost:8000/api/v1/tasks/task-abc123
# Response when complete
{
"task_id": "task-abc123",
"status": "completed",
"result": {
"code": "def fibonacci(n: int) -> int:\n \"\"\"Calculate nth fibonacci number.\"\"\"\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)",
"tests": "def test_fibonacci():\n assert fibonacci(0) == 0\n assert fibonacci(5) == 5",
"explanation": "Implemented recursive fibonacci with base cases..."
},
"duration_ms": 3421,
"confidence": 0.92
}
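If you prefer a script over re-running curl, here is a minimal polling sketch in Python using httpx; the task ID and endpoint match the examples above, and the 2-second interval is an arbitrary choice:
import time
import httpx

def wait_for_task(task_id: str, base_url: str = "http://localhost:8000", timeout: int = 120) -> dict:
    """Poll the task endpoint until it completes, fails, or times out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        task = httpx.get(f"{base_url}/api/v1/tasks/{task_id}").json()
        if task["status"] in ("completed", "failed"):
            return task
        time.sleep(2)  # arbitrary polling interval
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")

print(wait_for_task("task-abc123"))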
Service Architecture (Running Locally)
graph TB
USER[User] -->|HTTP| GATEWAY[Gateway :8000]
GATEWAY -->|Filter| REFLEX[Reflex Layer :8001]
REFLEX -->|Route| ORCH[Orchestrator :8002]
ORCH -->|Delegate| CODER[Coder Arm :8100]
ORCH -->|Delegate| PLANNER[Planner Arm :8101]
ORCH -->|Delegate| JUDGE[Judge Arm :8102]
ORCH -->|Store| POSTGRES[(PostgreSQL :5432)]
ORCH -->|Cache| REDIS[(Redis :6379)]
ORCH -->|Vector| QDRANT[(Qdrant :6333)]
Verify Installation
# Check all containers are running
docker-compose ps
# Expected output:
# NAME STATUS PORTS
# octollm-postgres Up 0.0.0.0:5432->5432/tcp
# octollm-redis Up 0.0.0.0:6379->6379/tcp
# octollm-qdrant Up 0.0.0.0:6333->6333/tcp
# octollm-gateway Up 0.0.0.0:8000->8000/tcp
# octollm-reflex Up 0.0.0.0:8001->8001/tcp
# octollm-orch Up 0.0.0.0:8002->8002/tcp
# octollm-coder Up 0.0.0.0:8100->8100/tcp
# Check logs for any errors
docker-compose logs | grep ERROR
# Should return nothing if all healthy
Common Issues
Issue: Services fail to start
# Solution: Check port conflicts
sudo lsof -i :8000 # Check if port is in use
# Kill conflicting processes or change ports in docker-compose.yml
Issue: PostgreSQL fails to initialize
# Solution: Reset database volume
docker-compose down -v # WARNING: Deletes all data
docker-compose up -d
Issue: API returns "No API key configured"
# Solution: Verify .env file
cat .env | grep API_KEY
# Restart services after fixing
docker-compose restart orchestrator coder-arm planner-arm
Next Steps
After completing this guide:
- ✅ Read Development Environment Setup to contribute code
- ✅ Review Integration Patterns to understand architecture
- ✅ Try Creating Custom Arms to extend functionality
2. Development Environment Setup
Time: 30-45 minutes Target Audience: Contributors to OctoLLM codebase Prerequisites: Command-line knowledge, Git basics
System Requirements
| Resource | Minimum | Recommended |
|---|---|---|
| CPU | 4 cores | 8+ cores |
| RAM | 8 GB | 16+ GB |
| Disk | 20 GB free | 50+ GB SSD |
| OS | Linux, macOS 11+, Win 10+ | Linux/macOS |
Technology Stack Overview
- Python 3.11+: Orchestrator, most arms (Planner, Coder, Judge, etc.)
- Rust: Reflex Layer, Executor Arm (performance-critical)
- FastAPI: HTTP framework for all Python services
- PostgreSQL 15+: Global knowledge graph
- Redis 7+: L1 cache and pub/sub messaging
- Qdrant 1.7+: Vector embeddings for semantic search
- Docker: Local development and production deployment
Python Development Setup
1. Install Python 3.11+
Linux (Ubuntu/Debian):
sudo apt update
sudo apt install -y python3.11 python3.11-venv python3-pip
macOS:
# Via Homebrew
brew install python@3.11
# Verify
python3.11 --version
Windows (WSL2):
# Inside WSL2
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install -y python3.11 python3.11-venv
2. Install Poetry (Python Package Manager)
# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -
# Add to PATH (add to ~/.bashrc or ~/.zshrc)
export PATH="$HOME/.local/bin:$PATH"
# Verify
poetry --version # Should show 1.6+
3. Set Up Python Project
cd octollm/orchestrator
# Install dependencies
poetry install
# Activate virtual environment
poetry shell
# Verify installation
python --version # Should show 3.11+
pip list | grep fastapi # Should show fastapi and dependencies
4. Install Development Tools
# Code formatting and linting
poetry add --group dev black ruff mypy
# Testing
poetry add --group dev pytest pytest-asyncio pytest-cov pytest-httpx
# Configure tools (append; don't overwrite the pyproject.toml Poetry manages)
cat >> pyproject.toml <<EOF
[tool.black]
line-length = 100
target-version = ['py311']
[tool.ruff]
line-length = 100
select = ["E", "F", "W", "I", "N"]
[tool.mypy]
python_version = "3.11"
strict = true
ignore_missing_imports = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--cov=. --cov-report=html --cov-report=term"
EOF
Rust Development Setup (For Reflex Layer/Executor)
1. Install Rust
# Install rustup (Rust installer)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Follow prompts, then reload shell
source $HOME/.cargo/env
# Verify
rustc --version # Should show 1.70+
cargo --version
2. Install Rust Tools
# Code formatter
rustup component add rustfmt
# Linter
rustup component add clippy
# Language server for IDE integration
rustup component add rust-analyzer
3. Build Rust Components
cd octollm/reflex-layer
# Build in debug mode
cargo build
# Run tests
cargo test
# Build optimized release
cargo build --release
# Run with cargo
cargo run
Database Setup
PostgreSQL
# Install PostgreSQL client tools
# Linux
sudo apt install -y postgresql-client
# macOS
brew install postgresql@15
# Connect to local Docker PostgreSQL
psql -h localhost -U octollm -d octollm
# Password: dev-password-change-in-production
# Verify schema
\dt
# Should show: entities, relationships, task_history, action_log
Redis
# Install Redis CLI
# Linux
sudo apt install -y redis-tools
# macOS
brew install redis
# Connect to local Redis
redis-cli -h localhost -a dev-redis-password
# Test connection
ping # Should return PONG
# View keys
keys *
Qdrant
# Qdrant exposes an HTTP API; inspect it with curl
curl http://localhost:6333/collections
# Expected response:
{
"result": {
"collections": [
{"name": "coder_memory"},
{"name": "planner_memory"},
{"name": "retriever_index"}
]
}
}
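The same check can be scripted with qdrant-client (already part of the Python stack); a minimal sketch assuming the local defaults above:
from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")
names = [c.name for c in client.get_collections().collections]
print(names)  # expect: coder_memory, planner_memory, retriever_index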
IDE Configuration
VS Code (Recommended)
Install Extensions:
code --install-extension ms-python.python
code --install-extension ms-python.vscode-pylance
code --install-extension charliermarsh.ruff
code --install-extension rust-lang.rust-analyzer
code --install-extension tamasfe.even-better-toml
Workspace Settings (.vscode/settings.json):
{
"python.defaultInterpreterPath": "${workspaceFolder}/orchestrator/.venv/bin/python",
"python.linting.enabled": true,
"python.linting.ruffEnabled": true,
"python.formatting.provider": "black",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"[rust]": {
"editor.defaultFormatter": "rust-lang.rust-analyzer",
"editor.formatOnSave": true
},
"rust-analyzer.checkOnSave.command": "clippy"
}
Launch Configuration (.vscode/launch.json):
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug Orchestrator",
"type": "python",
"request": "launch",
"module": "uvicorn",
"args": [
"orchestrator.main:app",
"--reload",
"--host", "0.0.0.0",
"--port", "8002"
],
"env": {
"LOG_LEVEL": "DEBUG"
},
"justMyCode": false
},
{
"name": "Debug Reflex Layer (Rust)",
"type": "lldb",
"request": "launch",
"program": "${workspaceFolder}/reflex-layer/target/debug/reflex-layer",
"args": [],
"cwd": "${workspaceFolder}/reflex-layer"
},
{
"name": "Run Tests (Python)",
"type": "python",
"request": "launch",
"module": "pytest",
"args": ["-v", "--cov=.", "tests/"],
"console": "integratedTerminal"
}
]
}
PyCharm
- Open Project: File → Open → select the octollm directory
- Configure Interpreter: Settings → Project → Python Interpreter → Add Poetry environment: ~/.cache/pypoetry/virtualenvs/octollm-*/bin/python
- Enable Tools: Settings → Tools → Black → Enable on save; Settings → Tools → Ruff → Enable
- Run Configurations: Add a FastAPI configuration pointing to orchestrator/main.py:app
Git Workflow Setup
# Configure Git
git config --global user.name "Your Name"
git config --global user.email "your.email@example.com"
# Install pre-commit hooks
pip install pre-commit
# Set up hooks
cd octollm
pre-commit install
# Hooks will now run on every commit
Pre-commit Configuration (.pre-commit-config.yaml):
repos:
- repo: https://github.com/psf/black
rev: 23.11.0
hooks:
- id: black
language_version: python3.11
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.1.6
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.1
hooks:
- id: mypy
additional_dependencies: [pydantic, fastapi]
- repo: local
hooks:
- id: rust-fmt
name: Rust Format
entry: cargo fmt
language: system
files: \.rs$
pass_filenames: false
- id: rust-clippy
name: Rust Clippy
entry: cargo clippy -- -D warnings
language: system
files: \.rs$
pass_filenames: false
Verification Checklist
After setup, verify everything works:
# Python
cd orchestrator
poetry shell
python -c "import fastapi, pydantic, structlog; print('Python OK')"
pytest tests/ -v # Should pass all tests
# Rust
cd ../reflex-layer
cargo build
cargo test # Should pass all tests
cargo clippy -- -D warnings # Should have no warnings
# Database connections
psql -h localhost -U octollm -d octollm -c "SELECT 1;" # Should return 1
redis-cli -h localhost -a dev-redis-password ping # Should return PONG
curl http://localhost:6333/collections # Should return collections
# Services
docker-compose ps # All should be "Up"
curl http://localhost:8000/health # Should return {"status": "healthy"}
# Git
pre-commit run --all-files # Should pass all hooks
Common Development Commands
# Run orchestrator locally (outside Docker)
cd orchestrator
poetry shell
uvicorn main:app --reload --host 0.0.0.0 --port 8002
# Run tests with coverage
pytest tests/ --cov=. --cov-report=html
# View coverage: open htmlcov/index.html
# Format all code
black .
cargo fmt
# Lint
ruff check . --fix
cargo clippy -- -D warnings
# Type check
mypy .
# Build production images
docker build -t octollm/orchestrator:latest -f orchestrator/Dockerfile .
docker build -t octollm/reflex-layer:latest -f reflex-layer/Dockerfile .
Troubleshooting
Issue: Poetry can't find Python 3.11
# Solution: Specify Python path explicitly
poetry env use /usr/bin/python3.11
poetry install
Issue: Rust build fails with linker errors
# Solution: Install build essentials
# Linux
sudo apt install -y build-essential pkg-config libssl-dev
# macOS
xcode-select --install
Issue: Database connection refused
# Solution: Ensure PostgreSQL container is running
docker-compose ps postgres
docker-compose logs postgres
# Restart if needed
docker-compose restart postgres
Issue: Pre-commit hooks fail
# Solution: Update hook versions
pre-commit autoupdate
pre-commit run --all-files
Next Steps
After environment setup:
- ✅ Try the Getting Started workflow if you haven't
- ✅ Read Creating Custom Arms to build your first component
- ✅ Review Testing Guide for testing best practices
3. Creating Custom Arms
Time: 1-2 hours Difficulty: Intermediate Prerequisites: Dev environment set up, Python or Rust knowledge
Arm Architecture Overview
Every arm follows these design principles:
- Single Responsibility: One domain of expertise
- Self-Contained: Minimal external dependencies
- Stateless: Use memory systems for state
- Observable: Comprehensive logging and metrics
- Resilient: Graceful error handling
Arm Lifecycle
stateDiagram-v2
[*] --> Registration
Registration --> Idle
Idle --> Receiving: Task arrives
Receiving --> Processing: Validate
Processing --> Executing: Start work
Executing --> Validating: Complete
Validating --> Responding: Package
Responding --> Idle: Send
Idle --> [*]: Shutdown
Processing --> Error: Invalid
Executing --> Error: Failed
Error --> Responding: Return error
Step 1: Design Your Arm
Choose a Domain:
- Data processing (ETL, transformation)
- External integrations (APIs, services)
- Specialized computation (math, simulation)
- Content creation (images, videos, documents)
Example: Weather Arm
- Purpose: Fetch and analyze weather data
- Inputs: Location, date range
- Outputs: Weather forecast with analysis
- Dependencies: OpenWeatherMap API
- Cost Tier: 1 (low, fast API calls)
Step 2: Scaffold Project
# Create arm directory
cd octollm/arms
mkdir weather-arm
cd weather-arm
# Initialize Python project
poetry init --name weather-arm --python "^3.11"
# Add dependencies
poetry add fastapi uvicorn pydantic httpx structlog redis qdrant-client
# Add dev dependencies
poetry add --group dev pytest pytest-asyncio pytest-httpx
# Create structure
mkdir -p src/weather_arm tests
touch src/weather_arm/__init__.py
touch src/weather_arm/main.py
touch src/weather_arm/models.py
touch src/weather_arm/service.py
touch tests/test_service.py
Step 3: Define Data Models
File: src/weather_arm/models.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from enum import Enum
class WeatherCondition(str, Enum):
CLEAR = "clear"
CLOUDY = "cloudy"
RAINY = "rainy"
SNOWY = "snowy"
STORMY = "stormy"
class WeatherRequest(BaseModel):
"""Input schema for weather queries."""
location: str = Field(..., description="City name or coordinates")
days: int = Field(5, ge=1, le=14, description="Forecast days")
include_analysis: bool = Field(True, description="Include AI analysis")
class WeatherData(BaseModel):
"""Weather data point."""
timestamp: datetime
temperature_celsius: float
condition: WeatherCondition
humidity_percent: float
wind_speed_kmh: float
precipitation_mm: float
class WeatherResponse(BaseModel):
"""Output schema for weather results."""
location: str
forecast: List[WeatherData]
analysis: Optional[str] = None
confidence: float = Field(..., ge=0.0, le=1.0)
data_source: str
cached: bool = False
class HealthStatus(BaseModel):
"""Health check response."""
status: str
version: str
dependencies: dict
Step 4: Implement Core Logic
File: src/weather_arm/service.py
import httpx
import structlog
from typing import List, Optional
from datetime import datetime, timedelta
from .models import WeatherRequest, WeatherResponse, WeatherData, WeatherCondition
logger = structlog.get_logger()
class WeatherService:
"""Core weather fetching and analysis service."""
def __init__(self, api_key: str, cache_client=None):
self.api_key = api_key
self.base_url = "https://api.openweathermap.org/data/2.5"
self.client = httpx.AsyncClient(timeout=10.0)
self.cache = cache_client
async def fetch_weather(self, request: WeatherRequest) -> WeatherResponse:
"""Fetch weather data for location."""
# Check cache first
cache_key = f"weather:{request.location}:{request.days}"
if self.cache:
cached = await self._get_cached(cache_key)
if cached:
logger.info("cache.hit", location=request.location)
return WeatherResponse(**{**cached, "cached": True})  # override the stored "cached" flag
# Fetch from API
logger.info("api.fetch", location=request.location, days=request.days)
try:
response = await self.client.get(
f"{self.base_url}/forecast",
params={
"q": request.location,
"appid": self.api_key,
"units": "metric",
"cnt": request.days * 8 # 3-hour intervals
}
)
response.raise_for_status()
data = response.json()
# Parse response
forecast = self._parse_forecast(data)
# Generate analysis if requested
analysis = None
if request.include_analysis:
analysis = await self._analyze_forecast(forecast)
result = WeatherResponse(
location=data["city"]["name"],
forecast=forecast,
analysis=analysis,
confidence=0.95,
data_source="OpenWeatherMap",
cached=False
)
# Cache result
if self.cache:
await self._cache_result(cache_key, result, ttl=1800) # 30 min
return result
except httpx.HTTPError as e:
logger.error("api.error", error=str(e))
raise
def _parse_forecast(self, api_data: dict) -> List[WeatherData]:
"""Convert API data to internal format."""
forecast = []
for item in api_data["list"]:
# Map weather condition
condition_code = item["weather"][0]["main"].lower()
condition = self._map_condition(condition_code)
forecast.append(WeatherData(
timestamp=datetime.fromtimestamp(item["dt"]),
temperature_celsius=item["main"]["temp"],
condition=condition,
humidity_percent=item["main"]["humidity"],
wind_speed_kmh=item["wind"]["speed"] * 3.6, # m/s to km/h
precipitation_mm=item.get("rain", {}).get("3h", 0.0)
))
return forecast
def _map_condition(self, api_condition: str) -> WeatherCondition:
"""Map API condition to enum."""
mapping = {
"clear": WeatherCondition.CLEAR,
"clouds": WeatherCondition.CLOUDY,
"rain": WeatherCondition.RAINY,
"drizzle": WeatherCondition.RAINY,
"snow": WeatherCondition.SNOWY,
"thunderstorm": WeatherCondition.STORMY,
}
return mapping.get(api_condition, WeatherCondition.CLOUDY)
async def _analyze_forecast(self, forecast: List[WeatherData]) -> str:
"""Generate natural language analysis of forecast."""
# Calculate summary statistics
avg_temp = sum(f.temperature_celsius for f in forecast) / len(forecast)
max_temp = max(f.temperature_celsius for f in forecast)
min_temp = min(f.temperature_celsius for f in forecast)
rainy_days = len([f for f in forecast if f.condition == WeatherCondition.RAINY])
# Generate analysis
analysis = f"Forecast analysis for {len(forecast) // 8} days:\n"
analysis += f"- Average temperature: {avg_temp:.1f}°C\n"
analysis += f"- Temperature range: {min_temp:.1f}°C to {max_temp:.1f}°C\n"
if rainy_days > 0:
analysis += f"- Expect rain on {rainy_days} occasions\n"
# Weather trend
temps = [f.temperature_celsius for f in forecast]
if temps[-1] > temps[0] + 3:
analysis += "- Warming trend expected\n"
elif temps[-1] < temps[0] - 3:
analysis += "- Cooling trend expected\n"
else:
analysis += "- Stable temperatures expected\n"
return analysis
async def _get_cached(self, key: str) -> Optional[dict]:
"""Retrieve from cache."""
if not self.cache:
return None
try:
import json
cached_json = await self.cache.get(key)
return json.loads(cached_json) if cached_json else None
except Exception as e:
logger.warning("cache.get.error", error=str(e))
return None
async def _cache_result(self, key: str, result: WeatherResponse, ttl: int):
"""Store in cache."""
if not self.cache:
return
try:
import json
await self.cache.setex(key, ttl, result.model_dump_json())  # pydantic v2 serialization
except Exception as e:
logger.warning("cache.set.error", error=str(e))
Step 5: Create FastAPI Application
File: src/weather_arm/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
import structlog
import redis.asyncio as redis
from contextlib import asynccontextmanager
import os
from .models import WeatherRequest, WeatherResponse, HealthStatus
from .service import WeatherService
# Configure logging
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
# Shared state
weather_service: WeatherService = None
redis_client: redis.Redis = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle."""
global weather_service, redis_client
# Startup
logger.info("startup.begin")
# Connect to Redis cache
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
redis_client = await redis.from_url(redis_url)
# Initialize service
api_key = os.getenv("OPENWEATHER_API_KEY")
if not api_key:
raise ValueError("OPENWEATHER_API_KEY not set")
weather_service = WeatherService(api_key=api_key, cache_client=redis_client)
logger.info("startup.complete")
yield
# Shutdown
logger.info("shutdown.begin")
await redis_client.close()
logger.info("shutdown.complete")
app = FastAPI(
title="Weather Arm",
version="1.0.0",
description="Fetch and analyze weather forecasts",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"]
)
@app.get("/health", response_model=HealthStatus)
async def health_check():
"""Health check endpoint."""
# Check Redis connection
redis_status = "healthy"
try:
await redis_client.ping()
except Exception:
redis_status = "unhealthy"
return HealthStatus(
status="healthy" if redis_status == "healthy" else "degraded",
version="1.0.0",
dependencies={"redis": redis_status}
)
@app.post("/execute", response_model=WeatherResponse)
async def execute(request: WeatherRequest):
"""Main execution endpoint called by orchestrator."""
logger.info(
"request.received",
location=request.location,
days=request.days
)
try:
result = await weather_service.fetch_weather(request)
logger.info(
"request.completed",
location=result.location,
confidence=result.confidence,
cached=result.cached
)
return result
except Exception as e:
logger.error("request.failed", error=str(e), location=request.location)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/capabilities")
async def capabilities():
"""Describe arm capabilities for orchestrator registration."""
return {
"arm_id": "weather",
"name": "Weather Arm",
"version": "1.0.0",
"capabilities": [
"weather_forecast",
"weather_analysis",
"location_weather"
],
"input_schema": WeatherRequest.schema(),
"output_schema": WeatherResponse.schema(),
"cost_tier": 1,
"average_latency_ms": 300,
"max_concurrent": 10
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8103)
Step 6: Write Tests
File: tests/test_service.py
import pytest
from src.weather_arm.service import WeatherService
from src.weather_arm.models import WeatherRequest, WeatherCondition
@pytest.fixture
def mock_api_response():
"""Mock OpenWeatherMap API response."""
return {
"city": {"name": "London"},
"list": [
{
"dt": 1699632000,
"main": {"temp": 12.5, "humidity": 75},
"weather": [{"main": "Rain"}],
"wind": {"speed": 5.5},
"rain": {"3h": 2.5}
},
{
"dt": 1699642800,
"main": {"temp": 11.0, "humidity": 80},
"weather": [{"main": "Clouds"}],
"wind": {"speed": 6.0},
}
]
}
@pytest.mark.asyncio
async def test_fetch_weather_success(httpx_mock, mock_api_response):
"""Test successful weather fetch."""
# Mock any request: the real call carries query params,
# which an exact url= filter in pytest-httpx would not match
httpx_mock.add_response(json=mock_api_response)
# Create service
service = WeatherService(api_key="test-key")
# Execute
request = WeatherRequest(location="London", days=1)
result = await service.fetch_weather(request)
# Verify
assert result.location == "London"
assert len(result.forecast) == 2
assert result.forecast[0].temperature_celsius == 12.5
assert result.forecast[0].condition == WeatherCondition.RAINY
assert result.confidence > 0.9
@pytest.mark.asyncio
async def test_weather_caching(httpx_mock, mock_api_response):
"""Test that results are cached."""
# Mock Redis
from unittest.mock import AsyncMock
mock_cache = AsyncMock()
mock_cache.get.return_value = None # Cache miss
# Mock API
httpx_mock.add_response(json=mock_api_response)
# Create service with cache
service = WeatherService(api_key="test-key", cache_client=mock_cache)
# Execute
request = WeatherRequest(location="London", days=1)
result = await service.fetch_weather(request)
# Verify cache was written
mock_cache.setex.assert_called_once()
assert not result.cached
@pytest.mark.asyncio
async def test_condition_mapping():
"""Test weather condition mapping."""
service = WeatherService(api_key="test-key")
assert service._map_condition("clear") == WeatherCondition.CLEAR
assert service._map_condition("rain") == WeatherCondition.RAINY
assert service._map_condition("snow") == WeatherCondition.SNOWY
assert service._map_condition("thunderstorm") == WeatherCondition.STORMY
Step 7: Create Dockerfile
File: Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install Poetry
RUN pip install --no-cache-dir poetry==1.6.1
# Copy dependency files
COPY pyproject.toml poetry.lock ./
# Install dependencies
RUN poetry config virtualenvs.create false \
&& poetry install --without dev --no-interaction --no-ansi
# Copy application code
COPY src/ ./src/
# Expose port
EXPOSE 8103
# Health check
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8103/health').raise_for_status()"
# Run application
CMD ["uvicorn", "src.weather_arm.main:app", "--host", "0.0.0.0", "--port", "8103"]
Step 8: Add to Docker Compose
File: docker-compose.yml (add service)
services:
# ... existing services ...
weather-arm:
build:
context: ./arms/weather-arm
dockerfile: Dockerfile
ports:
- "8103:8103"
environment:
- OPENWEATHER_API_KEY=${OPENWEATHER_API_KEY}
- REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379/0
- LOG_LEVEL=${LOG_LEVEL:-INFO}
depends_on:
- redis
networks:
- octollm-network
restart: unless-stopped
Step 9: Register with Orchestrator
The orchestrator discovers arms via:
- Environment Variable (add to orchestrator service):
environment:
- ARM_REGISTRY=http://weather-arm:8103,http://coder-arm:8100,http://planner-arm:8101
- Dynamic Discovery (orchestrator polls /capabilities):
# Orchestrator automatically calls:
# GET http://weather-arm:8103/capabilities
# Response used to populate arm registry
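A minimal sketch of what such a discovery poll could look like on the orchestrator side; the URL list is illustrative, and register_arm mirrors the Arm Router shown in Section 5:
import asyncio
import httpx

ARM_URLS = ["http://weather-arm:8103", "http://coder-arm:8100"]  # illustrative

async def discovery_loop(arm_router, interval_seconds: int = 60):
    """Periodically poll each arm's /capabilities endpoint and (re)register it."""
    async with httpx.AsyncClient(timeout=5.0) as client:
        while True:
            for url in ARM_URLS:
                try:
                    caps = (await client.get(f"{url}/capabilities")).json()
                    arm_router.register_arm(caps["arm_id"], {**caps, "endpoint": url})
                except httpx.HTTPError:
                    pass  # arm temporarily unreachable; retry next cycle
            await asyncio.sleep(interval_seconds)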
Step 10: Test Integration
# Build and start
docker-compose up -d weather-arm
# Check health
curl http://localhost:8103/health
# Test directly
curl -X POST http://localhost:8103/execute \
-H "Content-Type: application/json" \
-d '{
"location": "London",
"days": 3,
"include_analysis": true
}'
# Test via orchestrator
curl -X POST http://localhost:8000/api/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"goal": "Get weather forecast for Paris for next 5 days",
"constraints": ["Include detailed analysis"]
}'
Performance Optimization
Add Metrics:
from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest
REQUEST_COUNT = Counter('weather_requests_total', 'Total requests')
REQUEST_DURATION = Histogram('weather_request_duration_seconds', 'Request duration')
@app.post("/execute")
@REQUEST_DURATION.time()
async def execute(request: WeatherRequest):
REQUEST_COUNT.inc()
# ... existing code ...
@app.get("/metrics")
async def metrics():
return Response(content=generate_latest(), media_type="text/plain")
Add Connection Pooling:
# Reuse HTTP client
self.client = httpx.AsyncClient(
timeout=10.0,
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
Next Steps
Congratulations! You've built a complete custom arm. Next:
- ✅ Review Integration Patterns for arm-to-arm communication
- ✅ Read Testing Guide for comprehensive testing strategies
- ✅ Check Debugging Guide if you encounter issues
4. Integration Patterns
Purpose: Reference guide for all communication patterns in OctoLLM Estimated Reading Time: 30-45 minutes Use Case: Consult when implementing arm interactions or external integrations
Pattern Categories
This reference covers the following pattern categories; the most frequently used ones are illustrated with complete code below:
- Arm-to-Arm Communication (4 patterns)
- Orchestrator Integration (3 patterns)
- External API Integration (3 patterns)
- Database Integration (4 patterns)
- Message Queue Patterns (2 patterns)
- Webhook Patterns (2 patterns)
- Batch Processing (2 patterns)
- Real-Time Streaming (2 patterns)
- Testing Integration (3 patterns)
Key Integration Patterns
1. Arm-to-Arm Direct Communication
When to use: One arm needs another arm's output synchronously
import httpx
from uuid import uuid4
from typing import Optional
class JudgeArmClient:
"""Client for direct communication with Judge Arm."""
def __init__(self, base_url: str, timeout: int = 30):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=timeout)
async def validate_code(self, code: str, language: str) -> dict:
"""Request code validation from Judge Arm."""
response = await self.client.post(
f"{self.base_url}/validate",
json={
"output": {"code": code},
"validation_types": ["syntax", "quality"],
"context": {"language": language}
},
headers={
"X-Arm-ID": "coder",
"X-Request-ID": str(uuid4())
}
)
response.raise_for_status()
return response.json()
# Usage in Coder Arm
async def generate_code(request):
code = await llm_generate(request)
# Validate with Judge Arm
judge_client = JudgeArmClient("http://judge-arm:8102")
validation = await judge_client.validate_code(code, "python")
if not validation["valid"]:
# Fix issues and retry
code = await fix_code(code, validation["issues"])
return code
2. Orchestrator-Mediated Workflow
When to use: Complex multi-step tasks requiring orchestration
import asyncio
import time
import httpx
from typing import List

class OrchestratorClient:
    """Client for submitting sub-tasks to orchestrator."""

    def __init__(self, orchestrator_url: str, timeout: int = 30):
        self.orchestrator_url = orchestrator_url
        self.client = httpx.AsyncClient(timeout=timeout)
async def submit_subtask(
self,
goal: str,
required_capabilities: List[str],
parent_task_id: str
) -> str:
"""Submit sub-task to orchestrator for routing."""
response = await self.client.post(
f"{self.orchestrator_url}/api/v1/tasks",
json={
"goal": goal,
"parent_task_id": parent_task_id,
"required_capabilities": required_capabilities,
"priority": "high"
}
)
return response.json()["task_id"]
async def wait_for_result(self, task_id: str, timeout: int = 60) -> dict:
"""Poll for task completion."""
start = time.time()
while time.time() - start < timeout:
response = await self.client.get(f"{self.orchestrator_url}/api/v1/tasks/{task_id}")
result = response.json()
if result["status"] == "completed":
return result["result"]
elif result["status"] == "failed":
raise Exception(result["error"])
await asyncio.sleep(2)
raise TimeoutError(f"Task {task_id} did not complete in {timeout}s")
# Usage in Planner Arm
async def execute_plan(plan):
orchestrator = OrchestratorClient("http://orchestrator:8002")
for step in plan.steps:
# Submit step to orchestrator
task_id = await orchestrator.submit_subtask(
goal=step.action,
required_capabilities=step.required_capabilities,
parent_task_id=plan.id
)
# Wait for result
result = await orchestrator.wait_for_result(task_id)
# Store result for next step
plan.store_result(step.id, result)
3. Shared Memory Pattern
When to use: Multiple arms need access to same data
import json
import asyncpg
import redis.asyncio as redis
from typing import Any, List, Optional
from qdrant_client import QdrantClient

class SharedMemoryClient:
    """Unified client for shared memory systems."""

    def __init__(self, redis_url: str, qdrant_url: str, postgres_url: str, encoder=None):
        self.redis = redis.from_url(redis_url)
        self.qdrant = QdrantClient(url=qdrant_url)
        self.encoder = encoder  # e.g. a sentence-transformers model, used by vector_search
        self._postgres_url = postgres_url
        self.postgres = None

    async def connect(self):
        """Create the asyncpg pool ('await' is not allowed in __init__)."""
        self.postgres = await asyncpg.create_pool(self._postgres_url)
# L1 Cache (Redis)
async def cache_get(self, key: str) -> Optional[Any]:
"""Get from fast cache."""
value = await self.redis.get(key)
return json.loads(value) if value else None
async def cache_set(self, key: str, value: Any, ttl: int = 300):
"""Set in fast cache with TTL."""
await self.redis.setex(key, ttl, json.dumps(value))
# L2 Vector Store (Qdrant)
async def vector_search(
self,
collection: str,
query: str,
limit: int = 5
) -> List[dict]:
"""Semantic search in vector store."""
query_vector = self.encoder.encode(query)
results = self.qdrant.search(
collection_name=collection,
query_vector=query_vector,
limit=limit
)
return [{"score": r.score, **r.payload} for r in results]
# L3 Knowledge Graph (PostgreSQL)
async def graph_query(self, entity_name: str) -> dict:
"""Query knowledge graph."""
async with self.postgres.acquire() as conn:
entity = await conn.fetchrow(
"SELECT * FROM entities WHERE name = $1",
entity_name
)
relationships = await conn.fetch(
"""SELECT r.relationship_type, e.name as target
FROM relationships r
JOIN entities e ON r.to_entity_id = e.id
WHERE r.from_entity_id = $1""",
entity["id"]
)
return {
"entity": dict(entity),
"relationships": [dict(r) for r in relationships]
}
# Usage across multiple arms
memory = SharedMemoryClient(redis_url, qdrant_url, postgres_url)
await memory.connect()
# Coder Arm stores solution
await memory.cache_set(f"code:{task_id}", generated_code, ttl=600)
# Judge Arm retrieves and validates
code = await memory.cache_get(f"code:{task_id}")
validation = validate(code)
# Orchestrator records in knowledge graph
await memory.graph_query("Python sorting algorithms")
4. Circuit Breaker Pattern (External APIs)
When to use: Calling unreliable external services
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable

class CircuitBreakerOpenError(Exception):
    """Raised when a call is attempted while the circuit is open."""
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking calls
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
"""Circuit breaker for external API calls."""
def __init__(
self,
failure_threshold: int = 5,
timeout_seconds: int = 60,
expected_exception: type = Exception
):
self.failure_threshold = failure_threshold
self.timeout = timedelta(seconds=timeout_seconds)
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func: Callable, *args, **kwargs):
"""Execute function with circuit breaker protection."""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError(
f"Circuit breaker is OPEN. Try again after "
f"{self.timeout.total_seconds()}s"
)
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _on_success(self):
"""Reset on successful call."""
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Record failure and open circuit if threshold reached."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to retry."""
return (
self.last_failure_time
and datetime.now() - self.last_failure_time >= self.timeout
)
# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout_seconds=30)
async def call_external_api(data):
async with httpx.AsyncClient() as client:
response = await client.post("https://api.example.com/endpoint", json=data)
response.raise_for_status()
return response.json()
# Protected call
try:
result = await circuit_breaker.call(call_external_api, {"key": "value"})
except CircuitBreakerOpenError:
# Circuit is open, use fallback
result = get_cached_result()
5. Batch Processing Pattern
When to use: Processing large datasets efficiently
import asyncio
from typing import TypeVar, Generic, List, Callable, Awaitable
T = TypeVar('T')
R = TypeVar('R')
class BatchProcessor(Generic[T, R]):
"""Process items in batches with concurrency control."""
def __init__(
self,
batch_size: int = 100,
max_concurrent: int = 5
):
self.batch_size = batch_size
self.max_concurrent = max_concurrent
async def process_batches(
self,
items: List[T],
processor: Callable[[List[T]], Awaitable[List[R]]]
) -> List[R]:
"""Process items in batches with concurrency limit."""
# Split into batches
batches = [
items[i:i + self.batch_size]
for i in range(0, len(items), self.batch_size)
]
# Process with concurrency limit
semaphore = asyncio.Semaphore(self.max_concurrent)
async def process_batch_with_semaphore(batch):
async with semaphore:
return await processor(batch)
# Execute all batches
results = await asyncio.gather(*[
process_batch_with_semaphore(batch)
for batch in batches
])
# Flatten results
return [item for batch_result in results for item in batch_result]
# Usage: Process 1000 documents
async def process_document_batch(docs: List[str]) -> List[dict]:
"""Process batch of documents."""
# Use LLM to analyze documents
return [analyze_document(doc) for doc in docs]
processor = BatchProcessor(batch_size=50, max_concurrent=3)
documents = load_documents() # 1000 documents
results = await processor.process_batches(documents, process_document_batch)
# Processes in 20 batches of 50, with max 3 concurrent batches
6. WebSocket Streaming Pattern
When to use: Real-time updates to client
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set
class ConnectionManager:
"""Manage WebSocket connections for streaming updates."""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, client_id: str, websocket: WebSocket):
"""Accept new WebSocket connection."""
await websocket.accept()
self.active_connections[client_id] = websocket
def disconnect(self, client_id: str):
"""Remove connection."""
self.active_connections.pop(client_id, None)
async def send_message(self, client_id: str, message: dict):
"""Send message to specific client."""
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
await websocket.send_json(message)
async def broadcast(self, message: dict):
"""Broadcast message to all connected clients."""
for websocket in self.active_connections.values():
await websocket.send_json(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
"""WebSocket endpoint for streaming task updates."""
await manager.connect(client_id, websocket)
try:
while True:
# Receive messages from client
data = await websocket.receive_json()
# Process request
task_id = data.get("task_id")
if task_id:
# Stream task progress updates
async for update in stream_task_progress(task_id):
await manager.send_message(client_id, update)
except WebSocketDisconnect:
manager.disconnect(client_id)
async def stream_task_progress(task_id: str):
"""Stream task progress updates."""
while True:
status = await get_task_status(task_id)
yield {
"task_id": task_id,
"status": status["status"],
"progress": status.get("progress", 0),
"message": status.get("message", "")
}
if status["status"] in ["completed", "failed"]:
break
await asyncio.sleep(1)
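On the consuming side, a minimal client sketch using the websockets package (an assumed dependency, not part of the stack above), assuming the endpoint is served by the orchestrator on :8002:
import asyncio
import json
import websockets  # assumed dependency for this sketch

async def follow_task(client_id: str, task_id: str):
    """Subscribe to streaming progress updates for a single task."""
    async with websockets.connect(f"ws://localhost:8002/ws/{client_id}") as ws:
        await ws.send(json.dumps({"task_id": task_id}))
        async for raw in ws:
            update = json.loads(raw)
            print(update["status"], update["progress"])
            if update["status"] in ("completed", "failed"):
                break

asyncio.run(follow_task("client-1", "task-abc123"))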
Complete Integration Examples
Multi-Arm Workflow: Coder → Judge → Executor pipeline
async def code_validate_execute_workflow(task_request):
"""Complete workflow: generate code, validate, execute."""
# Step 1: Generate code (Coder Arm)
coder = ArmClient("http://coder-arm:8100")
code_result = await coder.execute({
"request_type": "generate",
"instruction": task_request.goal,
"language": "python"
})
# Step 2: Validate code (Judge Arm)
judge = ArmClient("http://judge-arm:8102")
validation = await judge.execute({
"output": code_result,
"validation_types": ["schema", "quality", "criteria"],
"acceptance_criteria": task_request.acceptance_criteria
})
if not validation["valid"]:
raise ValueError(f"Validation failed: {validation['issues']}")
# Step 3: Execute code (Executor Arm)
executor = ArmClient("http://executor-arm:8103")
execution_result = await executor.execute({
"action_type": "python",
"code": code_result["code"],
"timeout_seconds": 30
})
return {
"code": code_result["code"],
"validation": validation,
"execution": execution_result
}
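The ArmClient helper used above is not defined elsewhere in this guide; a minimal sketch, assuming every arm exposes the POST /execute contract from Section 3:
import httpx

class ArmClient:
    """Thin async HTTP client for an arm's /execute endpoint (illustrative)."""

    def __init__(self, base_url: str, timeout: float = 30.0):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=timeout)

    async def execute(self, payload: dict) -> dict:
        response = await self.client.post(f"{self.base_url}/execute", json=payload)
        response.raise_for_status()
        return response.json()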
Best Practices Summary
- Always use timeouts on all HTTP/API calls
- Implement retry logic with exponential backoff (see the sketch after this list)
- Cache aggressively to reduce latency and cost
- Log all integration points with structured logging
- Monitor failures with metrics and alerts
- Test integration paths with contract tests
- Document API contracts with OpenAPI/Swagger
- Version APIs to support backward compatibility
- Use circuit breakers for external dependencies
- Implement graceful degradation when services fail
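For the retry item above, a minimal sketch using tenacity (already in the orchestrator's dependencies); the backoff bounds and attempt count are arbitrary:
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type(httpx.HTTPError),
    wait=wait_exponential(multiplier=0.5, min=1, max=10),  # arbitrary bounds
    stop=stop_after_attempt(4),
    reraise=True,
)
async def call_with_retry(url: str, payload: dict) -> dict:
    """POST to a service, retrying transient HTTP failures with exponential backoff."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(url, json=payload)
        response.raise_for_status()
        return response.json()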
Reference Architecture
graph TB
CLIENT[Client] -->|HTTP| GATEWAY[API Gateway]
GATEWAY -->|Filter| REFLEX[Reflex Layer]
REFLEX -->|Route| ORCH[Orchestrator]
ORCH -->|Direct HTTP| ARM1[Coder Arm]
ORCH -->|Direct HTTP| ARM2[Judge Arm]
ORCH -->|Direct HTTP| ARM3[Executor Arm]
ARM1 -->|Validate| ARM2
ARM2 -->|Execute| ARM3
ORCH -->|Read/Write| POSTGRES[(PostgreSQL)]
ORCH -->|Cache| REDIS[(Redis)]
ORCH -->|Vector Search| QDRANT[(Qdrant)]
ARM1 -->|Share Data| REDIS
ARM2 -->|Share Data| REDIS
ARM3 -->|Share Data| REDIS
ORCH -->|Metrics| PROMETHEUS[Prometheus]
PROMETHEUS -->|Visualize| GRAFANA[Grafana]
5. Orchestrator Implementation
Time: 2-3 hours Difficulty: Advanced Prerequisites: Python proficiency, async programming, OctoLLM architecture understanding
Overview
Build the orchestrator from scratch following these steps:
- Project setup and dependencies
- Configuration management
- Core components (Intent Parser, Task Planner, Arm Router)
- API implementation
- Testing
- Deployment
Project Structure
orchestrator/
├── pyproject.toml # Poetry configuration
├── src/
│ └── orchestrator/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── config.py # Configuration
│ ├── models.py # Pydantic models
│ ├── intent_parser.py
│ ├── task_planner.py
│ ├── arm_router.py
│ ├── result_integrator.py
│ └── memory.py # Memory client
├── tests/
│ ├── test_intent_parser.py
│ ├── test_task_planner.py
│ ├── test_arm_router.py
│ └── test_api.py
└── Dockerfile
Step 1: Dependencies
File: pyproject.toml
[tool.poetry]
name = "orchestrator"
version = "1.0.0"
description = "OctoLLM Orchestrator Service"
authors = ["Your Team"]
python = "^3.11"
[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.104.1"
uvicorn = {extras = ["standard"], version = "^0.24.0"}
pydantic = "^2.5.0"
pydantic-settings = "^2.1.0"
httpx = "^0.25.2"
asyncpg = "^0.29.0"
redis = {extras = ["hiredis"], version = "^5.0.1"}
qdrant-client = "^1.7.0"
structlog = "^23.2.0"
tenacity = "^8.2.3"
openai = "^1.3.7"
prometheus-client = "^0.19.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.21.1"
pytest-cov = "^4.1.0"
httpx-mock = "^0.11.0"
black = "^23.11.0"
ruff = "^0.1.6"
mypy = "^1.7.1"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Step 2: Configuration
File: src/orchestrator/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field
from typing import Optional
class Settings(BaseSettings):
"""Orchestrator configuration from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
case_sensitive=False
)
# API Configuration
api_host: str = Field(default="0.0.0.0")
api_port: int = Field(default=8002)
# LLM Configuration
openai_api_key: str = Field(...)
llm_model_planning: str = Field(default="gpt-3.5-turbo")
llm_model_intent: str = Field(default="gpt-3.5-turbo")
# Database URLs
postgres_url: str = Field(default="postgresql://octollm:password@localhost:5432/octollm")
redis_url: str = Field(default="redis://localhost:6379/0")
qdrant_url: str = Field(default="http://localhost:6333")
# System Configuration
max_concurrent_tasks: int = Field(default=10, ge=1, le=100)
task_timeout_seconds: int = Field(default=300, ge=10, le=3600)
log_level: str = Field(default="INFO")
environment: str = Field(default="development")
# Arm Discovery
arm_registry_url: Optional[str] = Field(default=None)
arm_discovery_interval_seconds: int = Field(default=60)
settings = Settings()
Step 3: Data Models
File: src/orchestrator/models.py
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import uuid
class Priority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class TaskStatus(str, Enum):
PENDING = "pending"
ACCEPTED = "accepted"
PLANNING = "planning"
EXECUTING = "executing"
COMPLETED = "completed"
FAILED = "failed"
class TaskRequest(BaseModel):
"""Incoming task request from client."""
goal: str = Field(..., min_length=10, max_length=2000)
constraints: List[str] = Field(default_factory=list)
context: Dict[str, Any] = Field(default_factory=dict)
priority: Priority = Field(default=Priority.MEDIUM)
deadline_seconds: Optional[int] = Field(None, ge=10, le=3600)
class SubTask(BaseModel):
"""Single step in execution plan."""
step: int = Field(..., ge=1)
action: str
required_arm: str
acceptance_criteria: List[str]
depends_on: List[int] = Field(default_factory=list)
estimated_duration_seconds: int = Field(..., ge=1)
class ExecutionPlan(BaseModel):
"""Complete task execution plan."""
plan_id: str = Field(default_factory=lambda: f"plan-{uuid.uuid4()}")
subtasks: List[SubTask]
estimated_duration_seconds: int
confidence: float = Field(..., ge=0.0, le=1.0)
class TaskResponse(BaseModel):
"""Response to task submission."""
task_id: str
status: TaskStatus
estimated_duration_seconds: Optional[int] = None
message: str
class TaskResult(BaseModel):
"""Complete task result."""
task_id: str
status: TaskStatus
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
duration_ms: Optional[int] = None
confidence: Optional[float] = None
plan: Optional[ExecutionPlan] = None
created_at: datetime
completed_at: Optional[datetime] = None
Step 4: Intent Parser
File: src/orchestrator/intent_parser.py
import openai
import json
import structlog
from typing import Dict, Any, List
from pydantic import BaseModel
logger = structlog.get_logger()
class ParsedIntent(BaseModel):
"""Structured intent from natural language."""
goal: str
required_capabilities: List[str]
constraints: List[str]
context: Dict[str, Any]
complexity: str # "simple", "medium", "complex"
confidence: float
class IntentParser:
"""Parse natural language requests into structured intents."""
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
self.client = openai.AsyncOpenAI(api_key=api_key)
self.model = model
async def parse(self, user_request: str) -> ParsedIntent:
"""Parse user request into structured intent."""
logger.info("intent.parse.start", request_length=len(user_request))
prompt = self._build_parsing_prompt(user_request)
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": prompt["system"]},
{"role": "user", "content": prompt["user"]}
],
temperature=0.3,
response_format={"type": "json_object"}
)
parsed = json.loads(response.choices[0].message.content)
intent = ParsedIntent(**parsed)
logger.info(
"intent.parse.success",
capabilities=intent.required_capabilities,
complexity=intent.complexity,
confidence=intent.confidence
)
return intent
except Exception as e:
logger.error("intent.parse.failed", error=str(e))
raise
def _build_parsing_prompt(self, request: str) -> Dict[str, str]:
"""Build prompt for intent parsing."""
system_prompt = """You are an intent parser for a distributed AI system.
Available capabilities:
- code_generation: Generate, debug, refactor code
- code_execution: Run scripts, shell commands
- web_search: Search internet, documentation
- data_analysis: Analyze datasets, statistics
- validation: Check outputs, fact-check
- planning: Break down complex tasks
- safety: Content filtering, PII detection
Your task: Parse requests into structured intents.
Output JSON format:
{
"goal": "Clear, specific goal statement",
"required_capabilities": ["capability1", "capability2"],
"constraints": ["constraint1", "constraint2"],
"context": {"key": "value"},
"complexity": "simple|medium|complex",
"confidence": 0.0-1.0
}"""
user_prompt = f"Parse this request:\n\n{request}"
return {"system": system_prompt, "user": user_prompt}
Step 5: Task Planner
File: src/orchestrator/task_planner.py
import openai
import json
import structlog
from typing import List, Dict, Any

from .models import ExecutionPlan, SubTask
logger = structlog.get_logger()
class TaskPlanner:
"""Decompose complex tasks into executable subtasks."""
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
self.client = openai.AsyncOpenAI(api_key=api_key)
self.model = model
async def plan(
self,
goal: str,
constraints: List[str],
context: Dict[str, Any]
) -> ExecutionPlan:
"""Generate execution plan for goal."""
logger.info("plan.generate.start", goal=goal[:50])
prompt = self._build_planning_prompt(goal, constraints, context)
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": prompt["system"]},
{"role": "user", "content": prompt["user"]}
],
temperature=0.7,
response_format={"type": "json_object"}
)
plan_data = json.loads(response.choices[0].message.content)
# Parse subtasks
subtasks = [SubTask(**step) for step in plan_data["subtasks"]]
# Calculate total duration
total_duration = sum(s.estimated_duration_seconds for s in subtasks)
plan = ExecutionPlan(
subtasks=subtasks,
estimated_duration_seconds=total_duration,
confidence=plan_data.get("confidence", 0.8)
)
# Validate plan
self._validate_plan(plan)
logger.info(
"plan.generate.success",
steps=len(subtasks),
duration=total_duration
)
return plan
except Exception as e:
logger.error("plan.generate.failed", error=str(e))
raise
def _validate_plan(self, plan: ExecutionPlan):
"""Validate plan structure and dependencies."""
step_numbers = {s.step for s in plan.subtasks}
for subtask in plan.subtasks:
# Check dependencies exist
for dep in subtask.depends_on:
if dep not in step_numbers:
raise ValueError(
f"Step {subtask.step} depends on non-existent step {dep}"
)
# Check no forward dependencies
if dep >= subtask.step:
raise ValueError(
f"Step {subtask.step} cannot depend on later step {dep}"
)
def _build_planning_prompt(
self,
goal: str,
constraints: List[str],
context: Dict[str, Any]
) -> Dict[str, str]:
"""Build prompt for task planning."""
system_prompt = """You are a task planner for a distributed AI system.
Available arms:
- coder: Code generation, debugging, refactoring
- executor: Run commands, scripts, API calls
- planner: Task decomposition, dependency resolution
- judge: Validate outputs, fact-check
- retriever: Search knowledge bases, web
- guardian: Safety checks, PII detection
Generate 3-7 clear steps. For each step:
- action: What to do (imperative)
- required_arm: Which arm executes
- acceptance_criteria: 2-3 success conditions
- depends_on: Prerequisite step numbers
- estimated_duration_seconds: Realistic estimate
Output JSON format:
{
"subtasks": [
{
"step": 1,
"action": "Search for...",
"required_arm": "retriever",
"acceptance_criteria": ["Found X", "Contains Y"],
"depends_on": [],
"estimated_duration_seconds": 20
}
],
"confidence": 0.85
}"""
user_prompt = f"""Goal: {goal}
Constraints:
{chr(10).join(f"- {c}" for c in constraints) if constraints else "None"}
Context:
{json.dumps(context, indent=2) if context else "None"}
Generate execution plan:"""
return {"system": system_prompt, "user": user_prompt}
Step 6: Arm Router
File: src/orchestrator/arm_router.py
import structlog
from typing import Dict, List, Optional
from dataclasses import dataclass
logger = structlog.get_logger()
@dataclass
class ArmScore:
"""Scoring for arm selection."""
arm_id: str
capability_match: float
availability: float
historical_success: float
cost_efficiency: float
total_score: float
class ArmRouter:
"""Route tasks to appropriate arms based on capabilities."""
def __init__(self):
self.arm_registry: Dict[str, Dict] = {}
self.historical_stats: Dict[str, Dict] = {}
def register_arm(self, arm_id: str, capabilities: Dict):
"""Register arm with capabilities."""
self.arm_registry[arm_id] = capabilities
if arm_id not in self.historical_stats:
self.historical_stats[arm_id] = {
"total": 0,
"success": 0,
"avg_duration_ms": 0
}
logger.info("arm.registered", arm_id=arm_id, capabilities=capabilities.get("capabilities"))
async def route(
self,
required_capabilities: List[str],
priority: str = "medium"
) -> str:
"""Select best arm for task."""
logger.info(
"routing.start",
required_capabilities=required_capabilities,
available_arms=list(self.arm_registry.keys())
)
# Score all arms
scores = []
for arm_id in self.arm_registry:
score = self._score_arm(arm_id, required_capabilities, priority)
if score.capability_match > 0: # Must have at least one capability
scores.append(score)
if not scores:
raise ValueError(
f"No arm found with capabilities: {required_capabilities}"
)
# Select best
best = max(scores, key=lambda s: s.total_score)
logger.info(
"routing.selected",
arm_id=best.arm_id,
score=best.total_score,
capability_match=best.capability_match
)
return best.arm_id
def _score_arm(
self,
arm_id: str,
required_capabilities: List[str],
priority: str
) -> ArmScore:
"""Calculate composite score for arm.
Scoring weights:
- Capability match: 40%
- Availability: 20%
- Historical success: 30%
- Cost efficiency: 10%
"""
arm_info = self.arm_registry[arm_id]
arm_capabilities = set(arm_info.get("capabilities", []))
required_set = set(required_capabilities)
# Capability match (40%)
matching = arm_capabilities & required_set
capability_match = len(matching) / len(required_set) if required_set else 0
# Availability (20%)
status = arm_info.get("status", "healthy")
availability = 1.0 if status == "healthy" else 0.0
# Historical success rate (30%)
stats = self.historical_stats.get(arm_id, {"success": 10, "total": 10})  # optimistic prior for unseen arms
historical_success = stats["success"] / stats["total"] if stats["total"] > 0 else 0.5
# Cost efficiency (10%)
cost_tier = arm_info.get("cost_tier", 3)
cost_efficiency = 1.0 - (cost_tier / 5.0)
# Composite score
total_score = (
capability_match * 0.4 +
availability * 0.2 +
historical_success * 0.3 +
cost_efficiency * 0.1
)
return ArmScore(
arm_id=arm_id,
capability_match=capability_match,
availability=availability,
historical_success=historical_success,
cost_efficiency=cost_efficiency,
total_score=total_score
)
def record_execution(self, arm_id: str, success: bool, duration_ms: int):
"""Record arm execution for historical stats."""
if arm_id not in self.historical_stats:
self.historical_stats[arm_id] = {"total": 0, "success": 0}
stats = self.historical_stats[arm_id]
stats["total"] += 1
if success:
stats["success"] += 1
# Update rolling average duration
current_avg = stats.get("avg_duration_ms", 0)
stats["avg_duration_ms"] = (current_avg * 0.9) + (duration_ms * 0.1)
Step 7: FastAPI Application
File: src/orchestrator/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import structlog
import asyncpg
import redis.asyncio as redis
from contextlib import asynccontextmanager
import uuid
from datetime import datetime
from .config import settings
from .models import TaskRequest, TaskResponse, TaskResult, TaskStatus
from .intent_parser import IntentParser
from .task_planner import TaskPlanner
from .arm_router import ArmRouter
# Configure logging
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
# Global state
db_pool: asyncpg.Pool = None
redis_client: redis.Redis = None
intent_parser: IntentParser = None
task_planner: TaskPlanner = None
arm_router: ArmRouter = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle."""
global db_pool, redis_client, intent_parser, task_planner, arm_router
logger.info("startup.begin")
# Database
db_pool = await asyncpg.create_pool(settings.postgres_url)
# Redis
redis_client = await redis.from_url(settings.redis_url)
# Components
intent_parser = IntentParser(settings.openai_api_key, settings.llm_model_intent)
task_planner = TaskPlanner(settings.openai_api_key, settings.llm_model_planning)
arm_router = ArmRouter()
# Discover arms
await discover_arms()
logger.info("startup.complete")
yield
logger.info("shutdown.begin")
await db_pool.close()
await redis_client.close()
logger.info("shutdown.complete")
app = FastAPI(
title="OctoLLM Orchestrator",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"]
)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
# Check database
try:
async with db_pool.acquire() as conn:
await conn.fetchval("SELECT 1")
db_status = "healthy"
except Exception:
db_status = "unhealthy"
# Check Redis
try:
await redis_client.ping()
redis_status = "healthy"
except Exception:
redis_status = "unhealthy"
overall = "healthy" if db_status == "healthy" and redis_status == "healthy" else "degraded"
return {
"status": overall,
"version": "1.0.0",
"dependencies": {
"postgres": db_status,
"redis": redis_status
}
}
@app.post("/api/v1/tasks", response_model=TaskResponse)
async def submit_task(request: TaskRequest):
"""Submit new task for execution."""
task_id = f"task-{uuid.uuid4()}"
logger.info(
"task.submitted",
task_id=task_id,
goal=request.goal[:50],
priority=request.priority
)
try:
# Parse intent
intent = await intent_parser.parse(request.goal)
# Generate plan
plan = await task_planner.plan(
goal=intent.goal,
constraints=request.constraints,
context=request.context
)
# Store task
async with db_pool.acquire() as conn:
await conn.execute(
"""INSERT INTO task_history
(task_id, goal, plan, results, success, duration_ms, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
task_id,
request.goal,
plan.json(),
"{}",
False,
0,
datetime.utcnow()
)
# Start execution in background
# (In production, use task queue like Celery)
return TaskResponse(
task_id=task_id,
status=TaskStatus.ACCEPTED,
estimated_duration_seconds=plan.estimated_duration_seconds,
message="Task accepted and queued for execution"
)
except Exception as e:
logger.error("task.submit.failed", task_id=task_id, error=str(e))
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/tasks/{task_id}", response_model=TaskResult)
async def get_task_status(task_id: str):
"""Get status and result of task."""
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM task_history WHERE task_id = $1",
task_id
)
if not row:
raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
    # NOTE: with only a boolean `success` column, in-flight tasks read as
    # FAILED here; a production schema should track an explicit status value.
    return TaskResult(
        task_id=row["task_id"],
        status=TaskStatus.COMPLETED if row["success"] else TaskStatus.FAILED,
result=json.loads(row["results"]) if row["results"] else None,
duration_ms=row["duration_ms"],
created_at=row["created_at"],
completed_at=row.get("completed_at")
)
async def discover_arms():
"""Discover and register available arms."""
# In production, query service discovery or config
# For demo, register static arms
arm_router.register_arm("coder", {
"capabilities": ["code_generation", "code_debug", "code_refactor"],
"endpoint": "http://coder-arm:8100",
"cost_tier": 4,
"status": "healthy"
})
arm_router.register_arm("executor", {
"capabilities": ["code_execution", "shell_command", "api_call"],
"endpoint": "http://executor-arm:8103",
"cost_tier": 3,
"status": "healthy"
})
arm_router.register_arm("judge", {
"capabilities": ["validation", "fact_check", "quality_check"],
"endpoint": "http://judge-arm:8102",
"cost_tier": 2,
"status": "healthy"
})
logger.info("arms.discovered", count=len(arm_router.arm_registry))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host=settings.api_host, port=settings.api_port)
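Note that submit_task persists the plan but never executes it; the inline comment defers to a task queue. Until one is wired in, FastAPI's built-in BackgroundTasks works as a stopgap. A minimal sketch, where execute_plan is a hypothetical coroutine rather than part of the code above:
from fastapi import BackgroundTasks

async def execute_plan(task_id: str, plan) -> None:
    """Hypothetical executor: route each subtask through arm_router,
    gather results, then UPDATE task_history with success and duration."""
    ...

# In submit_task, accept the injected parameter and schedule work
# right after the INSERT:
#
#   async def submit_task(request: TaskRequest, background_tasks: BackgroundTasks):
#       ...
#       background_tasks.add_task(execute_plan, task_id, plan)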
Step 8: Testing
File: tests/test_api.py
import pytest
from httpx import AsyncClient
from src.orchestrator.main import app
@pytest.mark.asyncio
async def test_submit_task():
"""Test task submission."""
async with AsyncClient(app=app, base_url="http://test") as client:
response = await client.post(
"/api/v1/tasks",
json={
"goal": "Write a Python function to reverse a string",
"constraints": ["Include docstring"],
"priority": "medium"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
assert data["status"] == "accepted"
@pytest.mark.asyncio
async def test_health_check():
"""Test health endpoint."""
async with AsyncClient(app=app, base_url="http://test") as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] in ["healthy", "degraded"]
assert "dependencies" in data
Step 9: Deployment
File: Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install Poetry
RUN pip install --no-cache-dir poetry==1.6.1
# Copy dependencies
COPY pyproject.toml poetry.lock ./
# Install dependencies
RUN poetry config virtualenvs.create false \
    && poetry install --only main --no-interaction --no-root
# Copy application
COPY src/ ./src/
# Expose port
EXPOSE 8002
# Health check
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8002/health').raise_for_status()"
# Run
CMD ["uvicorn", "src.orchestrator.main:app", "--host", "0.0.0.0", "--port", "8002"]
Run locally:
cd orchestrator
poetry install
poetry shell
uvicorn src.orchestrator.main:app --reload
Run with Docker:
docker build -t octollm/orchestrator:latest .
docker run -p 8002:8002 --env-file .env octollm/orchestrator:latest
Verification
# Health check
curl http://localhost:8002/health
# Submit task
curl -X POST http://localhost:8002/api/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"goal": "Write a function to calculate factorial",
"constraints": ["Use recursion", "Add docstring"],
"priority": "medium"
}'
# Check status (substitute the task_id returned by the submit call above)
curl http://localhost:8002/api/v1/tasks/task-abc123
6. Testing Guide
Purpose: Comprehensive testing strategy reference Target Audience: All developers Coverage Goals: 85-95% depending on component criticality
Test Pyramid
graph BT
E2E[E2E Tests<br/>10%<br/>Slow, Full System]
INTEGRATION[Integration Tests<br/>30%<br/>Component Boundaries]
UNIT[Unit Tests<br/>60%<br/>Fast, Isolated]
E2E --> INTEGRATION
INTEGRATION --> UNIT
Testing Stack
[tool.poetry.group.test.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.21.1"
pytest-cov = "^4.1.0"
pytest-xdist = "^3.5.0" # Parallel execution
pytest-httpx = "^0.21.0" # HTTP mocking (httpx_mock fixture)
faker = "^20.1.0" # Test data generation
Unit Test Example
import pytest
from pydantic import ValidationError
from src.orchestrator.models import TaskRequest, Priority
class TestTaskContract:
"""Test TaskRequest validation."""
def test_valid_task_request(self):
"""Test valid task creation."""
task = TaskRequest(
goal="Write a function to sort a list",
constraints=["Use Python 3.11+"],
priority=Priority.MEDIUM
)
assert len(task.goal) >= 10
assert task.priority == Priority.MEDIUM
def test_goal_too_short(self):
"""Test goal minimum length validation."""
with pytest.raises(ValidationError) as exc:
TaskRequest(goal="Short", priority=Priority.LOW)
assert "goal" in str(exc.value)
@pytest.mark.parametrize("priority", [
Priority.LOW, Priority.MEDIUM, Priority.HIGH, Priority.CRITICAL
])
def test_all_priorities_valid(self, priority):
"""Test all priority levels accepted."""
task = TaskRequest(
goal="Test goal with sufficient length",
priority=priority
)
assert task.priority == priority
Integration Test Example
@pytest.mark.integration
@pytest.mark.asyncio
async def test_task_submission_workflow(http_client, db_pool):
"""Test complete task submission flow."""
# Submit task
response = await http_client.post(
"/api/v1/tasks",
json={
"goal": "Write a Python function to calculate fibonacci",
"constraints": ["Include docstring", "Add tests"]
}
)
assert response.status_code == 200
task_id = response.json()["task_id"]
# Verify stored in database
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM task_history WHERE task_id = $1",
task_id
)
assert row is not None
assert row["goal"] == "Write a Python function to calculate fibonacci"
E2E Test Example
import asyncio
import time

@pytest.mark.e2e
@pytest.mark.slow
@pytest.mark.asyncio
async def test_complete_code_generation_workflow(http_client):
"""Test end-to-end code generation workflow."""
# 1. Submit task
submit_response = await http_client.post(
"/api/v1/tasks",
json={
"goal": "Write a Python function to reverse a string",
"constraints": ["Include docstring", "Add unit tests"]
}
)
task_id = submit_response.json()["task_id"]
# 2. Poll for completion (max 60s)
max_wait = 60
start = time.time()
while time.time() - start < max_wait:
status_response = await http_client.get(f"/api/v1/tasks/{task_id}")
status = status_response.json()
if status["status"] == "completed":
# 3. Verify result structure
assert "code" in status["result"]
assert "tests" in status["result"]
assert status["confidence"] > 0.7
# 4. Verify code is valid Python
code = status["result"]["code"]
compile(code, "<string>", "exec") # Should not raise
return
elif status["status"] == "failed":
pytest.fail(f"Task failed: {status.get('error')}")
await asyncio.sleep(2)
pytest.fail("Task did not complete within timeout")
Mocking External Services
from types import SimpleNamespace
from unittest.mock import AsyncMock

from src.orchestrator.intent_parser import IntentParser

def fake_chat_response(content: str):
    """Build an object shaped like an OpenAI chat-completion response."""
    return SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content=content))]
    )

@pytest.mark.asyncio
async def test_intent_parsing_with_mock():
    """Test intent parsing with a mocked LLM call."""
    parser = IntentParser(api_key="test-key")
    # Patch the create() coroutine on the parser's client instance; this
    # assumes IntentParser stores its AsyncOpenAI client as `self.client`.
    # (Patching the dotted path "openai.AsyncOpenAI.chat.completions.create"
    # does not work, because `chat` is an instance attribute.)
    parser.client.chat.completions.create = AsyncMock(
        return_value=fake_chat_response(
            '{"goal": "Test", "required_capabilities": ["code"]}'
        )
    )
    intent = await parser.parse("Write a Python function")
    assert intent.goal == "Test"
    assert "code" in intent.required_capabilities
Coverage Configuration
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--cov=src --cov-report=html --cov-report=term --cov-fail-under=85"
markers = [
"unit: Unit tests (fast)",
"integration: Integration tests (medium)",
"e2e: End-to-end tests (slow)",
"slow: Slow tests (>1s)"
]
Run Tests
# All tests
pytest
# Unit tests only (fast)
pytest -m unit
# With coverage
pytest --cov=src --cov-report=html
# Parallel execution
pytest -n auto
# Specific file
pytest tests/test_intent_parser.py -v
7. Debugging Guide
Purpose: Debugging tools, techniques, and common problem solutions Target Audience: All developers Coverage: Development and production debugging
Structured Logging
import structlog
# Configure logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer() # JSON for production
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
# Usage
logger.info(
"task.started",
task_id="task-123",
user_id="user-456",
goal="Write code"
)
logger.error(
"task.failed",
task_id="task-123",
error=str(e),
traceback=traceback.format_exc()
)
VS Code Debugger
Configuration (.vscode/launch.json):
{
  "version": "0.2.0",
  "configurations": [
{
"name": "Debug Orchestrator",
"type": "python",
"request": "launch",
"module": "uvicorn",
"args": [
"src.orchestrator.main:app",
"--reload",
"--host", "0.0.0.0",
"--port", "8002"
],
"env": {
"LOG_LEVEL": "DEBUG",
"OPENAI_API_KEY": "${env:OPENAI_API_KEY}"
},
"justMyCode": false
}
]
}
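For code already running inside a container, attaching a debugger is often easier than launching one. A minimal opt-in hook, assuming the debugpy package is installed and port 5678 is published by the container:
import os

if os.getenv("DEBUGPY") == "1":
    import debugpy
    debugpy.listen(("0.0.0.0", 5678))
    debugpy.wait_for_client()  # block until the IDE attaches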
Interactive Debugging
# Add breakpoint
import pdb; pdb.set_trace()
# Or use breakpoint() in Python 3.7+
breakpoint()
# Common commands:
# n - next line
# s - step into function
# c - continue execution
# p variable - print variable
# l - list code around current line
# bt - backtrace (call stack)
Metrics and Monitoring
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
TASK_COUNTER = Counter(
'octollm_tasks_total',
'Total tasks processed',
['status', 'priority']
)
TASK_DURATION = Histogram(
'octollm_task_duration_seconds',
'Task processing duration',
['arm_type']
)
ACTIVE_TASKS = Gauge(
'octollm_active_tasks',
'Number of currently active tasks'
)
# Usage
TASK_COUNTER.labels(status='completed', priority='high').inc()
TASK_DURATION.labels(arm_type='coder').observe(12.5)
ACTIVE_TASKS.set(5)
# Expose metrics endpoint
from fastapi import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
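Histogram also exposes .time() as a context manager, which composes naturally with the counter and gauge above. A sketch around a hypothetical call_arm helper:
async def call_arm_with_metrics(arm_type: str, payload: dict) -> dict:
    """Sketch: instrument a single arm call (call_arm is hypothetical)."""
    ACTIVE_TASKS.inc()
    try:
        with TASK_DURATION.labels(arm_type=arm_type).time():
            result = await call_arm(arm_type, payload)
        TASK_COUNTER.labels(status="completed", priority="medium").inc()
        return result
    except Exception:
        TASK_COUNTER.labels(status="failed", priority="medium").inc()
        raise
    finally:
        ACTIVE_TASKS.dec()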
Common Problems and Solutions
Problem: Task routing failures
# Debug routing
logger.debug(
"routing.debug",
required_capabilities=required_capabilities,
available_arms={
arm_id: info.get("capabilities")
for arm_id, info in arm_registry.items()
}
)
Problem: Database connection issues
# Test connection from the host
psql -h localhost -U octollm -d octollm

-- Inside psql: count active connections
SELECT count(*) FROM pg_stat_activity WHERE datname = 'octollm';

-- Terminate idle connections
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'octollm' AND state = 'idle';
Problem: Memory leaks
# Profile memory usage
import tracemalloc
tracemalloc.start()
# ... run code ...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
print(stat)
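Comparing two snapshots is usually more telling than inspecting one, since the diff isolates allocations made by the suspect code path:
# Diff snapshots to see which lines allocated the most in between
before = tracemalloc.take_snapshot()
# ... exercise the suspected leak ...
after = tracemalloc.take_snapshot()
for stat in after.compare_to(before, "lineno")[:10]:
    print(stat)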
Log Analysis
# View logs
docker-compose logs -f orchestrator
# Filter errors
docker-compose logs orchestrator | grep ERROR
# JSON log parsing with jq (strip the compose "name | " prefix first)
docker-compose logs orchestrator --no-color | sed 's/^[^|]*| //' | \
  jq 'select(.level=="error")'
# Count errors by type
docker-compose logs orchestrator --no-color | sed 's/^[^|]*| //' | \
  jq -r '.error_type // empty' | sort | uniq -c
Summary
This document provides complete Phase 2 implementation specifications for OctoLLM:
- ✅ Getting Started (15 min): Quick setup to running system
- ✅ Dev Environment (30-45 min): Complete development setup
- ✅ Custom Arms (1-2 hours): Build and deploy custom arms
- ✅ Integration Patterns: Reference for all communication patterns
- ✅ Orchestrator Implementation (2-3 hours): Build orchestrator from scratch
- ✅ Testing Guide: Unit, integration, and E2E testing strategies
- ✅ Debugging Guide: Tools and techniques for troubleshooting
Key Features Across All Guides
- Step-by-Step Instructions: Numbered steps with time estimates
- Complete Code Examples: 50+ production-ready implementations
- Mermaid Diagrams: 10+ architectural and workflow diagrams
- Platform Coverage: Linux, macOS, Windows (WSL2)
- Best Practices: Security, performance, testing, observability
- Troubleshooting: Common problems and solutions
- Cross-References: Links between related guides
Implementation Roadmap
Week 1: Setup and First Steps
- Complete Getting Started guide
- Set up development environment
- Run all services locally
Week 2-3: Core Learning
- Review Integration Patterns
- Build a simple custom arm
- Understand orchestrator architecture
Week 4-5: Advanced Development
- Implement orchestrator from scratch
- Write comprehensive tests
- Set up debugging and monitoring
Week 6+: Production Readiness
- Performance optimization
- Security hardening
- Production deployment
Next Steps
After completing Phase 2:
- Begin actual implementation of arms
- Set up CI/CD pipelines
- Deploy to staging environment
- Conduct integration testing
- Move to production deployment
Documentation Metrics
Total Documents: 7 comprehensive guides Total Pages: ~100+ pages of detailed documentation Code Examples: 50+ production-ready implementations Diagrams: 10+ Mermaid architectural diagrams Estimated Completion Time: 8-12 hours total Coverage: Development setup → Testing → Debugging → Deployment
Document Status: ✅ COMPLETE - All Phase 2 implementation guides fully specified Ready for: Immediate use by development team Maintained by: OctoLLM Documentation Team Last Updated: 2025-11-10