orka.orchestrator package
Orchestrator Package
The orchestrator package contains the modular components that make up OrKa’s core orchestration engine. The engine is split into specialized components, each with a focused responsibility, while the public interface stays fully backward compatible.
Architecture Overview
The orchestrator uses a multiple inheritance composition pattern to combine specialized functionality from focused components:
Core Components
OrchestratorBase
Handles initialization, configuration loading, and basic setup
AgentFactory
Manages agent registry, instantiation, and the AGENT_TYPES mapping
ExecutionEngine
Contains the main execution loop, agent coordination, and workflow management
PromptRenderer
Handles Jinja2 template rendering and prompt formatting
ErrorHandler
Provides comprehensive error tracking, retry logic, and failure reporting
MetricsCollector
Collects LLM metrics, runtime information, and generates performance reports
Composition Strategy
The main Orchestrator class inherits from all components using multiple inheritance, ensuring that:
Method Resolution Order is preserved for consistent behavior
All functionality remains accessible through the same interface
Zero breaking changes are introduced for existing code
Internal modularity improves maintainability and testing
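The composition strategy above can be illustrated with a minimal sketch. The class names below (BaseSketch, RendererSketch, EngineSketch, OrchestratorSketch) are hypothetical stand-ins, not OrKa's real classes; they only show how one class can combine state from a base with methods from sibling mixins, and how Python's method resolution order follows the order of the listed bases.

```python
class BaseSketch:
    """Stands in for OrchestratorBase: owns configuration state."""

    def __init__(self, config_path):
        self.config_path = config_path
        self.step_index = 0


class RendererSketch:
    """Stands in for PromptRenderer: stateless helper methods."""

    def render(self, template, payload):
        return template.format(**payload)


class EngineSketch:
    """Stands in for ExecutionEngine: relies on its sibling classes."""

    def run_step(self, template, payload):
        self.step_index += 1  # attribute set by BaseSketch.__init__
        return self.render(template, payload)  # method from RendererSketch


class OrchestratorSketch(BaseSketch, RendererSketch, EngineSketch):
    """Composes the mixins; only BaseSketch defines __init__."""


orch = OrchestratorSketch("workflow.yml")
output = orch.run_step("Hello {name}", {"name": "Alice"})

# The MRO lists the bases left to right, ending with object.
mro_names = [cls.__name__ for cls in OrchestratorSketch.__mro__]
print(mro_names)
print(output, orch.step_index)
```

Because only one base defines `__init__`, callers construct the composed class exactly as they would a single monolithic class, which is what keeps the interface unchanged.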
Usage Example
import asyncio
from orka.orchestrator import Orchestrator

async def main():
    # Initialize with YAML configuration
    orchestrator = Orchestrator("workflow.yml")
    # Run the workflow (uses all components seamlessly)
    return await orchestrator.run("input data")

result = asyncio.run(main())
Module Components
Available Modules:
base - Core initialization and configuration
agent_factory - Agent registry and instantiation
execution_engine - Main execution loop and coordination
prompt_rendering - Template processing and formatting
error_handling - Error tracking and retry logic
metrics - Performance metrics and reporting
Benefits of Modular Design
- Maintainability
Each component has a single, focused responsibility
- Testability
Components can be tested in isolation
- Extensibility
New functionality can be added without affecting other components
- Code Organization
Related functionality is grouped together logically
- Backward Compatibility
Existing code continues to work without modification
- class orka.orchestrator.AgentFactory
Bases: object
Factory class for creating and initializing agents based on configuration.
- class orka.orchestrator.ErrorHandler
Bases: object
Handles error tracking, reporting, and recovery mechanisms.
- class orka.orchestrator.ExecutionEngine
Bases: object
🎼 The conductor of your AI orchestra - coordinates complex multi-agent workflows.
What makes execution intelligent:
- Perfect Timing: Orchestrates agent execution with precise coordination
- Context Flow: Maintains rich context across all workflow steps
- Fault Tolerance: Graceful handling of failures with automatic recovery
- Performance Intelligence: Real-time optimization and resource management
- Scalable Architecture: From single-threaded to distributed execution
Execution Patterns:
1. Sequential Processing (most common):
```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
  # Each agent receives full context from all previous steps
```
2. Parallel Processing (for speed):
```yaml
orchestrator:
  strategy: parallel
  agents: [validator_1, validator_2, validator_3]
  # All agents run simultaneously, results aggregated
```
3. Decision Tree (for complex logic):
```yaml
orchestrator:
  strategy: decision-tree
  agents: [classifier, router, [path_a, path_b], aggregator]
  # Dynamic routing based on classification results
```
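The difference between the sequential and parallel patterns can be sketched with plain asyncio. This is an illustration only, not the ExecutionEngine's implementation: `run_agent`, `run_sequential`, and `run_parallel` are hypothetical helpers, and the "agents" simply report which context keys they could see.

```python
import asyncio


async def run_agent(name, context):
    """Hypothetical agent: reports which context keys were visible to it."""
    await asyncio.sleep(0)  # yield control, as an I/O-bound agent would
    return f"{name} saw {sorted(context)}"


async def run_sequential(agent_names, user_input):
    """Run agents one at a time; each sees all earlier outputs."""
    context = {"input": user_input}
    for name in agent_names:
        context[name] = await run_agent(name, context)
    return context


async def run_parallel(agent_names, user_input):
    """Run agents concurrently on the same input, then aggregate by name."""
    context = {"input": user_input}
    results = await asyncio.gather(*(run_agent(n, context) for n in agent_names))
    return dict(zip(agent_names, results))


sequential = asyncio.run(run_sequential(["classifier", "router"], "hello"))
parallel = asyncio.run(run_parallel(["validator_1", "validator_2"], "hello"))

print(sequential["router"])  # router saw ['classifier', 'input']
print(parallel["validator_1"])  # validator_1 saw ['input']
```

In the sequential run the second agent sees the first agent's output, while in the parallel run every agent sees only the original input and the results are combined afterwards.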
Advanced Features:
🔄 Intelligent Retry Logic:
- Exponential backoff for transient failures
- Context preservation across retry attempts
- Configurable retry policies per agent type
- Partial success handling for complex workflows
📊 Real-time Monitoring:
- Agent execution timing and performance metrics
- LLM token usage and cost tracking
- Memory usage and optimization insights
- Error pattern detection and alerting
⚡ Resource Management:
- Connection pooling for external services
- Agent lifecycle management and cleanup
- Memory optimization for long-running workflows
- Graceful shutdown and resource release
🎯 Production Features:
- Distributed execution across multiple workers
- Load balancing and auto-scaling capabilities
- Health checks and service discovery
- Comprehensive logging and audit trails
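As an illustration of the exponential backoff mentioned under Intelligent Retry Logic, here is a minimal, generic sketch. It is not OrKa's retry code: `retry_with_backoff`, its parameters, and `flaky_agent` are all hypothetical names chosen for this example, and the delay formula (base delay doubled on each attempt) is one common choice among several.

```python
import time


def retry_with_backoff(func, max_attempts=3, base_delay=0.01, sleep=time.sleep):
    """Call func(); after each failure wait base_delay * 2**attempt, then retry.

    Returns (result, delays) so callers can inspect how long was waited.
    """
    delays = []
    for attempt in range(max_attempts):
        try:
            return func(), delays
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)
            delays.append(delay)
            sleep(delay)


calls = {"count": 0}


def flaky_agent():
    """Hypothetical agent that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


# Inject a no-op sleep so the example runs instantly.
result, waited = retry_with_backoff(flaky_agent, sleep=lambda _: None)
print(result, waited)
```

Passing `sleep` as a parameter keeps the helper easy to test, since the waits can be recorded without actually pausing.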
Perfect for:
- Multi-step AI reasoning workflows
- High-throughput content processing pipelines
- Real-time decision systems with complex branching
- Fault-tolerant distributed AI applications
- class orka.orchestrator.MetricsCollector
Bases: object
Handles metrics collection, aggregation, and reporting.
- class orka.orchestrator.Orchestrator(config_path)
Bases: OrchestratorBase, AgentFactory, PromptRenderer, ErrorHandler, MetricsCollector, ExecutionEngine
The Orchestrator is the core engine that loads a YAML configuration, instantiates agents and nodes, and manages the execution of the reasoning workflow. It supports parallelism, dynamic routing, and full trace logging.
This class now inherits from multiple mixins to provide all functionality while maintaining the same public interface.
- class orka.orchestrator.OrchestratorBase(config_path)
Bases: object
Base orchestrator class that handles initialization and configuration.
This class provides the foundational infrastructure for the OrKa orchestration framework, including configuration loading, memory backend setup, and core state management. It is designed to be composed with other specialized classes through multiple inheritance.
The class automatically configures the appropriate backend based on environment variables and provides comprehensive error tracking capabilities for monitoring and debugging orchestration runs.
- loader
Configuration file loader and validator
- Type:
- orchestrator_cfg
Orchestrator-specific configuration settings
- Type:
dict
- agent_cfgs
List of agent configuration objects
- Type:
list
- memory
Memory backend instance (Redis or Kafka)
- fork_manager
Fork group manager for parallel execution
- queue
Current agent execution queue
- Type:
list
- run_id
Unique identifier for this orchestration run
- Type:
str
- step_index
Current step counter for traceability
- Type:
int
- error_telemetry
Comprehensive error tracking and metrics
- Type:
dict
- __init__(config_path)
Initialize the Orchestrator with a YAML config file.
Sets up all core infrastructure including configuration loading, memory backend selection, fork management, and error tracking systems.
- Parameters:
config_path (str) – Path to the YAML configuration file
- Environment Variables:
ORKA_MEMORY_BACKEND: Memory backend type ('redis' or 'kafka', default: 'redis')
ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS: Keep previous outputs for debugging ('true'/'false')
KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses (for Kafka backend)
KAFKA_TOPIC_PREFIX: Topic prefix for Kafka topics (default: 'orka-memory')
REDIS_URL: Redis connection URL (default: 'redis://localhost:6380/0')
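The following sketch shows one way the documented environment variables and defaults could be read. It is not taken from OrchestratorBase: the function name `read_memory_settings`, the returned dictionary shape, the lowercasing of values, and the 'false' fallback for ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS are all assumptions made for illustration.

```python
import os


def read_memory_settings(env=None):
    """Collect the documented variables, applying the documented defaults.

    Hypothetical helper; pass a dict as `env` to avoid touching os.environ.
    """
    env = os.environ if env is None else env
    backend = env.get("ORKA_MEMORY_BACKEND", "redis").lower()
    # Assumption: an unset debug flag is treated as 'false'.
    keep_outputs = env.get("ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS", "false").lower()
    settings = {"backend": backend, "keep_previous_outputs": keep_outputs == "true"}
    if backend == "kafka":
        # No default is documented for the broker list, so None means "unset".
        settings["bootstrap_servers"] = env.get("KAFKA_BOOTSTRAP_SERVERS")
        settings["topic_prefix"] = env.get("KAFKA_TOPIC_PREFIX", "orka-memory")
    else:
        settings["redis_url"] = env.get("REDIS_URL", "redis://localhost:6380/0")
    return settings


defaults = read_memory_settings({})
kafka = read_memory_settings(
    {"ORKA_MEMORY_BACKEND": "kafka", "KAFKA_BOOTSTRAP_SERVERS": "broker:9092"}
)
print(defaults)
print(kafka)
```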
- class orka.orchestrator.PromptRenderer
Bases: object
Handles prompt rendering and template processing using Jinja2.
This class provides methods for rendering dynamic prompts with context data, processing agent responses, and managing template-related operations within the orchestrator workflow.
The renderer supports complex template structures and provides robust error handling to ensure that template failures don’t interrupt workflow execution.
- static normalize_bool(value)
Normalize a value to boolean with support for complex agent responses.
This utility method handles the conversion of various data types to boolean values, with special support for complex agent response structures that may contain nested results.
- Parameters:
value – The value to normalize (bool, str, dict, or other)
- Returns:
The normalized boolean value
- Return type:
bool
- Supported Input Types:
bool: Returned as-is
str: ‘true’, ‘yes’ (case-insensitive) → True, others → False
dict: Extracts from ‘result’ or ‘response’ keys with recursive processing
other: Defaults to False
Example
# Simple cases
assert PromptRenderer.normalize_bool(True) == True
assert PromptRenderer.normalize_bool("yes") == True
assert PromptRenderer.normalize_bool("false") == False

# Complex agent response
response = {"result": {"response": "true"}}
assert PromptRenderer.normalize_bool(response) == True
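To make the conversion rules listed under Supported Input Types concrete, here is a standalone sketch that follows them. It is a reimplementation for illustration, not the library's source: the name `normalize_bool_sketch`, the whitespace stripping, and the order in which the 'result' and 'response' keys are checked are assumptions.

```python
def normalize_bool_sketch(value):
    """Approximate the documented rules for normalizing a value to bool."""
    if isinstance(value, bool):
        return value  # bool: returned as-is
    if isinstance(value, str):
        # Assumption: surrounding whitespace is ignored.
        return value.strip().lower() in ("true", "yes")
    if isinstance(value, dict):
        # Assumption: 'result' is checked before 'response'.
        for key in ("result", "response"):
            if key in value:
                return normalize_bool_sketch(value[key])
    return False  # anything else defaults to False


print(normalize_bool_sketch("YES"))
print(normalize_bool_sketch({"result": {"response": "true"}}))
print(normalize_bool_sketch(1))
```

Note that under these rules a non-bool truthy value such as the integer 1 still normalizes to False, because only bool, str, and dict inputs are interpreted.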
- render_prompt(template_str, payload)
Render a Jinja2 template string with the given payload.
This method is the core template rendering functionality, taking a template string and context payload to produce a rendered prompt for agent execution.
- Parameters:
template_str (str) – The Jinja2 template string to render
payload (dict) – Context data for template variable substitution
- Returns:
The rendered template with variables substituted
- Return type:
str
- Raises:
ValueError – If template_str is not a string
jinja2.TemplateError – If template syntax is invalid
Example
# renderer is any PromptRenderer instance (Orchestrator inherits this method)
template = "Hello {{ name }}, you have {{ count }} messages"
context = {"name": "Alice", "count": 5}
result = renderer.render_prompt(template, context)
# Returns: "Hello Alice, you have 5 messages"
Submodules
- orka.orchestrator.agent_factory module
- orka.orchestrator.base module
- Base Orchestrator Module
OrchestratorBase
OrchestratorBase.loader
OrchestratorBase.orchestrator_cfg
OrchestratorBase.agent_cfgs
OrchestratorBase.memory
OrchestratorBase.fork_manager
OrchestratorBase.queue
OrchestratorBase.run_id
OrchestratorBase.step_index
OrchestratorBase.error_telemetry
OrchestratorBase.__init__()
OrchestratorBase.enqueue_fork()
- orka.orchestrator.error_handling module
- orka.orchestrator.execution_engine module
- orka.orchestrator.metrics module
- orka.orchestrator.prompt_rendering module