# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning
"""
Execution Engine
================
The ExecutionEngine is the core component responsible for coordinating and executing
multi-agent workflows within the OrKa orchestration framework.
Core Responsibilities
---------------------
**Agent Coordination:**
- Sequential execution of agents based on configuration
- Context propagation between agents with previous outputs
- Dynamic queue management for workflow control
- Error handling and retry logic with exponential backoff
**Execution Patterns:**
- **Sequential Processing**: Default execution pattern where agents run one after another
- **Parallel Execution**: Fork/join patterns for concurrent agent execution
- **Conditional Branching**: Router nodes for dynamic workflow paths
- **Memory Operations**: Integration with memory nodes for data persistence
**Error Management:**
- Comprehensive error tracking and telemetry collection
- Automatic retry with configurable maximum attempts
- Graceful degradation and fallback strategies
- Detailed error reporting and recovery actions
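A minimal sketch of that retry shape (names such as `run_agent` and
`base_delay` are illustrative, not part of this module; the actual loop below
currently uses a fixed one-second delay):
```python
import asyncio

async def run_with_retry(run_agent, max_retries=3, base_delay=1.0):
    """Retry a failing async agent call with exponential backoff (sketch)."""
    for attempt in range(max_retries + 1):
        try:
            return await run_agent()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the error to the caller
            await asyncio.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
```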
Architecture Details
--------------------
**Execution Flow:**
1. **Queue Processing**: Agents are processed from the configured queue
2. **Context Building**: Input data and previous outputs are combined into payload
3. **Agent Execution**: Individual agents are executed with full context
4. **Result Processing**: Outputs are captured and added to execution history
5. **Queue Management**: Next agents are determined based on results
**Context Management:**
- Input data is preserved throughout the workflow
- Previous outputs from all agents are available to subsequent agents
- Execution metadata (timestamps, step indices) is tracked
- Error context is maintained for debugging and recovery
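Concretely, every agent receives a payload of this shape (built in
`_run_with_comprehensive_error_handling`; the agent ids shown are hypothetical):
```python
payload = {
    "input": input_data,  # original workflow input, preserved at every step
    "previous_outputs": {  # accumulated results keyed by agent id
        "classifier": {"result": "urgent"},
        "router": {"result": ["urgent_handler"]},
    },
}
```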
**Concurrency Handling:**
- Thread pool executor for parallel agent execution
- Fork group management for coordinated parallel operations
- Async/await patterns for non-blocking operations
- Resource pooling for efficient memory usage
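A minimal sketch of the fork/join concurrency used by `run_parallel_agents`
(the helper names are illustrative):
```python
import asyncio

async def run_branches(branches, run_branch):
    # Branches execute concurrently; the agents inside each branch run
    # sequentially inside run_branch, mirroring _run_branch_async below.
    return await asyncio.gather(*(run_branch(branch) for branch in branches))
```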
Implementation Features
-----------------------
**Agent Execution:**
- Support for both sync and async agent implementations
- Automatic detection of agent execution patterns
- Timeout handling with configurable limits
- Resource cleanup after agent completion
**Memory Integration:**
- Automatic logging of agent execution events
- Memory backend integration for persistent storage
- Context preservation across workflow steps
- Trace ID propagation for debugging
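Each step is persisted through the memory backend with a call of this form
(the signature shown is the one used throughout this module):
```python
self.memory.log(
    agent_id,
    agent.__class__.__name__,  # event type
    payload_out,               # input, result, and previous outputs
    step=self.step_index,
    run_id=self.run_id,        # propagated for request tracing
)
```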
**Error Handling:**
- Exception capture and structured error reporting
- Retry logic with exponential backoff
- Error telemetry collection for monitoring
- Graceful failure recovery
**Performance Optimization:**
- Efficient context building and propagation
- Minimal memory overhead for large workflows
- Optimized queue processing algorithms
- Resource pooling for external connections
Execution Patterns
------------------
**Sequential Execution:**
```yaml
orchestrator:
strategy: sequential
agents: [classifier, processor, responder]
```
**Parallel Execution:**
```yaml
orchestrator:
strategy: parallel
fork_groups:
- agents: [validator_1, validator_2, validator_3]
join_agent: aggregator
```
**Conditional Branching:**
```yaml
agents:
- id: router
type: router
conditions:
- condition: "{{ classification == 'urgent' }}"
next_agents: [urgent_handler]
- condition: "{{ classification == 'normal' }}"
next_agents: [normal_handler]
```
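At runtime the engine applies a router decision by replacing the queue, as in
this excerpt from `_execute_single_agent`:
```python
result = agent.run(payload)  # the router returns the next agent id(s)
next_agents = result if isinstance(result, list) else [result]
queue.clear()
queue.extend(next_agents)  # the queue becomes the chosen path
```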
Integration Points
------------------
**Memory System:**
- Automatic event logging for all agent executions
- Context preservation in memory backend
- Trace ID propagation for request tracking
- Performance metrics collection
**Error Handling:**
- Structured error reporting with context
- Retry mechanisms with configurable policies
- Error telemetry for monitoring and alerting
- Recovery action recommendations
**Monitoring:**
- Real-time execution metrics
- Agent performance tracking
- Resource usage monitoring
- Error rate and pattern analysis
"""
import asyncio
import inspect
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import UTC, datetime
from time import time
logger = logging.getLogger(__name__)
class ExecutionEngine:
"""
🎼 **The conductor of your AI orchestra** - coordinates complex multi-agent workflows.
**What makes execution intelligent:**
- **Perfect Timing**: Orchestrates agent execution with precise coordination
- **Context Flow**: Maintains rich context across all workflow steps
- **Fault Tolerance**: Graceful handling of failures with automatic recovery
- **Performance Intelligence**: Real-time optimization and resource management
- **Scalable Architecture**: From single-threaded to distributed execution
**Execution Patterns:**
**1. Sequential Processing** (most common):
```yaml
orchestrator:
strategy: sequential
agents: [classifier, router, processor, responder]
# Each agent receives full context from all previous steps
```
**2. Parallel Processing** (for speed):
```yaml
orchestrator:
strategy: parallel
agents: [validator_1, validator_2, validator_3]
# All agents run simultaneously, results aggregated
```
**3. Decision Tree** (for complex logic):
```yaml
orchestrator:
strategy: decision-tree
agents: [classifier, router, [path_a, path_b], aggregator]
# Dynamic routing based on classification results
```
**Advanced Features:**
**🔄 Intelligent Retry Logic:**
- Exponential backoff for transient failures
- Context preservation across retry attempts
- Configurable retry policies per agent type
- Partial success handling for complex workflows
**📊 Real-time Monitoring:**
- Agent execution timing and performance metrics
- LLM token usage and cost tracking
- Memory usage and optimization insights
- Error pattern detection and alerting
**⚡ Resource Management:**
- Connection pooling for external services
- Agent lifecycle management and cleanup
- Memory optimization for long-running workflows
- Graceful shutdown and resource release
**🎯 Production Features:**
- Distributed execution across multiple workers
- Load balancing and auto-scaling capabilities
- Health checks and service discovery
- Comprehensive logging and audit trails
**Perfect for:**
- Multi-step AI reasoning workflows
- High-throughput content processing pipelines
- Real-time decision systems with complex branching
- Fault-tolerant distributed AI applications
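    **Illustrative usage** (a sketch: `ExecutionEngine` is composed into the
    orchestrator, which supplies `agents`, `memory`, `fork_manager`, and
    `orchestrator_cfg`; the `Orchestrator` constructor and import path shown
    here are assumptions, not defined in this module):
    ```python
    import asyncio

    from orka.orchestrator import Orchestrator  # hypothetical import path

    orchestrator = Orchestrator("workflow.yml")
    logs = asyncio.run(orchestrator.run("classify this support ticket"))
    ```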
"""
async def run(self, input_data):
"""
Main entry point for orchestrator execution.
        Creates an empty logs list and delegates to the comprehensive error-handling method.
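        Example (illustrative; `orchestrator` is a fully configured instance):
        ```python
        logs = asyncio.run(orchestrator.run({"question": "..."}))
        ```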
"""
logs = []
try:
return await self._run_with_comprehensive_error_handling(input_data, logs)
except Exception as e:
self._record_error(
"orchestrator_run",
"orchestrator",
f"Fatal error during run: {e}",
e,
recovery_action="abort",
)
self.error_telemetry["execution_status"] = "failed"
self.error_telemetry["critical_failures"].append(
{"error": str(e), "timestamp": datetime.now(UTC).isoformat()},
)
self._save_error_report(logs, e)
raise
async def _run_with_comprehensive_error_handling(self, input_data, logs):
"""
Main execution loop with comprehensive error handling wrapper.
"""
queue = self.orchestrator_cfg["agents"][:]
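        # FIFO queue of agent ids; routers replace it, fork sequences append to
        # it, and waiting join nodes re-queue themselves while the run proceeds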
while queue:
agent_id = queue.pop(0)
try:
agent = self.agents[agent_id]
agent_type = agent.type
self.step_index += 1
# Build payload for the agent: current input and all previous outputs
payload = {
"input": input_data,
"previous_outputs": self.build_previous_outputs(logs),
}
                frozen_payload = json.dumps(
                    payload,
                    default=str,  # tolerate non-JSON-serializable values when logging
                )  # Snapshot the payload for logging/debugging
                print(
                    f"{datetime.now()} > [ORKA] {self.step_index} > Running agent '{agent_id}' of type '{agent_type}', payload: {frozen_payload}",
                )
log_entry = {
"agent_id": agent_id,
"event_type": agent.__class__.__name__,
"timestamp": datetime.now(UTC).isoformat(),
}
start_time = time()
# Attempt to run agent with retry logic
max_retries = 3
retry_count = 0
agent_result = None
while retry_count <= max_retries:
try:
agent_result = await self._execute_single_agent(
agent_id,
agent,
agent_type,
payload,
input_data,
queue,
logs,
)
# If we had retries, record partial success
if retry_count > 0:
self._record_partial_success(agent_id, retry_count)
# Handle waiting status - re-queue the agent
if isinstance(agent_result, dict) and agent_result.get("status") in [
"waiting",
"timeout",
]:
if agent_result.get("status") == "waiting":
queue.append(agent_id) # Re-queue for later
                            # For these statuses, exit the retry loop so the outer
                            # loop can move on to the next agent in the queue
                            break
break # Success - exit retry loop
except Exception as agent_error:
retry_count += 1
self._record_retry(agent_id)
self._record_error(
"agent_execution",
agent_id,
f"Attempt {retry_count} failed: {agent_error}",
agent_error,
recovery_action="retry" if retry_count <= max_retries else "skip",
)
if retry_count <= max_retries:
print(
f"🔄 [ORKA-RETRY] Agent {agent_id} failed, retrying ({retry_count}/{max_retries})",
)
await asyncio.sleep(1) # Brief delay before retry
else:
print(
f"❌ [ORKA-SKIP] Agent {agent_id} failed {max_retries} times, skipping",
)
# Create a failure result
agent_result = {
"status": "failed",
"error": str(agent_error),
"retries_attempted": retry_count - 1,
}
break
# Process the result (success or failure)
if agent_result is not None:
# Log the result and timing for this step
duration = round(time() - start_time, 4)
payload_out = {"input": input_data, "result": agent_result}
payload_out["previous_outputs"] = payload["previous_outputs"]
log_entry["duration"] = duration
# Extract LLM metrics if present (even from failed agents)
try:
llm_metrics = self._extract_llm_metrics(agent, agent_result)
if llm_metrics:
log_entry["llm_metrics"] = llm_metrics
except Exception as metrics_error:
self._record_error(
"metrics_extraction",
agent_id,
f"Failed to extract metrics: {metrics_error}",
metrics_error,
recovery_action="continue",
)
log_entry["payload"] = payload_out
logs.append(log_entry)
# Save to memory even if agent failed
try:
if agent_type != "forknode":
self.memory.log(
agent_id,
agent.__class__.__name__,
payload_out,
step=self.step_index,
run_id=self.run_id,
)
except Exception as memory_error:
self._record_error(
"memory_logging",
agent_id,
f"Failed to log to memory: {memory_error}",
memory_error,
recovery_action="continue",
)
print(
f"{datetime.now()} > [ORKA] {self.step_index} > Agent '{agent_id}' returned: {agent_result}",
)
except Exception as step_error:
# Catch-all for any other step-level errors
self._record_error(
"step_execution",
agent_id,
f"Step execution failed: {step_error}",
step_error,
recovery_action="continue",
)
print(
f"⚠️ [ORKA-STEP-ERROR] Step {self.step_index} failed for {agent_id}: {step_error}",
)
continue # Continue to next agent
# Generate meta report with aggregated metrics
meta_report = self._generate_meta_report(logs)
# Save logs to file at the end of the run
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_dir = os.getenv("ORKA_LOG_DIR", "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, f"orka_trace_{timestamp}.json")
# Store meta report in memory for saving
meta_report_entry = {
"agent_id": "meta_report",
"event_type": "MetaReport",
"timestamp": datetime.now(UTC).isoformat(),
"payload": {
"meta_report": meta_report,
"run_id": self.run_id,
"timestamp": timestamp,
},
}
self.memory.memory.append(meta_report_entry)
# Save to memory backend
self.memory.save_to_file(log_path)
# Cleanup memory backend resources to prevent hanging
try:
self.memory.close()
except Exception as e:
print(f"Warning: Failed to cleanly close memory backend: {e!s}")
# Print meta report summary
print("\n" + "=" * 50)
print("ORKA EXECUTION META REPORT")
print("=" * 50)
print(f"Total Execution Time: {meta_report['total_duration']:.3f}s")
print(f"Total LLM Calls: {meta_report['total_llm_calls']}")
print(f"Total Tokens: {meta_report['total_tokens']}")
print(f"Total Cost: ${meta_report['total_cost_usd']:.6f}")
print(f"Average Latency: {meta_report['avg_latency_ms']:.2f}ms")
print("=" * 50)
return logs
async def _execute_single_agent(
self,
agent_id,
agent,
agent_type,
payload,
input_data,
queue,
logs,
):
"""
Execute a single agent with proper error handling and status tracking.
Returns the result of the agent execution.
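        Router, fork, and join nodes receive dedicated handling; any other
        agent is invoked via its `run` method (awaited for async node types).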
"""
# Handle RouterNode: dynamic routing based on previous outputs
if agent_type == "routernode":
decision_key = agent.params.get("decision_key")
routing_map = agent.params.get("routing_map")
if decision_key is None:
raise ValueError("Router agent must have 'decision_key' in params.")
raw_decision_value = payload["previous_outputs"].get(decision_key)
normalized = self.normalize_bool(raw_decision_value)
payload["previous_outputs"][decision_key] = "true" if normalized else "false"
result = agent.run(payload)
next_agents = result if isinstance(result, list) else [result]
# For router nodes, we need to update the queue
queue.clear()
queue.extend(next_agents)
payload_out = {
"input": input_data,
"decision_key": decision_key,
"decision_value": "true" if normalized else "false",
"raw_decision_value": str(raw_decision_value),
"routing_map": str(routing_map),
"next_agents": str(next_agents),
}
self._add_prompt_to_payload(agent, payload_out, payload)
return payload_out
# Handle ForkNode: run multiple agents in parallel branches
elif agent_type == "forknode":
result = await agent.run(self, payload)
fork_targets = agent.config.get("targets", [])
# Flatten branch steps for parallel execution
flat_targets = []
for branch in fork_targets:
if isinstance(branch, list):
flat_targets.extend(branch)
else:
flat_targets.append(branch)
fork_targets = flat_targets
if not fork_targets:
raise ValueError(
f"ForkNode '{agent_id}' requires non-empty 'targets' list.",
)
fork_group_id = self.fork_manager.generate_group_id(agent_id)
self.fork_manager.create_group(fork_group_id, fork_targets)
payload["fork_group_id"] = fork_group_id
mode = agent.config.get(
"mode",
"sequential",
) # Default to sequential if not set
payload_out = {
"input": input_data,
"fork_group": fork_group_id,
"fork_targets": fork_targets,
}
self._add_prompt_to_payload(agent, payload_out, payload)
self.memory.log(
agent_id,
agent.__class__.__name__,
payload_out,
step=self.step_index,
run_id=self.run_id,
)
print(
f"{datetime.now()} > [ORKA][FORK][PARALLEL] {self.step_index} > Running forked agents in parallel for group {fork_group_id}",
)
fork_logs = await self.run_parallel_agents(
fork_targets,
fork_group_id,
input_data,
payload["previous_outputs"],
)
logs.extend(fork_logs) # Add forked agent logs to the main log
return payload_out
# Handle JoinNode: wait for all forked agents to finish, then join results
elif agent_type == "joinnode":
fork_group_id = self.memory.hget(
f"fork_group_mapping:{agent.group_id}",
"group_id",
)
if fork_group_id:
fork_group_id = (
fork_group_id.decode() if isinstance(fork_group_id, bytes) else fork_group_id
)
else:
fork_group_id = agent.group_id # fallback
payload["fork_group_id"] = fork_group_id # inject
result = agent.run(payload)
payload_out = {
"input": input_data,
"fork_group_id": fork_group_id,
"result": result,
}
self._add_prompt_to_payload(agent, payload_out, payload)
if not fork_group_id:
raise ValueError(
f"JoinNode '{agent_id}' missing required group_id.",
)
# Handle different JoinNode statuses
if result.get("status") == "waiting":
print(
f"{datetime.now()} > [ORKA][JOIN][WAITING] {self.step_index} > Node '{agent_id}' is still waiting on fork group: {fork_group_id}",
)
queue.append(agent_id)
self.memory.log(
agent_id,
agent.__class__.__name__,
payload_out,
step=self.step_index,
run_id=self.run_id,
)
# Return waiting status instead of continue
return {"status": "waiting", "result": result}
elif result.get("status") == "timeout":
print(
f"{datetime.now()} > [ORKA][JOIN][TIMEOUT] {self.step_index} > Node '{agent_id}' timed out waiting for fork group: {fork_group_id}",
)
self.memory.log(
agent_id,
agent.__class__.__name__,
payload_out,
step=self.step_index,
run_id=self.run_id,
)
# Clean up the fork group even on timeout
self.fork_manager.delete_group(fork_group_id)
return {"status": "timeout", "result": result}
elif result.get("status") == "done":
self.fork_manager.delete_group(
fork_group_id,
) # Clean up fork group after successful join
return payload_out
else:
# Normal Agent: run and handle result
# Render prompt before running agent if agent has a prompt
self._render_agent_prompt(agent, payload)
if agent_type in ("memoryreadernode", "memorywriternode", "failovernode"):
# Memory nodes and failover nodes have async run methods
result = await agent.run(payload)
else:
# Regular synchronous agent
result = agent.run(payload)
# If agent is waiting (e.g., for async input), return waiting status
if isinstance(result, dict) and result.get("status") == "waiting":
print(
f"{datetime.now()} > [ORKA][WAITING] {self.step_index} > Node '{agent_id}' is still waiting: {result.get('received')}",
)
queue.append(agent_id)
return {"status": "waiting", "result": result}
# After normal agent finishes, mark it done if it's part of a fork
            fork_group = payload.get("fork_group_id")  # set only when running inside a fork
if fork_group:
self.fork_manager.mark_agent_done(fork_group, agent_id)
# Check if this agent has a next-in-sequence step in its branch
next_agent = self.fork_manager.next_in_sequence(fork_group, agent_id)
if next_agent:
print(
f"{datetime.now()} > [ORKA][FORK-SEQUENCE] {self.step_index} > Agent '{agent_id}' finished. Enqueuing next in sequence: '{next_agent}'",
)
self.enqueue_fork([next_agent], fork_group)
payload_out = {"input": input_data, "result": result}
self._add_prompt_to_payload(agent, payload_out, payload)
return payload_out
async def _run_agent_async(self, agent_id, input_data, previous_outputs):
"""
Run a single agent asynchronously.
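        Returns an `(agent_id, result)` tuple so callers can map results
        back to their agents.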
"""
agent = self.agents[agent_id]
payload = {"input": input_data, "previous_outputs": previous_outputs}
# Render prompt before running agent if agent has a prompt
self._render_agent_prompt(agent, payload)
# 🔍 DEBUG: Check if template rendering worked (only if DEBUG logging is enabled)
if logger.isEnabledFor(logging.DEBUG) and hasattr(agent, "prompt") and agent.prompt:
if "formatted_prompt" in payload:
original_template = agent.prompt
rendered_template = payload["formatted_prompt"]
# Check if template was actually rendered (changed from original)
if original_template != rendered_template:
logger.debug(f"Agent '{agent_id}' template rendered successfully")
else:
logger.debug(f"Agent '{agent_id}' template unchanged - possible template issue")
# Inspect the run method to see if it needs orchestrator
run_method = agent.run
sig = inspect.signature(run_method)
        needs_orchestrator = len(sig.parameters) > 1  # Bound method: >1 parameter means (orchestrator, payload)
is_async = inspect.iscoroutinefunction(run_method)
if needs_orchestrator:
# Node that needs orchestrator
result = run_method(self, payload)
if is_async or asyncio.iscoroutine(result):
result = await result
elif is_async:
# Async node/agent that doesn't need orchestrator
result = await run_method(payload)
else:
# Synchronous agent
            loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, run_method, payload)
return agent_id, result
async def _run_branch_async(self, branch_agents, input_data, previous_outputs):
"""
Run a sequence of agents in a branch sequentially.
"""
branch_results = {}
for agent_id in branch_agents:
agent_id, result = await self._run_agent_async(
agent_id,
input_data,
previous_outputs,
)
branch_results[agent_id] = result
# Update previous_outputs for the next agent in the branch
previous_outputs = {**previous_outputs, **branch_results}
return branch_results
async def run_parallel_agents(
self,
agent_ids,
fork_group_id,
input_data,
previous_outputs,
):
"""
Run multiple branches in parallel, with agents within each branch running sequentially.
Returns a list of log entries for each forked agent.
"""
# 🔧 GENERIC FIX: Ensure complete context is passed to forked agents
logger.debug(
f"run_parallel_agents called with previous_outputs keys: {list(previous_outputs.keys())}",
)
# Enhanced debugging: Check the structure of previous_outputs (only if DEBUG enabled)
if logger.isEnabledFor(logging.DEBUG):
for agent_id, agent_result in previous_outputs.items():
if isinstance(agent_result, dict):
# Check for common nested structures
if "memories" in agent_result:
memories = agent_result["memories"]
logger.debug(
f"Agent '{agent_id}' has {len(memories) if isinstance(memories, list) else 'non-list'} memories",
)
if "result" in agent_result:
logger.debug(f"Agent '{agent_id}' has nested result structure")
# Get the fork node to understand the branch structure
# Fork group ID format: {node_id}_{timestamp}, so we need to remove the timestamp
fork_node_id = "_".join(
fork_group_id.split("_")[:-1],
) # Remove the last part (timestamp)
fork_node = self.agents[fork_node_id]
branches = fork_node.targets
# 🔧 GENERIC FIX: Ensure previous_outputs is properly structured
# Make a deep copy to avoid modifying the original
enhanced_previous_outputs = self._ensure_complete_context(previous_outputs)
# Run each branch in parallel
branch_tasks = [
self._run_branch_async(branch, input_data, enhanced_previous_outputs)
for branch in branches
]
# Wait for all branches to complete
branch_results = await asyncio.gather(*branch_tasks)
# Process results and create logs
forked_step_index = 0
result_logs = []
updated_previous_outputs = enhanced_previous_outputs.copy()
# Flatten branch results into a single list of (agent_id, result) pairs
all_results = []
for branch_result in branch_results:
all_results.extend(branch_result.items())
for agent_id, result in all_results:
forked_step_index += 1
step_index = f"{self.step_index}[{forked_step_index}]"
# Ensure result is awaited if it's a coroutine
if asyncio.iscoroutine(result):
result = await result
# Save result to Redis for JoinNode
join_state_key = "waitfor:join_parallel_checks:inputs"
self.memory.hset(join_state_key, agent_id, json.dumps(result))
# Create log entry with current previous_outputs (before updating with this agent's result)
payload_data = {"result": result}
agent = self.agents[agent_id]
payload_context = {
"input": input_data,
"previous_outputs": updated_previous_outputs,
}
self._add_prompt_to_payload(agent, payload_data, payload_context)
log_data = {
"agent_id": agent_id,
"event_type": f"ForkedAgent-{self.agents[agent_id].__class__.__name__}",
"timestamp": datetime.now(UTC).isoformat(),
"payload": payload_data,
"previous_outputs": updated_previous_outputs.copy(),
"step": step_index,
"run_id": self.run_id,
}
result_logs.append(log_data)
# Log to memory
self.memory.log(
agent_id,
f"ForkedAgent-{self.agents[agent_id].__class__.__name__}",
payload_data,
step=step_index,
run_id=self.run_id,
previous_outputs=updated_previous_outputs.copy(),
)
# Update previous_outputs with this agent's result AFTER logging
updated_previous_outputs[agent_id] = result
return result_logs
def _ensure_complete_context(self, previous_outputs):
"""
Generic method to ensure previous_outputs has complete context for template rendering.
This handles various agent result structures and ensures templates can access data.
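        Example (illustrative): a result shaped like
        `{"result": {"memories": [...]}}` gains a top-level `memories` key,
        so templates can use either access path.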
"""
enhanced_outputs = {}
for agent_id, agent_result in previous_outputs.items():
# Start with the original result
enhanced_outputs[agent_id] = agent_result
# If the result is a complex structure, ensure it's template-friendly
if isinstance(agent_result, dict):
# Handle different common agent result patterns
# Pattern 1: Direct result (like memory nodes)
if "memories" in agent_result and isinstance(agent_result["memories"], list):
logger.debug(
f"Agent '{agent_id}' has {len(agent_result['memories'])} memories directly accessible",
)
# Pattern 2: Nested result structure
elif "result" in agent_result and isinstance(agent_result["result"], dict):
nested_result = agent_result["result"]
logger.debug(
f"Agent '{agent_id}' has nested result with keys: {list(nested_result.keys())}",
)
# For nested structures, also provide direct access to common fields
if "memories" in nested_result:
# Create a version that allows both access patterns
enhanced_outputs[agent_id] = {
**agent_result, # Keep original structure
"memories": nested_result["memories"], # Direct access
}
logger.debug(f"Agent '{agent_id}' enhanced with direct memory access")
if "response" in nested_result:
enhanced_outputs[agent_id] = {
**enhanced_outputs.get(agent_id, agent_result),
"response": nested_result["response"], # Direct access
}
logger.debug(f"Agent '{agent_id}' enhanced with direct response access")
return enhanced_outputs
def enqueue_fork(self, agent_ids, fork_group_id):
"""
Add agents to the fork queue for processing.
"""
for agent_id in agent_ids:
self.queue.append(agent_id)