Component API Contracts
Document: API Specifications | Version: 1.0 | Last Updated: 2025-11-10 | Status: Production Ready
Table of Contents
- Overview
- Core Data Models
- Orchestrator API
- Arm Interface Contract
- Reflex Layer API
- Authentication
- Error Handling
- Versioning
- Rate Limiting
- OpenAPI Specification
Overview
OctoLLM's component API contracts define the formal interfaces between all system components. These contracts ensure interoperability, enable independent development and testing, and provide clear boundaries for security isolation.
Contract Philosophy
The OctoLLM API contracts are built on these core principles:
- Explicit over Implicit: All expectations, constraints, and capabilities are explicitly declared in machine-readable schemas
- Fail Fast: Invalid inputs are rejected immediately with detailed error messages
- Defensive Programming: All components validate inputs and sanitize outputs
- Observable by Default: All operations emit structured logs and metrics
- Capability-Based Security: Access is governed by cryptographic capability tokens, not ambient authority
Design Principles
1. Strong Typing with Pydantic
All data structures use Pydantic models for:
- Automatic validation
- JSON schema generation
- FastAPI integration
- Clear documentation
Example:
from pydantic import BaseModel, Field, validator

class TaskContract(BaseModel):
    task_id: str = Field(..., description="Unique identifier")
    goal: str = Field(..., min_length=1, max_length=2000)

    @validator('task_id')
    def validate_task_id(cls, v):
        if not v.startswith('task-'):
            raise ValueError('task_id must start with "task-"')
        return v
2. Versioned Schemas
All schemas include version information:
class VersionedContract(BaseModel):
    api_version: str = Field(default="v1", const=True)
    schema_version: str = Field(default="1.0.0")
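A receiving component can use the two version fields to decide whether it understands a payload. The following is a minimal, dependency-free sketch. The compatibility rule it applies (exact match on `api_version`, same major number on `schema_version`) is an assumption for illustration, and `parse_semver` and `is_compatible` are hypothetical helper names, not part of the OctoLLM codebase.

```python
def parse_semver(version: str) -> tuple:
    """Split 'MAJOR.MINOR.PATCH' into a tuple of three integers."""
    major, minor, patch = version.split(".")
    return int(major), int(minor), int(patch)


def is_compatible(payload: dict, supported_api: str = "v1",
                  supported_schema: str = "1.0.0") -> bool:
    """Assumed rule: api_version must match exactly, and the schema
    major version must match. Missing fields fall back to the defaults
    declared on VersionedContract."""
    if payload.get("api_version", "v1") != supported_api:
        return False
    schema = payload.get("schema_version", "1.0.0")
    return parse_semver(schema)[0] == parse_semver(supported_schema)[0]


# A newer minor schema is accepted; a different API or major version is not.
accepted = is_compatible({"api_version": "v1", "schema_version": "1.4.2"})
rejected_api = is_compatible({"api_version": "v2"})
rejected_schema = is_compatible({"schema_version": "2.0.0"})
```

Under this rule, adding optional fields only requires a minor or patch bump, which matches the backward-compatibility approach described in the next principle.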
3. Graceful Degradation
Contracts support optional fields for backward compatibility:
class TaskContract(BaseModel):
    # Required fields (breaking changes require version bump)
    task_id: str
    goal: str

    # Optional fields (can be added without breaking changes)
    priority: Optional[Priority] = Priority.MEDIUM
    metadata: Optional[Dict[str, Any]] = {}
4. Rich Error Information
Errors include actionable information:
class ErrorResponse(BaseModel):
    error_code: str
    message: str
    details: Optional[Dict[str, Any]] = None
    retry_after_seconds: Optional[int] = None
    documentation_url: Optional[str] = None
graph TD
subgraph "Contract Layer"
TC[TaskContract]
AC[ArmCapability]
PM[ProvenanceMetadata]
BM[BaseMessage]
ER[ErrorResponse]
end
subgraph "Orchestrator"
O[Orchestrator API]
end
subgraph "Arms"
A1[Planner Arm]
A2[Coder Arm]
A3[Executor Arm]
end
subgraph "Reflex Layer"
RL[Reflex API]
end
O -->|uses| TC
O -->|queries| AC
O -->|sends| BM
A1 -->|implements| AC
A2 -->|implements| AC
A3 -->|implements| AC
A1 -->|returns| PM
A2 -->|returns| PM
A3 -->|returns| PM
O -->|returns| ER
A1 -->|returns| ER
RL -->|returns| ER
Core Data Models
This section defines the fundamental data structures used throughout OctoLLM.
TaskContract
The TaskContract defines a formal specification for a task or subtask:
Complete Pydantic Model
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Dict, Any
from enum import Enum
from datetime import datetime
class Priority(str, Enum):
"""Task priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class TaskContract(BaseModel):
"""Formal specification for a subtask.
This contract defines everything needed for an arm to understand
and execute a task independently.
"""
# Core identification
task_id: str = Field(
...,
description="Unique task identifier (format: task-{uuid})",
regex=r'^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
# Task definition
goal: str = Field(
...,
description="Natural language goal description",
min_length=10,
max_length=2000
)
constraints: List[str] = Field(
default_factory=list,
description="Hard constraints (time, cost, safety)",
max_items=20
)
context: Dict[str, Any] = Field(
default_factory=dict,
description="Relevant background information"
)
acceptance_criteria: List[str] = Field(
default_factory=list,
description="Conditions for successful completion",
max_items=10
)
# Resource management
budget: Dict[str, int] = Field(
default_factory=lambda: {
"max_tokens": 4000,
"max_time_seconds": 30,
"max_retries": 3
},
description="Resource limits"
)
# Task metadata
priority: Priority = Field(
default=Priority.MEDIUM,
description="Task priority level"
)
parent_task_id: Optional[str] = Field(
None,
description="Parent task if this is a subtask",
regex=r'^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
assigned_arm: Optional[str] = Field(
None,
description="Target arm identifier (e.g., 'coder-001')"
)
# Temporal information
created_at: datetime = Field(
default_factory=datetime.utcnow,
description="Task creation timestamp"
)
deadline: Optional[datetime] = Field(
None,
description="Task deadline (UTC)"
)
# Capability requirements
required_capabilities: List[str] = Field(
default_factory=list,
description="Required capability tokens",
max_items=10
)
# API versioning
api_version: str = Field(
default="v1",
const=True,
description="API version"
)
schema_version: str = Field(
default="1.0.0",
description="Schema version"
)
@validator('deadline')
def validate_deadline(cls, v, values):
"""Ensure deadline is in the future."""
if v and v < values.get('created_at', datetime.utcnow()):
raise ValueError('deadline must be in the future')
return v
@validator('budget')
def validate_budget(cls, v):
"""Validate budget parameters."""
if v.get('max_tokens', 0) <= 0:
raise ValueError('max_tokens must be positive')
if v.get('max_time_seconds', 0) <= 0:
raise ValueError('max_time_seconds must be positive')
return v
class Config:
json_schema_extra = {
"example": {
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"goal": "Generate a Python function to parse JSON with error handling",
"constraints": [
"Must handle malformed JSON gracefully",
"Must include type hints",
"Must include docstrings"
],
"context": {
"language": "python",
"python_version": "3.10+",
"use_case": "API response parsing"
},
"acceptance_criteria": [
"Function includes try-except blocks",
"Function has type hints",
"Function has comprehensive docstring",
"Includes usage example"
],
"budget": {
"max_tokens": 2000,
"max_time_seconds": 15,
"max_retries": 2
},
"priority": "medium",
"assigned_arm": "coder-001",
"required_capabilities": ["code_generation"]
}
}
JSON Schema
{
"title": "TaskContract",
"type": "object",
"required": ["task_id", "goal"],
"properties": {
"task_id": {
"type": "string",
"pattern": "^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
"description": "Unique task identifier"
},
"goal": {
"type": "string",
"minLength": 10,
"maxLength": 2000,
"description": "Natural language goal description"
},
"constraints": {
"type": "array",
"items": {"type": "string"},
"maxItems": 20,
"description": "Hard constraints"
},
"context": {
"type": "object",
"description": "Background information"
},
"acceptance_criteria": {
"type": "array",
"items": {"type": "string"},
"maxItems": 10,
"description": "Success conditions"
},
"budget": {
"type": "object",
"properties": {
"max_tokens": {"type": "integer", "minimum": 1},
"max_time_seconds": {"type": "integer", "minimum": 1},
"max_retries": {"type": "integer", "minimum": 0}
}
},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "critical"]
},
"parent_task_id": {
"type": "string",
"pattern": "^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
},
"assigned_arm": {
"type": "string"
},
"created_at": {
"type": "string",
"format": "date-time"
},
"deadline": {
"type": "string",
"format": "date-time"
},
"required_capabilities": {
"type": "array",
"items": {"type": "string"},
"maxItems": 10
},
"api_version": {
"type": "string",
"const": "v1"
},
"schema_version": {
"type": "string"
}
}
}
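For components that cannot depend on Pydantic, the main constraints in this schema can be checked with the standard library alone. The sketch below is illustrative only: `check_task_payload` is a hypothetical helper, it covers just a subset of the schema (task_id pattern, goal length, constraints count, budget minimums), and it is not a substitute for full JSON Schema validation.

```python
import re

# Same pattern as the "task_id" property in the JSON Schema above.
TASK_ID_PATTERN = re.compile(
    r"^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)

# Minimum values copied from the "budget" properties in the schema.
BUDGET_MINIMUMS = {"max_tokens": 1, "max_time_seconds": 1, "max_retries": 0}


def check_task_payload(payload: dict) -> list:
    """Return a list of human-readable violations (empty if none found)."""
    errors = []
    task_id = payload.get("task_id")
    if not isinstance(task_id, str) or not TASK_ID_PATTERN.match(task_id):
        errors.append("task_id must match task-{uuid}")
    goal = payload.get("goal")
    if not isinstance(goal, str) or not 10 <= len(goal) <= 2000:
        errors.append("goal must be 10-2000 characters")
    if len(payload.get("constraints", [])) > 20:
        errors.append("constraints allows at most 20 items")
    budget = payload.get("budget", {})
    for key, minimum in BUDGET_MINIMUMS.items():
        if key in budget and budget[key] < minimum:
            errors.append(f"budget.{key} must be >= {minimum}")
    return errors


valid = {
    "task_id": "task-550e8400-e29b-41d4-a716-446655440000",
    "goal": "Generate a Python function to parse JSON with error handling",
    "budget": {"max_tokens": 2000, "max_time_seconds": 15, "max_retries": 2},
}
invalid = {"task_id": "invalid-id", "goal": "short",
           "budget": {"max_time_seconds": -10}}

valid_errors = check_task_payload(valid)
invalid_errors = check_task_payload(invalid)
```

Collecting every violation before returning, rather than stopping at the first one, lets the caller report all problems in a single error response.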
ArmCapability
The ArmCapability model describes what an arm can do:
Complete Pydantic Model
from typing import Set, Dict, Any, List
from pydantic import BaseModel, Field, HttpUrl
class ArmCapability(BaseModel):
"""Description of what an arm can do.
This is registered in the ARM_REGISTRY and used by the orchestrator
for intelligent task routing.
"""
# Core identification
arm_id: str = Field(
...,
description="Unique arm identifier (e.g., 'planner-001')",
regex=r'^[a-z]+-[0-9]{3}$'
)
name: str = Field(
...,
description="Human-readable name",
min_length=1,
max_length=100
)
description: str = Field(
...,
description="Detailed description of arm's purpose",
min_length=10,
max_length=500
)
# Schema definitions
input_schema: Dict[str, Any] = Field(
...,
description="JSON schema for input validation"
)
output_schema: Dict[str, Any] = Field(
...,
description="JSON schema for output validation"
)
# Capability tags
capabilities: Set[str] = Field(
...,
description="Capability tags (e.g., 'code', 'security', 'web')",
min_items=1
)
# Performance characteristics
cost_tier: int = Field(
...,
description="Cost tier (1=cheap, 5=expensive)",
ge=1,
le=5
)
average_latency_ms: float = Field(
...,
description="Average response latency in milliseconds",
gt=0
)
success_rate: float = Field(
...,
description="Historical success rate (0.0-1.0)",
ge=0.0,
le=1.0
)
# Network configuration
endpoint: HttpUrl = Field(
...,
description="Kubernetes service URL (must be a valid HTTP or HTTPS URL)"
)
health_check_endpoint: HttpUrl = Field(
...,
description="Health check URL"
)
# Capacity management
max_concurrent_tasks: int = Field(
default=10,
description="Maximum concurrent tasks this arm can handle",
ge=1
)
# Versioning
api_version: str = Field(
default="v1",
description="API version supported by this arm"
)
arm_version: str = Field(
...,
description="Arm implementation version (semver)",
regex=r'^\d+\.\d+\.\d+$'
)
class Config:
json_schema_extra = {
"example": {
"arm_id": "coder-001",
"name": "Coder Arm",
"description": "Generates and analyzes code in multiple programming languages with emphasis on security and quality",
"input_schema": {
"type": "object",
"properties": {
"goal": {"type": "string"},
"language": {"type": "string"},
"context": {"type": "object"}
},
"required": ["goal", "language"]
},
"output_schema": {
"type": "object",
"properties": {
"code": {"type": "string"},
"language": {"type": "string"},
"explanation": {"type": "string"},
"confidence": {"type": "number"}
},
"required": ["code", "language"]
},
"capabilities": ["code_generation", "code_analysis", "refactoring"],
"cost_tier": 3,
"average_latency_ms": 1500.0,
"success_rate": 0.94,
"endpoint": "http://coder-arm:8080",
"health_check_endpoint": "http://coder-arm:8080/health",
"max_concurrent_tasks": 20,
"api_version": "v1",
"arm_version": "1.2.3"
}
}
Arm Registry Example
from typing import Dict
# Global ARM_REGISTRY
ARM_REGISTRY: Dict[str, ArmCapability] = {
"planner": ArmCapability(
arm_id="planner-001",
name="Task Planner",
description="Decomposes complex tasks into subtasks with dependencies",
input_schema={
"type": "object",
"properties": {
"goal": {"type": "string"},
"constraints": {"type": "array", "items": {"type": "string"}}
},
"required": ["goal"]
},
output_schema={
"type": "object",
"properties": {
"plan": {
"type": "array",
"items": {
"type": "object",
"properties": {
"step_id": {"type": "string"},
"action": {"type": "string"},
"arm": {"type": "string"},
"dependencies": {"type": "array", "items": {"type": "string"}}
}
}
}
},
"required": ["plan"]
},
capabilities={"planning", "decomposition", "dependency_resolution"},
cost_tier=2,
average_latency_ms=1200.0,
success_rate=0.92,
endpoint="http://planner-arm:8080",
health_check_endpoint="http://planner-arm:8080/health",
max_concurrent_tasks=15,
api_version="v1",
arm_version="1.0.0"
),
"coder": ArmCapability(
arm_id="coder-001",
name="Coder Arm",
description="Generates and analyzes code in multiple languages",
input_schema={
"type": "object",
"properties": {
"goal": {"type": "string"},
"language": {"type": "string"},
"context": {"type": "object"}
},
"required": ["goal", "language"]
},
output_schema={
"type": "object",
"properties": {
"code": {"type": "string"},
"language": {"type": "string"},
"explanation": {"type": "string"}
},
"required": ["code", "language"]
},
capabilities={"code_generation", "code_analysis", "refactoring"},
cost_tier=3,
average_latency_ms=1500.0,
success_rate=0.94,
endpoint="http://coder-arm:8080",
health_check_endpoint="http://coder-arm:8080/health",
max_concurrent_tasks=20,
api_version="v1",
arm_version="1.2.3"
),
"executor": ArmCapability(
arm_id="executor-001",
name="Executor Arm",
description="Executes tools in isolated sandboxes",
input_schema={
"type": "object",
"properties": {
"tool": {"type": "string"},
"args": {"type": "object"},
"sandbox": {"type": "string"}
},
"required": ["tool", "args"]
},
output_schema={
"type": "object",
"properties": {
"stdout": {"type": "string"},
"stderr": {"type": "string"},
"exit_code": {"type": "integer"},
"duration_ms": {"type": "integer"}
},
"required": ["exit_code"]
},
capabilities={"tool_execution", "sandbox_management", "security_scanning"},
cost_tier=4,
average_latency_ms=2500.0,
success_rate=0.88,
endpoint="http://executor-arm:8080",
health_check_endpoint="http://executor-arm:8080/health",
max_concurrent_tasks=10,
api_version="v1",
arm_version="1.1.0"
),
"retriever": ArmCapability(
arm_id="retriever-001",
name="Retriever Arm",
description="Retrieves and summarizes documentation",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string"},
"sources": {"type": "array", "items": {"type": "string"}}
},
"required": ["query"]
},
output_schema={
"type": "object",
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"content": {"type": "string"},
"source": {"type": "string"},
"relevance": {"type": "number"}
}
}
}
},
"required": ["results"]
},
capabilities={"documentation_search", "summarization", "context_extraction"},
cost_tier=2,
average_latency_ms=800.0,
success_rate=0.96,
endpoint="http://retriever-arm:8080",
health_check_endpoint="http://retriever-arm:8080/health",
max_concurrent_tasks=25,
api_version="v1",
arm_version="1.0.5"
),
"judge": ArmCapability(
arm_id="judge-001",
name="Judge Arm",
description="Validates results and enforces quality standards",
input_schema={
"type": "object",
"properties": {
"task_id": {"type": "string"},
"result": {"type": "object"},
"criteria": {"type": "array", "items": {"type": "string"}}
},
"required": ["task_id", "result"]
},
output_schema={
"type": "object",
"properties": {
"passed": {"type": "boolean"},
"score": {"type": "number"},
"feedback": {"type": "string"},
"issues": {"type": "array", "items": {"type": "string"}}
},
"required": ["passed", "score"]
},
capabilities={"result_validation", "quality_assurance", "testing"},
cost_tier=2,
average_latency_ms=900.0,
success_rate=0.98,
endpoint="http://judge-arm:8080",
health_check_endpoint="http://judge-arm:8080/health",
max_concurrent_tasks=30,
api_version="v1",
arm_version="1.0.2"
)
}
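The registry fields `capabilities`, `cost_tier`, and `success_rate` are enough to sketch how routing might work. The example below is a simplified illustration, not the orchestrator's actual routing algorithm: the selection rule (cheapest arm that covers all required capabilities, ties broken by higher success rate) is an assumption, and `ArmInfo` and `select_arm` are hypothetical names. It uses plain dataclasses so that it runs without Pydantic.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Set


@dataclass(frozen=True)
class ArmInfo:
    """Reduced stand-in for ArmCapability with only the routing fields."""
    arm_id: str
    capabilities: FrozenSet[str]
    cost_tier: int
    success_rate: float


# Values copied from three of the registry entries above.
ARMS: List[ArmInfo] = [
    ArmInfo("planner-001",
            frozenset({"planning", "decomposition", "dependency_resolution"}),
            2, 0.92),
    ArmInfo("coder-001",
            frozenset({"code_generation", "code_analysis", "refactoring"}),
            3, 0.94),
    ArmInfo("judge-001",
            frozenset({"result_validation", "quality_assurance", "testing"}),
            2, 0.98),
]


def select_arm(required: Set[str], arms: List[ArmInfo] = ARMS) -> Optional[ArmInfo]:
    """Assumed rule: pick the lowest cost tier among arms that cover every
    required capability; break ties by the higher success rate."""
    candidates = [arm for arm in arms if required <= arm.capabilities]
    if not candidates:
        return None
    return min(candidates, key=lambda arm: (arm.cost_tier, -arm.success_rate))


coder = select_arm({"code_generation", "refactoring"})
nobody = select_arm({"tool_execution", "planning"})
```

A production router would likely also weigh `average_latency_ms` and current load against `max_concurrent_tasks`; those are left out here to keep the sketch short.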
ProvenanceMetadata
The ProvenanceMetadata model tracks the origin and processing history of data:
Complete Pydantic Model
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
class ProvenanceMetadata(BaseModel):
"""Provenance information for audit and debugging.
Tracks the complete lineage of a task result including:
- Which components touched the data
- When and why transformations occurred
- Resource consumption
- Security validations
"""
# Source identification
task_id: str = Field(
...,
description="Task identifier",
regex=r'^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
arm_id: str = Field(
...,
description="Arm that produced this result"
)
# Temporal information
timestamp: datetime = Field(
default_factory=datetime.utcnow,
description="Result generation timestamp (UTC)"
)
processing_time_ms: int = Field(
...,
description="Processing duration in milliseconds",
ge=0
)
# Processing chain
processing_chain: List[str] = Field(
default_factory=list,
description="Ordered list of components that processed this data"
)
# Resource consumption
tokens_consumed: Optional[int] = Field(
None,
description="LLM tokens consumed",
ge=0
)
estimated_cost_usd: Optional[float] = Field(
None,
description="Estimated processing cost in USD",
ge=0.0
)
# Quality metrics
confidence: float = Field(
...,
description="Confidence score (0.0-1.0)",
ge=0.0,
le=1.0
)
quality_score: Optional[float] = Field(
None,
description="Quality assessment score (0.0-1.0)",
ge=0.0,
le=1.0
)
# Security
pii_detected: bool = Field(
default=False,
description="Whether PII was detected and redacted"
)
security_scan_passed: bool = Field(
default=True,
description="Whether security scan passed"
)
# Model information
model_used: Optional[str] = Field(
None,
description="Model identifier (e.g., 'claude-sonnet-4')"
)
model_version: Optional[str] = Field(
None,
description="Model version"
)
# Additional metadata
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Additional provenance metadata"
)
class Config:
json_schema_extra = {
"example": {
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"arm_id": "coder-001",
"timestamp": "2025-11-10T10:30:00Z",
"processing_time_ms": 1450,
"processing_chain": ["reflex-layer", "coder-001", "judge-001"],
"tokens_consumed": 1250,
"estimated_cost_usd": 0.015,
"confidence": 0.92,
"quality_score": 0.88,
"pii_detected": False,
"security_scan_passed": True,
"model_used": "claude-sonnet-4",
"model_version": "20250929",
"metadata": {
"language": "python",
"complexity": "medium",
"cached": False
}
}
}
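When a task passes through several arms, each step can return its own provenance record. The sketch below shows one way a caller might aggregate those records for auditing. It works on plain dictionaries with the field names defined above; `summarize_provenance` is a hypothetical helper, and reporting the minimum confidence across steps is an assumed convention rather than documented OctoLLM behavior.

```python
from typing import Iterable


def summarize_provenance(records: Iterable[dict]) -> dict:
    """Aggregate resource usage across the provenance records of one task."""
    records = list(records)
    return {
        # Optional fields may be missing or None, so treat both as zero.
        "total_tokens": sum(r.get("tokens_consumed") or 0 for r in records),
        "total_cost_usd": sum(r.get("estimated_cost_usd") or 0.0 for r in records),
        "total_processing_ms": sum(r["processing_time_ms"] for r in records),
        # Assumed convention: report the weakest step's confidence.
        "min_confidence": min((r["confidence"] for r in records), default=None),
        "all_scans_passed": all(r.get("security_scan_passed", True) for r in records),
    }


records = [
    {"arm_id": "coder-001", "processing_time_ms": 1450, "confidence": 0.92,
     "tokens_consumed": 1250, "estimated_cost_usd": 0.015},
    {"arm_id": "judge-001", "processing_time_ms": 300, "confidence": 0.98,
     "tokens_consumed": None},
]
summary = summarize_provenance(records)
```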
BaseMessage
The BaseMessage model defines the structure for inter-component communication:
Complete Pydantic Model
from enum import Enum
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
class MessageType(str, Enum):
"""Message types for component communication."""
TASK_REQUEST = "task_request"
TASK_RESPONSE = "task_response"
STATUS_UPDATE = "status_update"
ERROR = "error"
HEARTBEAT = "heartbeat"
CANCEL_REQUEST = "cancel_request"
class BaseMessage(BaseModel):
"""Base message format for all inter-component communication.
All messages exchanged between orchestrator, arms, and other
components use this structure.
"""
# Message identification
message_id: str = Field(
...,
description="Unique message identifier",
regex=r'^msg-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
message_type: MessageType = Field(
...,
description="Message type"
)
# Routing information
sender_id: str = Field(
...,
description="Sender component identifier"
)
recipient_id: str = Field(
...,
description="Recipient component identifier"
)
# Correlation
correlation_id: Optional[str] = Field(
None,
description="Correlation ID for request/response pairs"
)
# Message content
payload: Dict[str, Any] = Field(
...,
description="Message payload"
)
# Temporal information
timestamp: datetime = Field(
default_factory=datetime.utcnow,
description="Message creation timestamp (UTC)"
)
# Priority and delivery
priority: Priority = Field(
default=Priority.MEDIUM,
description="Message priority"
)
ttl_seconds: int = Field(
default=300,
description="Time-to-live in seconds",
ge=1,
le=3600
)
# Metadata
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Additional metadata"
)
class Config:
json_schema_extra = {
"example": {
"message_id": "msg-650e8400-e29b-41d4-a716-446655440000",
"message_type": "task_request",
"sender_id": "orchestrator-001",
"recipient_id": "coder-001",
"correlation_id": "task-550e8400-e29b-41d4-a716-446655440000",
"payload": {
"goal": "Generate Python function",
"context": {"language": "python"}
},
"timestamp": "2025-11-10T10:30:00Z",
"priority": "medium",
"ttl_seconds": 300,
"metadata": {}
}
}
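Two fields in this model drive delivery behavior: `ttl_seconds` bounds how long a message stays valid, and `correlation_id` ties a response to its request. The following sketch illustrates both using plain dictionaries. The helper names are hypothetical, and the fallback to the request's `message_id` when no `correlation_id` is set is an assumption, not something the contract specifies.

```python
import uuid
from datetime import datetime, timedelta, timezone


def parse_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting the trailing 'Z' used above."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def is_expired(message: dict, now: datetime) -> bool:
    """True once ttl_seconds have elapsed since the message timestamp."""
    ttl = timedelta(seconds=message.get("ttl_seconds", 300))
    return now > parse_utc(message["timestamp"]) + ttl


def make_reply(request: dict, payload: dict) -> dict:
    """Build a task_response that swaps sender and recipient."""
    return {
        "message_id": f"msg-{uuid.uuid4()}",
        "message_type": "task_response",
        "sender_id": request["recipient_id"],
        "recipient_id": request["sender_id"],
        # Assumed fallback: reuse the request's message_id if unset.
        "correlation_id": request.get("correlation_id") or request["message_id"],
        "payload": payload,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "priority": request.get("priority", "medium"),
        "ttl_seconds": request.get("ttl_seconds", 300),
        "metadata": {},
    }


request = {
    "message_id": "msg-650e8400-e29b-41d4-a716-446655440000",
    "message_type": "task_request",
    "sender_id": "orchestrator-001",
    "recipient_id": "coder-001",
    "correlation_id": "task-550e8400-e29b-41d4-a716-446655440000",
    "payload": {"goal": "Generate Python function"},
    "timestamp": "2025-11-10T10:30:00Z",
    "ttl_seconds": 300,
}
reply = make_reply(request, {"code": "def parse_json(data): ..."})
still_valid = not is_expired(
    request, datetime(2025, 11, 10, 10, 34, tzinfo=timezone.utc))
```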
ErrorResponse
The ErrorResponse model provides structured error information:
Complete Pydantic Model
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field, HttpUrl
class ErrorCategory(str, Enum):
"""Error categories for classification."""
VALIDATION = "validation"
AUTHENTICATION = "authentication"
AUTHORIZATION = "authorization"
NOT_FOUND = "not_found"
RATE_LIMIT = "rate_limit"
TIMEOUT = "timeout"
INTERNAL = "internal"
EXTERNAL = "external"
class ErrorResponse(BaseModel):
"""Structured error response.
Provides rich error information including error codes,
human-readable messages, retry guidance, and links to documentation.
"""
# Error identification
error_code: str = Field(
...,
description="Machine-readable error code (e.g., 'INVALID_TASK_ID')",
regex=r'^[A-Z_]+$'
)
category: ErrorCategory = Field(
...,
description="Error category for classification"
)
# Error information
message: str = Field(
...,
description="Human-readable error message",
min_length=1,
max_length=500
)
details: Optional[Dict[str, Any]] = Field(
None,
description="Additional error details (field validation errors, stack traces, etc.)"
)
# Retry guidance
retryable: bool = Field(
default=False,
description="Whether the operation can be retried"
)
retry_after_seconds: Optional[int] = Field(
None,
description="Recommended retry delay in seconds",
ge=1
)
# Documentation
documentation_url: Optional[HttpUrl] = Field(
None,
description="URL to relevant documentation"
)
# Context
request_id: Optional[str] = Field(
None,
description="Request ID for debugging"
)
timestamp: datetime = Field(
default_factory=datetime.utcnow,
description="Error timestamp (UTC)"
)
# Suggestions
suggestions: List[str] = Field(
default_factory=list,
description="Suggested actions to resolve the error",
max_items=5
)
class Config:
json_schema_extra = {
"example": {
"error_code": "INVALID_TASK_ID",
"category": "validation",
"message": "Task ID must match format 'task-{uuid}'",
"details": {
"field": "task_id",
"value": "invalid-id",
"expected_pattern": "^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
},
"retryable": False,
"retry_after_seconds": None,
"documentation_url": "https://docs.octollm.io/api/errors#INVALID_TASK_ID",
"request_id": "req-750e8400-e29b-41d4-a716-446655440000",
"timestamp": "2025-11-10T10:30:00Z",
"suggestions": [
"Ensure task_id starts with 'task-' followed by a valid UUID",
"Use the task creation endpoint to generate a valid task_id"
]
}
}
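The `retryable` and `retry_after_seconds` fields let a client decide whether and when to retry. The sketch below shows one possible client-side policy. `retry_delay` is a hypothetical helper, and the exponential backoff used when the server gives no hint is an assumption rather than a documented OctoLLM behavior.

```python
from typing import Optional


def retry_delay(error: dict, attempt: int, max_retries: int = 3,
                base_delay: float = 1.0) -> Optional[float]:
    """Return the number of seconds to wait before the next attempt,
    or None if the client should give up.

    `attempt` is the zero-based count of attempts already made.
    """
    if not error.get("retryable", False) or attempt >= max_retries:
        return None
    hint = error.get("retry_after_seconds")
    if hint is not None:
        # Prefer the server's explicit guidance when it is present.
        return float(hint)
    # Assumed fallback: exponential backoff (1s, 2s, 4s, ...).
    return base_delay * (2 ** attempt)


validation_error = {"error_code": "INVALID_TASK_ID", "retryable": False}
timeout_error = {"error_code": "EXECUTION_TIMEOUT", "retryable": True,
                 "retry_after_seconds": 60}
internal_error = {"error_code": "INTERNAL_ERROR", "retryable": True}

no_retry = retry_delay(validation_error, attempt=0)
server_hint = retry_delay(timeout_error, attempt=0)
backoff = retry_delay(internal_error, attempt=2)
```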
Orchestrator API
The Orchestrator exposes a REST API for task management and system monitoring.
POST /task
Create and submit a new task for execution.
Request
POST /v1/task HTTP/1.1
Host: orchestrator.octollm.svc.cluster.local
Content-Type: application/json
Authorization: Bearer <capability_token>
{
"goal": "Scan example.com for open ports and identify services",
"constraints": [
"Use only non-invasive scanning techniques",
"Complete within 60 seconds",
"Minimize network bandwidth"
],
"context": {
"target": "example.com",
"scan_type": "service_detection"
},
"acceptance_criteria": [
"All open ports identified",
"Services correctly detected",
"No false positives"
],
"priority": "high",
"budget": {
"max_tokens": 5000,
"max_time_seconds": 60,
"max_retries": 2
}
}
Response (202 Accepted)
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: /v1/task/task-550e8400-e29b-41d4-a716-446655440000
{
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"status": "accepted",
"message": "Task queued for processing",
"estimated_completion_seconds": 45,
"created_at": "2025-11-10T10:30:00Z"
}
Error Response (400 Bad Request)
HTTP/1.1 400 Bad Request
Content-Type: application/json
{
"error_code": "INVALID_BUDGET",
"category": "validation",
"message": "max_time_seconds must be positive",
"details": {
"field": "budget.max_time_seconds",
"value": -10,
"constraint": "minimum: 1"
},
"retryable": false,
"documentation_url": "https://docs.octollm.io/api/errors#INVALID_BUDGET",
"suggestions": [
"Set max_time_seconds to a positive integer",
"Typical values range from 10 to 300 seconds"
]
}
cURL Example
curl -X POST https://orchestrator.octollm.io/v1/task \
-H "Content-Type: application/json" \
-H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGc..." \
-d '{
"goal": "Scan example.com for open ports",
"constraints": ["Non-invasive only"],
"priority": "high"
}'
Python Client Example
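import os

import requests

# Assumed environment variable name; adjust to match your deployment.
CAPABILITY_TOKEN = os.environ["OCTOLLM_CAPABILITY_TOKEN"]

def create_task(goal: str, priority: str = "medium") -> dict:
    """Create a new task and return the parsed response body."""
    response = requests.post(
        "https://orchestrator.octollm.io/v1/task",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {CAPABILITY_TOKEN}"
        },
        json={
            "goal": goal,
            "priority": priority,
            "budget": {
                "max_tokens": 5000,
                "max_time_seconds": 60
            }
        },
        timeout=10  # seconds; avoids hanging indefinitely on a stalled connection
    )
    # Raise requests.HTTPError for 4xx/5xx responses
    response.raise_for_status()
    return response.json()

# Usage
result = create_task("Scan example.com for vulnerabilities", priority="high")
print(f"Task ID: {result['task_id']}")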
GET /task/{task_id}
Retrieve the status and results of a task.
Request
GET /v1/task/task-550e8400-e29b-41d4-a716-446655440000 HTTP/1.1
Host: orchestrator.octollm.svc.cluster.local
Authorization: Bearer <capability_token>
Response (200 OK) - Running Task
HTTP/1.1 200 OK
Content-Type: application/json
{
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"progress": 0.65,
"current_step": "executor-001: Running nmap scan",
"created_at": "2025-11-10T10:30:00Z",
"started_at": "2025-11-10T10:30:02Z",
"estimated_completion": "2025-11-10T10:31:15Z",
"steps_completed": 2,
"steps_total": 4
}
Response (200 OK) - Completed Task
HTTP/1.1 200 OK
Content-Type: application/json
{
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"success": true,
"created_at": "2025-11-10T10:30:00Z",
"started_at": "2025-11-10T10:30:02Z",
"completed_at": "2025-11-10T10:31:12Z",
"duration_ms": 70000,
"result": {
"open_ports": [22, 80, 443],
"services": {
"22": "OpenSSH 8.2p1",
"80": "nginx/1.18.0",
"443": "nginx/1.18.0 (TLS 1.3)"
},
"confidence": 0.95
},
"provenance": {
"arm_id": "executor-001",
"processing_time_ms": 65000,
"tokens_consumed": 850,
"confidence": 0.95
}
}
Response (404 Not Found)
HTTP/1.1 404 Not Found
Content-Type: application/json
{
"error_code": "TASK_NOT_FOUND",
"category": "not_found",
"message": "Task with ID 'task-550e8400-e29b-41d4-a716-446655440000' not found",
"retryable": false,
"suggestions": [
"Verify the task_id is correct",
"Check if the task has expired (default TTL: 24 hours)"
]
}
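Because task creation returns 202 Accepted, clients typically poll this endpoint until the task reaches a terminal status. The sketch below separates the polling loop from the HTTP call so that it can be exercised without a network. `wait_for_task` and its parameters are hypothetical names. The terminal statuses are those that appear elsewhere in this document ("completed", "failed", "cancelled"); the polling interval and attempt limit are assumptions.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_task(fetch_status: Callable[[], dict],
                  interval_seconds: float = 2.0,
                  max_polls: int = 30,
                  sleep: Callable[[float], None] = time.sleep) -> dict:
    """Call fetch_status() until the task is in a terminal status.

    fetch_status should return the parsed JSON body of GET /v1/task/{task_id}.
    Raises TimeoutError if max_polls is exhausted first.
    """
    for _ in range(max_polls):
        body = fetch_status()
        if body.get("status") in TERMINAL_STATUSES:
            return body
        sleep(interval_seconds)
    raise TimeoutError("task did not reach a terminal status")


# Simulated responses standing in for real HTTP calls.
responses = iter([
    {"status": "running", "progress": 0.25},
    {"status": "running", "progress": 0.65},
    {"status": "completed", "success": True},
])
final = wait_for_task(lambda: next(responses), sleep=lambda seconds: None)
```

In real use, `fetch_status` would wrap an HTTP GET with the capability token, and a 404 response would need separate handling because the task may have expired.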
POST /task/{task_id}/cancel
Cancel a running task.
Request
POST /v1/task/task-550e8400-e29b-41d4-a716-446655440000/cancel HTTP/1.1
Host: orchestrator.octollm.svc.cluster.local
Authorization: Bearer <capability_token>
Content-Type: application/json
{
"reason": "User requested cancellation"
}
Response (200 OK)
HTTP/1.1 200 OK
Content-Type: application/json
{
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"status": "cancelled",
"message": "Task cancellation initiated",
"cancelled_at": "2025-11-10T10:30:45Z"
}
GET /health
Health check endpoint for monitoring.
Request
GET /v1/health HTTP/1.1
Host: orchestrator.octollm.svc.cluster.local
Response (200 OK)
HTTP/1.1 200 OK
Content-Type: application/json
{
"status": "healthy",
"version": "1.0.0",
"timestamp": "2025-11-10T10:30:00Z",
"checks": {
"database": {"status": "up", "latency_ms": 5},
"redis": {"status": "up", "latency_ms": 1},
"qdrant": {"status": "up", "latency_ms": 3},
"arms": {
"planner-001": {"status": "up"},
"coder-001": {"status": "up"},
"executor-001": {"status": "up"},
"retriever-001": {"status": "up"},
"judge-001": {"status": "up"}
}
}
}
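A monitoring script can walk the `checks` object to list which dependencies are failing. The following sketch assumes that any status other than "up" means the component is unhealthy. The "degraded" and "down" values in the sample are illustrative assumptions, since only "healthy" and "up" appear in the response above, and `unhealthy_components` is a hypothetical helper.

```python
def unhealthy_components(health: dict) -> list:
    """Return the names of checks whose status is not 'up'.

    Arms are nested one level deeper, so they are reported as 'arms.<arm_id>'.
    """
    failing = []
    for name, check in health.get("checks", {}).items():
        if name == "arms":
            failing.extend(
                f"arms.{arm_id}"
                for arm_id, arm_check in check.items()
                if arm_check.get("status") != "up"
            )
        elif check.get("status") != "up":
            failing.append(name)
    return failing


sample = {
    "status": "degraded",  # assumed value for illustration
    "checks": {
        "database": {"status": "up", "latency_ms": 5},
        "redis": {"status": "down"},  # assumed value for illustration
        "arms": {
            "coder-001": {"status": "up"},
            "judge-001": {"status": "down"},
        },
    },
}
failing = unhealthy_components(sample)
```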
GET /metrics
Prometheus metrics endpoint.
Request
GET /v1/metrics HTTP/1.1
Host: orchestrator.octollm.svc.cluster.local
Response (200 OK)
HTTP/1.1 200 OK
Content-Type: text/plain; version=0.0.4
# HELP octollm_tasks_total Total tasks processed
# TYPE octollm_tasks_total counter
octollm_tasks_total{status="completed"} 1250
octollm_tasks_total{status="failed"} 45
octollm_tasks_total{status="cancelled"} 12
# HELP octollm_task_duration_seconds Task duration
# TYPE octollm_task_duration_seconds histogram
octollm_task_duration_seconds_bucket{le="1.0"} 120
octollm_task_duration_seconds_bucket{le="5.0"} 890
octollm_task_duration_seconds_bucket{le="10.0"} 1150
octollm_task_duration_seconds_bucket{le="+Inf"} 1307
octollm_task_duration_seconds_sum 8432.5
octollm_task_duration_seconds_count 1307
# HELP octollm_arms_active Currently active arms
# TYPE octollm_arms_active gauge
octollm_arms_active{arm_id="planner-001"} 1
octollm_arms_active{arm_id="coder-001"} 1
octollm_arms_active{arm_id="executor-001"} 1
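Derived figures such as success rate and mean task duration can be computed from these samples. The sketch below uses a deliberately simple parser that handles only lines of the form shown above (no timestamps, no spaces inside label values). For production use, a proper Prometheus client or PromQL query would be more appropriate. `parse_samples` is a hypothetical helper.

```python
def parse_samples(text: str) -> dict:
    """Map 'metric_name{labels}' to its float value, skipping comments."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, value = line.rsplit(" ", 1)
        samples[name] = float(value)
    return samples


METRICS = """
# TYPE octollm_tasks_total counter
octollm_tasks_total{status="completed"} 1250
octollm_tasks_total{status="failed"} 45
octollm_tasks_total{status="cancelled"} 12
# TYPE octollm_task_duration_seconds histogram
octollm_task_duration_seconds_sum 8432.5
octollm_task_duration_seconds_count 1307
"""

samples = parse_samples(METRICS)
total_tasks = sum(
    value for name, value in samples.items()
    if name.startswith("octollm_tasks_total")
)
success_rate = samples['octollm_tasks_total{status="completed"}'] / total_tasks
mean_duration_seconds = (
    samples["octollm_task_duration_seconds_sum"]
    / samples["octollm_task_duration_seconds_count"]
)
```

With the sample values above, the three status counters sum to 1307, which matches the histogram's `_count`, so the success rate is 1250 out of 1307 tasks and the mean duration is 8432.5 seconds divided by 1307 tasks.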
Arm Interface Contract
All arms must implement a standard interface for interoperability with the orchestrator.
Standard Arm Endpoints
Every arm MUST expose these endpoints:
POST /{arm_id}/execute
Execute a task.
Request:
{
"task_contract": {
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"goal": "Generate Python function for JSON parsing",
"context": {"language": "python"},
"budget": {"max_tokens": 2000, "max_time_seconds": 30}
},
"capability_token": "eyJ0eXAiOiJKV1QiLCJhbGc..."
}
Response:
{
"task_id": "task-550e8400-e29b-41d4-a716-446655440000",
"success": true,
"result": {
"code": "def parse_json(data: str) -> dict: ...",
"language": "python",
"explanation": "Function includes error handling..."
},
"provenance": {
"arm_id": "coder-001",
"processing_time_ms": 1450,
"confidence": 0.92
}
}
GET /{arm_id}/health
Health check.
Response:
{
"status": "healthy",
"arm_id": "coder-001",
"version": "1.2.3",
"capabilities": ["code_generation", "code_analysis", "refactoring"],
"active_tasks": 3,
"max_concurrent_tasks": 20
}
GET /{arm_id}/capabilities
Get arm capabilities.
Response:
{
"arm_id": "coder-001",
"name": "Coder Arm",
"capabilities": ["code_generation", "code_analysis", "refactoring"],
"input_schema": {...},
"output_schema": {...},
"cost_tier": 3,
"average_latency_ms": 1500.0
}
Request Format
Standard request to arm:
import uuid

from pydantic import BaseModel, Field

class ArmRequest(BaseModel):
    """Standard request format for arm execution."""
    task_contract: TaskContract
    capability_token: str
    request_id: str = Field(default_factory=lambda: f"req-{uuid.uuid4()}")
    timeout_seconds: int = Field(default=30, ge=1, le=300)

# Example
request = ArmRequest(
    task_contract=TaskContract(
        task_id="task-550e8400-e29b-41d4-a716-446655440000",
        goal="Generate code",
        # validate_budget requires positive max_tokens and max_time_seconds
        budget={"max_tokens": 2000, "max_time_seconds": 30}
    ),
    capability_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    timeout_seconds=30
)
Response Format
Standard response from arm:
class ArmResponse(BaseModel):
    """Standard response format from arm execution."""
    task_id: str
    success: bool
    result: Optional[Dict[str, Any]] = None
    error: Optional[ErrorResponse] = None
    provenance: ProvenanceMetadata

# Example - Success
response = ArmResponse(
    task_id="task-550e8400-e29b-41d4-a716-446655440000",
    success=True,
    result={
        "code": "def parse_json(data): ...",
        "language": "python"
    },
    provenance=ProvenanceMetadata(
        # task_id is a required field on ProvenanceMetadata
        task_id="task-550e8400-e29b-41d4-a716-446655440000",
        arm_id="coder-001",
        processing_time_ms=1450,
        confidence=0.92
    )
)

# Example - Error
response = ArmResponse(
    task_id="task-550e8400-e29b-41d4-a716-446655440000",
    success=False,
    error=ErrorResponse(
        error_code="EXECUTION_TIMEOUT",
        category="timeout",
        message="Task execution exceeded timeout",
        retryable=True,
        retry_after_seconds=60
    ),
    provenance=ProvenanceMetadata(
        task_id="task-550e8400-e29b-41d4-a716-446655440000",
        arm_id="coder-001",
        processing_time_ms=30000,
        confidence=0.0
    )
)
Error Handling
Arms must handle errors gracefully and return structured error responses:
import asyncio
import logging

logger = logging.getLogger(__name__)

async def execute_task(request: ArmRequest) -> ArmResponse:
    """Execute task with comprehensive error handling."""
    try:
        # Validate capability token
        if not verify_capability_token(request.capability_token):
            return ArmResponse(
                task_id=request.task_contract.task_id,
                success=False,
                error=ErrorResponse(
                    error_code="INVALID_CAPABILITY_TOKEN",
                    category="authentication",
                    message="Capability token is invalid or expired",
                    retryable=False
                ),
                provenance=ProvenanceMetadata(
                    # task_id is a required field on ProvenanceMetadata
                    task_id=request.task_contract.task_id,
                    arm_id=ARM_ID,
                    processing_time_ms=0,
                    confidence=0.0
                )
            )

        # Execute task with timeout
        result = await asyncio.wait_for(
            _execute_task_internal(request.task_contract),
            timeout=request.timeout_seconds
        )
        return ArmResponse(
            task_id=request.task_contract.task_id,
            success=True,
            result=result,
            provenance=ProvenanceMetadata(...)
        )
    except asyncio.TimeoutError:
        return ArmResponse(
            task_id=request.task_contract.task_id,
            success=False,
            error=ErrorResponse(
                error_code="EXECUTION_TIMEOUT",
                category="timeout",
                message=f"Task execution exceeded {request.timeout_seconds}s",
                retryable=True,
                retry_after_seconds=60
            ),
            provenance=ProvenanceMetadata(...)
        )
    except Exception as e:
        # Log the full traceback, but expose only the exception type to the caller
        logger.exception("Unexpected error during task execution")
        return ArmResponse(
            task_id=request.task_contract.task_id,
            success=False,
            error=ErrorResponse(
                error_code="INTERNAL_ERROR",
                category="internal",
                message="An unexpected error occurred",
                details={"error_type": type(e).__name__},
                retryable=True,
                retry_after_seconds=30
            ),
            provenance=ProvenanceMetadata(...)
        )
Reflex Layer API
The Reflex Layer provides preprocessing, caching, and PII filtering.
POST /preprocess
Preprocess a request before routing to orchestrator.
Request
POST /v1/preprocess HTTP/1.1
Host: reflex.octollm.svc.cluster.local
Content-Type: application/json
{
"goal": "Find user John Smith's email address john.smith@example.com",
"context": {"user_id": "12345"}
}
Response
HTTP/1.1 200 OK
Content-Type: application/json
{
"preprocessed_goal": "Find user [REDACTED_NAME]'s email address [REDACTED_EMAIL]",
"preprocessed_context": {"user_id": "[REDACTED]"},
"pii_detected": true,
"pii_types": ["name", "email", "user_id"],
"cached": false,
"processing_time_ms": 15
}
GET /cache/
Retrieve cached result.
Request
GET /v1/cache/scan_example.com_ports HTTP/1.1
Host: reflex.octollm.svc.cluster.local
Response (200 OK)
HTTP/1.1 200 OK
Content-Type: application/json
{
"cache_key": "scan_example.com_ports",
"cached_result": {
"open_ports": [22, 80, 443],
"services": {...}
},
"cached_at": "2025-11-10T10:25:00Z",
"expires_at": "2025-11-10T10:30:00Z",
"hit": true
}
Response (404 Not Found)
HTTP/1.1 404 Not Found
Content-Type: application/json
{
"cache_key": "scan_example.com_ports",
"hit": false
}
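The two responses above can be reproduced with a small time-to-live cache. The sketch below is an in-memory stand-in, not the Reflex Layer implementation; the `TTLCache` class is hypothetical, and the five-minute TTL is only inferred from the gap between `cached_at` and `expires_at` in the example.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


class TTLCache:
    """In-memory cache that answers in the shape of the GET /cache responses."""

    def __init__(self, ttl: timedelta = timedelta(minutes=5)) -> None:
        self._ttl = ttl
        self._entries: Dict[str, Tuple[Any, datetime]] = {}

    def put(self, key: str, value: Any, now: Optional[datetime] = None) -> None:
        self._entries[key] = (value, now or datetime.now(timezone.utc))

    def get(self, key: str, now: Optional[datetime] = None) -> Tuple[int, Dict[str, Any]]:
        """Return (http_status, body) for a lookup."""
        now = now or datetime.now(timezone.utc)
        entry = self._entries.get(key)
        if entry is None or now >= entry[1] + self._ttl:
            self._entries.pop(key, None)  # drop expired entries lazily
            return 404, {"cache_key": key, "hit": False}
        value, cached_at = entry
        return 200, {
            "cache_key": key,
            "cached_result": value,
            "cached_at": cached_at.strftime(TIMESTAMP_FORMAT),
            "expires_at": (cached_at + self._ttl).strftime(TIMESTAMP_FORMAT),
            "hit": True,
        }
```

Passing `now` explicitly keeps the expiry logic testable without waiting for real time to pass.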
POST /filter/pii
Filter PII from text.
Request
POST /v1/filter/pii HTTP/1.1
Host: reflex.octollm.svc.cluster.local
Content-Type: application/json
{
"text": "Contact John Smith at john.smith@example.com or call 555-123-4567"
}
Response
HTTP/1.1 200 OK
Content-Type: application/json
{
"filtered_text": "Contact [REDACTED_NAME] at [REDACTED_EMAIL] or call [REDACTED_PHONE]",
"pii_detected": true,
"pii_types": ["name", "email", "phone"],
"redactions": [
{"type": "name", "original": "John Smith", "position": [8, 18]},
{"type": "email", "original": "john.smith@example.com", "position": [22, 44]},
{"type": "phone", "original": "555-123-4567", "position": [53, 65]}
]
}
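The `position` values in the response are half-open `[start, end)` character offsets into the original text. A rough sketch of how such a response could be assembled with regular expressions follows. It covers only email addresses and US-style phone numbers; detecting names needs more than a regex, so the `name` redaction above is not reproduced. The patterns and the `filter_pii` function are illustrative assumptions, not the Reflex Layer's actual detector.

```python
import re
from typing import Any, Dict, List

# Illustrative patterns only; real PII detection needs far more than two regexes.
PII_PATTERNS = {
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "phone": re.compile(r"\b\d{3}-\d{3}-\d{4}\b"),
}


def filter_pii(text: str) -> Dict[str, Any]:
    """Redact emails and phone numbers, reporting [start, end) offsets into the input."""
    matches = []
    for pii_type, pattern in PII_PATTERNS.items():
        for m in pattern.finditer(text):
            matches.append((m.start(), m.end(), pii_type, m.group()))
    matches.sort()

    pieces: List[str] = []
    redactions: List[Dict[str, Any]] = []
    cursor = 0
    for start, end, pii_type, original in matches:
        if start < cursor:
            continue  # skip a match that overlaps one already redacted
        pieces.append(text[cursor:start])
        pieces.append(f"[REDACTED_{pii_type.upper()}]")
        redactions.append({"type": pii_type, "original": original, "position": [start, end]})
        cursor = end
    pieces.append(text[cursor:])

    return {
        "filtered_text": "".join(pieces),
        "pii_detected": bool(redactions),
        # dict.fromkeys removes duplicates while keeping first-seen order
        "pii_types": list(dict.fromkeys(r["type"] for r in redactions)),
        "redactions": redactions,
    }
```

Offsets refer to the input text, so they stay valid regardless of how long each replacement marker is.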
Authentication
OctoLLM uses capability-based authentication with JWT tokens.
Capability Tokens
Capability tokens are JWT tokens that encode:
- Granted capabilities
- Expiration time
- Issuer information
- Scope restrictions
Token Structure
{
"header": {
"alg": "RS256",
"typ": "JWT"
},
"payload": {
"iss": "octollm-orchestrator",
"sub": "coder-001",
"exp": 1731240000,
"iat": 1731236400,
"capabilities": [
"code_generation",
"memory_read:coder_memory",
"memory_write:action_log"
],
"scope": {
"entity_types": ["tool", "library"],
"max_tokens": 10000
}
},
"signature": "..."
}
Token Generation
import jwt
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

def generate_capability_token(
    arm_id: str,
    capabilities: List[str],
    scope: Dict[str, Any],
    expires_in_hours: int = 24,
    private_key: Optional[str] = None
) -> str:
    """Generate a capability token for an arm."""
    if private_key is None:
        raise ValueError("private_key is required to sign the token")

    # Use an aware UTC datetime: calling .timestamp() on a naive utcnow()
    # value interprets it as local time and skews iat/exp on non-UTC hosts.
    now = datetime.now(timezone.utc)
    expires = now + timedelta(hours=expires_in_hours)

    payload = {
        "iss": "octollm-orchestrator",
        "sub": arm_id,
        "iat": int(now.timestamp()),
        "exp": int(expires.timestamp()),
        "capabilities": capabilities,
        "scope": scope
    }

    # Sign with the orchestrator's RSA private key
    token = jwt.encode(
        payload,
        private_key,
        algorithm="RS256"
    )
    return token

# Example
token = generate_capability_token(
    arm_id="coder-001",
    capabilities=[
        "code_generation",
        "memory_read:coder_memory",
        "memory_write:action_log"
    ],
    scope={
        "entity_types": ["tool", "library"],
        "max_tokens": 10000
    },
    expires_in_hours=24,
    private_key=PRIVATE_KEY
)
Token Verification
import jwt

def verify_capability_token(
    token: str,
    required_capability: str,
    public_key: str
) -> bool:
    """Verify capability token and check for required capability."""
    try:
        # Decode and verify the signature, issuer, and expiration.
        # jwt.decode rejects expired tokens itself, so no manual exp
        # comparison is needed; "require" also rejects tokens without exp.
        payload = jwt.decode(
            token,
            public_key,
            algorithms=["RS256"],
            issuer="octollm-orchestrator",
            options={"require": ["exp", "iss"]}
        )

        # Check capability
        capabilities = payload.get("capabilities", [])
        if required_capability not in capabilities:
            return False
        return True
    except jwt.InvalidTokenError:
        # Covers bad signatures, wrong issuer, expired or malformed tokens
        return False
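Verification as written checks only the capability list. The token's `scope` can be checked in the same pass once the payload has been decoded. The sketch below works on an already-verified payload dictionary and assumes that `scope.max_tokens` is an upper bound on a task's token budget; that reading, and the `authorize` helper itself, are assumptions rather than documented behavior.

```python
from typing import Any, Dict, List


def authorize(
    payload: Dict[str, Any],
    required_capability: str,
    requested_tokens: int,
) -> List[str]:
    """Return the reasons a decoded payload does not cover a request (empty if it does)."""
    reasons: List[str] = []
    if required_capability not in payload.get("capabilities", []):
        reasons.append(f"missing capability: {required_capability}")
    # Assumption: scope.max_tokens caps the token budget of any single task.
    max_tokens = payload.get("scope", {}).get("max_tokens")
    if max_tokens is not None and requested_tokens > max_tokens:
        reasons.append(f"requested {requested_tokens} tokens, scope allows {max_tokens}")
    return reasons
```

Returning reasons instead of a bare boolean makes it easier to fill the `message` of an `INSUFFICIENT_CAPABILITIES` error.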
Error Handling
Error Categories
| Category | Description | HTTP Status | Retryable |
|---|---|---|---|
| validation | Invalid input | 400 | No |
| authentication | Auth failure | 401 | No |
| authorization | Permission denied | 403 | No |
| not_found | Resource not found | 404 | No |
| rate_limit | Rate limit exceeded | 429 | Yes |
| timeout | Operation timeout | 504 | Yes |
| internal | Internal server error | 500 | Yes |
| external | External service error | 502 | Yes |
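The table above translates directly into a lookup that both servers and clients can share. This is a minimal sketch; `CategoryPolicy`, `ERROR_CATEGORIES`, and `policy_for` are hypothetical names, and the fallback for unknown categories (500, not retryable) is an assumption, not something the contract specifies.

```python
from typing import Dict, NamedTuple


class CategoryPolicy(NamedTuple):
    http_status: int
    retryable: bool


# Values copied from the Error Categories table.
ERROR_CATEGORIES: Dict[str, CategoryPolicy] = {
    "validation": CategoryPolicy(400, False),
    "authentication": CategoryPolicy(401, False),
    "authorization": CategoryPolicy(403, False),
    "not_found": CategoryPolicy(404, False),
    "rate_limit": CategoryPolicy(429, True),
    "timeout": CategoryPolicy(504, True),
    "internal": CategoryPolicy(500, True),
    "external": CategoryPolicy(502, True),
}


def policy_for(category: str) -> CategoryPolicy:
    """Look up a category; unknown ones fall back to a non-retryable 500 (assumption)."""
    return ERROR_CATEGORIES.get(category, CategoryPolicy(500, False))
```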
Error Codes
Common error codes:
- INVALID_TASK_ID: Task ID format invalid
- INVALID_BUDGET: Budget parameters invalid
- INVALID_CAPABILITY_TOKEN: Authentication failure
- INSUFFICIENT_CAPABILITIES: Missing required capabilities
- TASK_NOT_FOUND: Task does not exist
- RATE_LIMIT_EXCEEDED: Rate limit hit
- EXECUTION_TIMEOUT: Task exceeded time budget
- MEMORY_LIMIT_EXCEEDED: Memory allocation failed
- INTERNAL_ERROR: Unexpected internal error
- EXTERNAL_SERVICE_ERROR: External dependency failed
Retry Policies
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> T:
    """Retry an async callable with exponential backoff."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            # Errors that declare retryable=False are re-raised immediately
            if not getattr(e, 'retryable', True):
                raise
            # Out of attempts: surface the last error
            if attempt == max_retries:
                raise
            # Exponential delay, capped at max_delay
            delay = min(base_delay * (exponential_base ** attempt), max_delay)
            # Jitter scales the delay by a random factor in [0.5, 1.5)
            if jitter:
                delay *= (0.5 + random.random())
            await asyncio.sleep(delay)

    # The loop always returns or raises; this line satisfies type checkers
    raise AssertionError("unreachable")
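To see what the retry helper's parameters produce, the delay formula can be evaluated on its own. The sketch below computes the pre-jitter delays slept after each failed attempt; `backoff_schedule` is a hypothetical helper that mirrors the formula above.

```python
from typing import List


def backoff_schedule(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
) -> List[float]:
    """Delays in seconds, before jitter, slept after each failed attempt."""
    return [
        min(base_delay * exponential_base ** attempt, max_delay)
        for attempt in range(max_retries)
    ]


print(backoff_schedule())               # [1.0, 2.0, 4.0]
print(backoff_schedule(max_retries=8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Because jitter multiplies the already capped delay by a factor in [0.5, 1.5), an actual sleep can approach 1.5 times `max_delay` (just under 90 seconds with the defaults).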
Versioning
API Versioning
OctoLLM uses URL-based API versioning:
/v1/task # Version 1
/v2/task # Version 2 (future)
Backward Compatibility
Changes that are backward compatible:
- Adding new optional fields
- Adding new endpoints
- Adding new error codes
- Expanding enum values
Changes that break compatibility (require version bump):
- Removing or renaming fields
- Changing field types
- Removing endpoints
- Changing required fields
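Adding optional fields is only backward compatible if clients tolerate keys they do not recognize. A minimal sketch of such a tolerant reader follows, using a plain dataclass. `TaskSummary` and `parse_task_summary` are hypothetical; the field names come from the 202 response in the OpenAPI schema, and treating `created_at` as optional is an assumption for illustration.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class TaskSummary:
    task_id: str
    status: str
    created_at: Optional[str] = None  # optional here, so older payloads still parse


def parse_task_summary(body: Dict[str, Any]) -> TaskSummary:
    """Build a TaskSummary, ignoring keys this client version does not know."""
    known = {f.name for f in fields(TaskSummary)}
    return TaskSummary(**{k: v for k, v in body.items() if k in known})
```

A client written this way keeps working when a later server release adds fields, while a missing required field still fails loudly with a `TypeError`.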
Deprecation Process
- Announce: Deprecation announced 6 months in advance
- Warning: Deprecated endpoints return a Deprecation header
- Support: Old version supported for 12 months
- Removal: Old version removed after support period
HTTP/1.1 200 OK
Deprecation: true
Sunset: Sun, 10 May 2026 10:00:00 GMT
Link: </v2/task>; rel="successor-version"
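Clients can read these headers to warn operators before removal. The sketch below uses only the standard library and assumes the headers arrive as a mapping with the exact key casing shown above (most HTTP clients offer a case-insensitive mapping instead). `days_until_sunset` is a hypothetical helper.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def days_until_sunset(
    headers: Mapping[str, str],
    now: Optional[datetime] = None,
) -> Optional[int]:
    """Return whole days until the Sunset date, or None if the endpoint is not deprecated."""
    if "Deprecation" not in headers or "Sunset" not in headers:
        return None
    # Sunset carries an HTTP-date; a GMT date parses to a timezone-aware datetime
    sunset = parsedate_to_datetime(headers["Sunset"])
    now = now or datetime.now(timezone.utc)
    return (sunset - now).days
```

A negative result means the sunset date has already passed and the endpoint may be removed at any time.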
Rate Limiting
Global Rate Limits
| Endpoint | Limit | Window |
|---|---|---|
| POST /task | 100 requests | 1 minute |
| GET /task/{id} | 1000 requests | 1 minute |
| GET /health | Unlimited | - |
| GET /metrics | 60 requests | 1 minute |
Per-Arm Rate Limits
Each arm has individual rate limits based on max_concurrent_tasks:
- Planner: 15 concurrent
- Coder: 20 concurrent
- Executor: 10 concurrent
- Retriever: 25 concurrent
- Judge: 30 concurrent
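One common way to enforce a concurrency cap like this is a semaphore per arm. The sketch below uses `asyncio.Semaphore`; the `ArmLimiter` class is hypothetical, and it queues excess tasks rather than rejecting them, which is a design assumption the contract does not state.

```python
import asyncio
from typing import Awaitable, Callable, Dict, TypeVar

T = TypeVar("T")

# Limits copied from the list above.
MAX_CONCURRENT_TASKS: Dict[str, int] = {
    "planner": 15,
    "coder": 20,
    "executor": 10,
    "retriever": 25,
    "judge": 30,
}


class ArmLimiter:
    """Caps in-flight tasks per arm with one semaphore each."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self._semaphores = {arm: asyncio.Semaphore(n) for arm, n in limits.items()}

    async def run(self, arm: str, task: Callable[[], Awaitable[T]]) -> T:
        # Waits here while the arm is already at its concurrency limit
        async with self._semaphores[arm]:
            return await task()
```

Create the limiter inside the running event loop (for example at application startup) so the semaphores are bound to the right loop on older Python versions.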
Rate Limit Headers
HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 87
X-RateLimit-Reset: 1731236460
Rate limit exceeded:
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1731236460
{
"error_code": "RATE_LIMIT_EXCEEDED",
"category": "rate_limit",
"message": "Rate limit of 100 requests per minute exceeded",
"retryable": true,
"retry_after_seconds": 60
}
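A client can use these headers to pace itself instead of retrying blindly. The sketch below assumes `X-RateLimit-Reset` is a Unix timestamp in seconds (consistent with the example value) and that `Retry-After` is given in seconds; the HTTP-date form of `Retry-After` is not handled. `seconds_to_wait` is a hypothetical helper.

```python
import time
from typing import Mapping, Optional


def seconds_to_wait(
    status: int,
    headers: Mapping[str, str],
    now: Optional[float] = None,
) -> float:
    """How long a client should pause before sending its next request."""
    # A 429 with Retry-After is the server's explicit instruction
    if status == 429 and "Retry-After" in headers:
        return float(headers["Retry-After"])
    # Otherwise, if the quota is exhausted, wait until the window resets
    if headers.get("X-RateLimit-Remaining") == "0" and "X-RateLimit-Reset" in headers:
        now = time.time() if now is None else now
        return max(0.0, float(headers["X-RateLimit-Reset"]) - now)
    return 0.0
```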
OpenAPI Specification
Complete OpenAPI Schema
openapi: 3.0.3
info:
  title: OctoLLM API
  description: Distributed AI architecture for offensive security
  version: 1.0.0
  contact:
    name: OctoLLM Team
    url: https://octollm.io
  license:
    name: Apache 2.0
    url: https://www.apache.org/licenses/LICENSE-2.0
servers:
  - url: https://api.octollm.io/v1
    description: Production
  - url: https://staging.octollm.io/v1
    description: Staging
  - url: http://localhost:8000/v1
    description: Development
paths:
  /task:
    post:
      summary: Create task
      operationId: createTask
      tags: [Tasks]
      security:
        - CapabilityToken: []
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/TaskContract'
      responses:
        '202':
          description: Task accepted
          content:
            application/json:
              schema:
                type: object
                properties:
                  task_id: {type: string}
                  status: {type: string}
                  created_at: {type: string, format: date-time}
        '400':
          description: Invalid input
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
  /task/{task_id}:
    get:
      summary: Get task status
      operationId: getTask
      tags: [Tasks]
      security:
        - CapabilityToken: []
      parameters:
        - name: task_id
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Task details
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TaskStatus'
        '404':
          description: Task not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
  /health:
    get:
      summary: Health check
      operationId: healthCheck
      tags: [System]
      responses:
        '200':
          description: System healthy
          content:
            application/json:
              schema:
                type: object
                properties:
                  status: {type: string}
                  version: {type: string}
                  checks: {type: object}
components:
  schemas:
    TaskContract:
      type: object
      required: [task_id, goal]
      properties:
        task_id: {type: string}
        goal: {type: string}
        constraints: {type: array, items: {type: string}}
        priority: {type: string, enum: [low, medium, high, critical]}
    ErrorResponse:
      type: object
      required: [error_code, category, message]
      properties:
        error_code: {type: string}
        category: {type: string}
        message: {type: string}
        details: {type: object}
        retryable: {type: boolean}
  securitySchemes:
    CapabilityToken:
      type: http
      scheme: bearer
      bearerFormat: JWT
Generated Client Libraries
Generate client libraries using OpenAPI Generator:
# Python client
openapi-generator-cli generate \
-i openapi.yaml \
-g python \
-o clients/python \
--additional-properties=packageName=octollm_client
# TypeScript client
openapi-generator-cli generate \
-i openapi.yaml \
-g typescript-fetch \
-o clients/typescript
# Go client
openapi-generator-cli generate \
-i openapi.yaml \
-g go \
-o clients/go
Document Maintainer: OctoLLM Core Team Last Review: 2025-11-10 Next Review: 2025-12-10