orka.orchestrator package

Orchestrator Package

The orchestrator package contains the modular components that make up OrKa’s core orchestration engine. The orchestrator was designed with a modular architecture for specialized components while maintaining 100% backward compatibility.

Architecture Overview

The orchestrator uses a multiple inheritance composition pattern to combine specialized functionality from focused components:

Core Components

OrchestratorBase

Handles initialization, configuration loading, and basic setup

AgentFactory

Manages agent registry, instantiation, and the AGENT_TYPES mapping

ExecutionEngine

Contains the main execution loop, agent coordination, and workflow management

PromptRenderer

Handles Jinja2 template rendering and prompt formatting

ErrorHandler

Provides comprehensive error tracking, retry logic, and failure reporting

MetricsCollector

Collects LLM metrics, runtime information, and generates performance reports

Composition Strategy

The main Orchestrator class inherits from all components using multiple inheritance, ensuring that:

  1. Method Resolution Order is preserved for consistent behavior

  2. All functionality remains accessible through the same interface

  3. Zero breaking changes are introduced for existing code

  4. Internal modularity improves maintainability and testing

Usage Example

from orka.orchestrator import Orchestrator

# Initialize with YAML configuration
orchestrator = Orchestrator("workflow.yml")

# Run the workflow (uses all components seamlessly)
result = await orchestrator.run("input data")

Module Components

Available Modules:

  • base - Core initialization and configuration

  • agent_factory - Agent registry and instantiation

  • execution_engine - Main execution loop and coordination

  • prompt_rendering - Template processing and formatting

  • error_handling - Error tracking and retry logic

  • metrics - Performance metrics and reporting

Benefits of Modular Design

Maintainability

Each component has a single, focused responsibility

Testability

Components can be tested in isolation

Extensibility

New functionality can be added without affecting other components

Code Organization

Related functionality is grouped together logically

Backward Compatibility

Existing code continues to work without modification

class orka.orchestrator.AgentFactory[source]

Bases: object

Factory class for creating and initializing agents based on configuration.

class orka.orchestrator.ErrorHandler[source]

Bases: object

Handles error tracking, reporting, and recovery mechanisms.

class orka.orchestrator.ExecutionEngine[source]

Bases: object

🎼 The conductor of your AI orchestra - coordinates complex multi-agent workflows.

What makes execution intelligent: - Perfect Timing: Orchestrates agent execution with precise coordination - Context Flow: Maintains rich context across all workflow steps - Fault Tolerance: Graceful handling of failures with automatic recovery - Performance Intelligence: Real-time optimization and resource management - Scalable Architecture: From single-threaded to distributed execution

Execution Patterns:

1. Sequential Processing (most common): ```yaml orchestrator:

strategy: sequential agents: [classifier, router, processor, responder]

# Each agent receives full context from all previous steps ```

2. Parallel Processing (for speed): ```yaml orchestrator:

strategy: parallel agents: [validator_1, validator_2, validator_3]

# All agents run simultaneously, results aggregated ```

3. Decision Tree (for complex logic): ```yaml orchestrator:

strategy: decision-tree agents: [classifier, router, [path_a, path_b], aggregator]

# Dynamic routing based on classification results ```

Advanced Features:

🔄 Intelligent Retry Logic: - Exponential backoff for transient failures - Context preservation across retry attempts - Configurable retry policies per agent type - Partial success handling for complex workflows

📊 Real-time Monitoring: - Agent execution timing and performance metrics - LLM token usage and cost tracking - Memory usage and optimization insights - Error pattern detection and alerting

⚡ Resource Management: - Connection pooling for external services - Agent lifecycle management and cleanup - Memory optimization for long-running workflows - Graceful shutdown and resource release

🎯 Production Features: - Distributed execution across multiple workers - Load balancing and auto-scaling capabilities - Health checks and service discovery - Comprehensive logging and audit trails

Perfect for: - Multi-step AI reasoning workflows - High-throughput content processing pipelines - Real-time decision systems with complex branching - Fault-tolerant distributed AI applications

enqueue_fork(agent_ids, fork_group_id)[source]

Add agents to the fork queue for processing.

async run(input_data)[source]

Main entry point for orchestrator execution. Creates empty logs list and delegates to the comprehensive error handling method.

async run_parallel_agents(agent_ids, fork_group_id, input_data, previous_outputs)[source]

Run multiple branches in parallel, with agents within each branch running sequentially. Returns a list of log entries for each forked agent.

class orka.orchestrator.MetricsCollector[source]

Bases: object

Handles metrics collection, aggregation, and reporting.

static build_previous_outputs(logs)[source]

Build a dictionary of previous agent outputs from the execution logs. Used to provide context to downstream agents.

class orka.orchestrator.Orchestrator(config_path)[source]

Bases: OrchestratorBase, AgentFactory, PromptRenderer, ErrorHandler, MetricsCollector, ExecutionEngine

The Orchestrator is the core engine that loads a YAML configuration, instantiates agents and nodes, and manages the execution of the reasoning workflow. It supports parallelism, dynamic routing, and full trace logging.

This class now inherits from multiple mixins to provide all functionality while maintaining the same public interface.

__init__(config_path)[source]

Initialize the Orchestrator with a YAML config file. Loads orchestrator and agent configs, sets up memory and fork management.

class orka.orchestrator.OrchestratorBase(config_path)[source]

Bases: object

Base orchestrator class that handles initialization and configuration.

This class provides the foundational infrastructure for the OrKa orchestration framework, including configuration loading, memory backend setup, and core state management. It is designed to be composed with other specialized classes through multiple inheritance.

The class automatically configures the appropriate backend based on environment variables and provides comprehensive error tracking capabilities for monitoring and debugging orchestration runs.

loader

Configuration file loader and validator

Type:

YAMLLoader

orchestrator_cfg

Orchestrator-specific configuration settings

Type:

dict

agent_cfgs

List of agent configuration objects

Type:

list

memory

Memory backend instance (Redis or Kafka)

fork_manager

Fork group manager for parallel execution

queue

Current agent execution queue

Type:

list

run_id

Unique identifier for this orchestration run

Type:

str

step_index

Current step counter for traceability

Type:

int

error_telemetry

Comprehensive error tracking and metrics

Type:

dict

__init__(config_path)[source]

Initialize the Orchestrator with a YAML config file.

Sets up all core infrastructure including configuration loading, memory backend selection, fork management, and error tracking systems.

Parameters:

config_path (str) – Path to the YAML configuration file

Environment Variables:

ORKA_MEMORY_BACKEND: Memory backend type (‘redis’ or ‘kafka’, default: ‘redis’) ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS: Keep previous outputs for debugging (‘true’/’false’) KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses (for Kafka backend) KAFKA_TOPIC_PREFIX: Topic prefix for Kafka topics (default: ‘orka-memory’) REDIS_URL: Redis connection URL (default: ‘redis://localhost:6380/0’)

enqueue_fork(agent_ids, fork_group_id)[source]

Enqueue a fork group for parallel execution.

class orka.orchestrator.PromptRenderer[source]

Bases: object

Handles prompt rendering and template processing using Jinja2.

This class provides methods for rendering dynamic prompts with context data, processing agent responses, and managing template-related operations within the orchestrator workflow.

The renderer supports complex template structures and provides robust error handling to ensure that template failures don’t interrupt workflow execution.

static normalize_bool(value)[source]

Normalize a value to boolean with support for complex agent responses.

This utility method handles the conversion of various data types to boolean values, with special support for complex agent response structures that may contain nested results.

Parameters:

value – The value to normalize (bool, str, dict, or other)

Returns:

The normalized boolean value

Return type:

bool

Supported Input Types:
  • bool: Returned as-is

  • str: ‘true’, ‘yes’ (case-insensitive) → True, others → False

  • dict: Extracts from ‘result’ or ‘response’ keys with recursive processing

  • other: Defaults to False

Example

# Simple cases
assert PromptRenderer.normalize_bool(True) == True
assert PromptRenderer.normalize_bool("yes") == True
assert PromptRenderer.normalize_bool("false") == False

# Complex agent response
response = {"result": {"response": "true"}}
assert PromptRenderer.normalize_bool(response) == True
render_prompt(template_str, payload)[source]

Render a Jinja2 template string with the given payload.

This method is the core template rendering functionality, taking a template string and context payload to produce a rendered prompt for agent execution.

Parameters:
  • template_str (str) – The Jinja2 template string to render

  • payload (dict) – Context data for template variable substitution

Returns:

The rendered template with variables substituted

Return type:

str

Raises:
  • ValueError – If template_str is not a string

  • jinja2.TemplateError – If template syntax is invalid

Example

template = "Hello {{ name }}, you have {{ count }} messages"
context = {"name": "Alice", "count": 5}
result = renderer.render_prompt(template, context)
# Returns: "Hello Alice, you have 5 messages"

Submodules