Source code for orka.orchestrator.base

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Base Orchestrator Module
========================

This module contains the core orchestrator base class that handles initialization,
configuration management, and setup of core infrastructure components including
memory backends, fork management, and error tracking.

The :class:`OrchestratorBase` class serves as the foundation for the main
:class:`~orka.orchestrator.Orchestrator` class through multiple inheritance composition.

Core Responsibilities
--------------------

**Configuration Management**
    * Loads and validates YAML configuration files
    * Extracts orchestrator and agent configurations
    * Handles environment variable overrides

**Infrastructure Setup**
    * Initializes memory backend (Redis or Kafka)
    * Configures fork group management for parallel execution
    * Sets up error tracking and telemetry systems

**Runtime State Management**
    * Maintains execution queue and step counters
    * Generates unique run identifiers for traceability
    * Tracks overall execution status and metrics
"""

import logging
import os
from typing import Any, Dict
from uuid import uuid4

from ..fork_group_manager import ForkGroupManager
from ..loader import YAMLLoader
from ..memory_logger import create_memory_logger

logger = logging.getLogger(__name__)


[docs] class OrchestratorBase: """ Base orchestrator class that handles initialization and configuration. This class provides the foundational infrastructure for the OrKa orchestration framework, including configuration loading, memory backend setup, and core state management. It is designed to be composed with other specialized classes through multiple inheritance. The class automatically configures the appropriate backend based on environment variables and provides comprehensive error tracking capabilities for monitoring and debugging orchestration runs. Attributes: loader (:class:`~orka.loader.YAMLLoader`): Configuration file loader and validator orchestrator_cfg (dict): Orchestrator-specific configuration settings agent_cfgs (list): List of agent configuration objects memory: Memory backend instance (Redis or Kafka) fork_manager: Fork group manager for parallel execution queue (list): Current agent execution queue run_id (str): Unique identifier for this orchestration run step_index (int): Current step counter for traceability error_telemetry (dict): Comprehensive error tracking and metrics """
[docs] def __init__(self, config_path): """ Initialize the Orchestrator with a YAML config file. Sets up all core infrastructure including configuration loading, memory backend selection, fork management, and error tracking systems. Args: config_path (str): Path to the YAML configuration file Environment Variables: ORKA_MEMORY_BACKEND: Memory backend type ('redis' or 'kafka', default: 'redis') ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS: Keep previous outputs for debugging ('true'/'false') KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses (for Kafka backend) KAFKA_TOPIC_PREFIX: Topic prefix for Kafka topics (default: 'orka-memory') REDIS_URL: Redis connection URL (default: 'redis://localhost:6380/0') """ self.loader = YAMLLoader(config_path) self.loader.validate() self.orchestrator_cfg = self.loader.get_orchestrator() self.agent_cfgs = self.loader.get_agents() # Memory backend configuration with RedisStack as default memory_backend = os.getenv("ORKA_MEMORY_BACKEND", "redisstack").lower() # Get debug flag from orchestrator config or environment debug_keep_previous_outputs = self.orchestrator_cfg.get("debug", {}).get( "keep_previous_outputs", False, ) debug_keep_previous_outputs = ( debug_keep_previous_outputs or os.getenv("ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS", "false").lower() == "true" ) # Extract decay configuration from orchestrator config and environment decay_config = self._init_decay_config() if memory_backend == "kafka": self.memory = create_memory_logger( backend="kafka", bootstrap_servers=os.getenv( "KAFKA_BOOTSTRAP_SERVERS", "localhost:9092", ), topic_prefix=os.getenv("KAFKA_TOPIC_PREFIX", "orka-memory"), debug_keep_previous_outputs=debug_keep_previous_outputs, decay_config=decay_config, redis_url=os.getenv("REDIS_URL", "redis://localhost:6380/0"), enable_hnsw=True, vector_params={ "M": 16, "ef_construction": 200, "ef_runtime": 10, }, ) # For Kafka backend, now use Redis-based fork manager since we have Redis for memory self.fork_manager = ForkGroupManager(self.memory.redis) else: self.memory = create_memory_logger( backend="redisstack", redis_url=os.getenv("REDIS_URL", "redis://localhost:6380/0"), debug_keep_previous_outputs=debug_keep_previous_outputs, decay_config=decay_config, enable_hnsw=True, vector_params={ "M": 16, "ef_construction": 200, "ef_runtime": 10, }, ) # For Redis, use the existing Redis-based fork manager self.fork_manager = ForkGroupManager(self.memory.redis) self.queue = self.orchestrator_cfg["agents"][:] # Initial agent execution queue self.run_id = str(uuid4()) # Unique run/session ID self.step_index = 0 # Step counter for traceability # Error tracking and telemetry self.error_telemetry = { "errors": [], # List of all errors encountered "retry_counters": {}, # Per-agent retry counts "partial_successes": [], # Agents that succeeded after retries "silent_degradations": [], # JSON parsing failures that fell back to raw text "status_codes": {}, # HTTP status codes for API calls "execution_status": "running", # overall status: running, completed, failed, partial "critical_failures": [], # Failures that stopped execution "recovery_actions": [], # Actions taken to recover from errors }
[docs] def enqueue_fork(self, agent_ids, fork_group_id): """ Enqueue a fork group for parallel execution. """
# This method will be implemented in the execution engine def _init_decay_config(self) -> Dict[str, Any]: """ Initialize decay configuration from orchestrator config and environment variables. Returns: Processed decay configuration with defaults applied """ # Start with default configuration decay_config = { "enabled": False, # Opt-in by default for backward compatibility "default_short_term_hours": 1.0, "default_long_term_hours": 24.0, "check_interval_minutes": 30, } # Extract from orchestrator YAML config orchestrator_memory_config = self.orchestrator_cfg.get("memory", {}) orchestrator_decay_config = orchestrator_memory_config.get("decay", {}) if orchestrator_decay_config: # Merge orchestrator-level decay config decay_config.update(orchestrator_decay_config) # Override with environment variables if present env_enabled = os.getenv("ORKA_MEMORY_DECAY_ENABLED") if env_enabled is not None: decay_config["enabled"] = env_enabled.lower() == "true" env_short_term = os.getenv("ORKA_MEMORY_DECAY_SHORT_TERM_HOURS") if env_short_term is not None: try: decay_config["default_short_term_hours"] = float(env_short_term) except ValueError: logger.warning( f"Invalid ORKA_MEMORY_DECAY_SHORT_TERM_HOURS value: {env_short_term}", ) env_long_term = os.getenv("ORKA_MEMORY_DECAY_LONG_TERM_HOURS") if env_long_term is not None: try: decay_config["default_long_term_hours"] = float(env_long_term) except ValueError: logger.warning(f"Invalid ORKA_MEMORY_DECAY_LONG_TERM_HOURS value: {env_long_term}") env_interval = os.getenv("ORKA_MEMORY_DECAY_CHECK_INTERVAL_MINUTES") if env_interval is not None: try: decay_config["check_interval_minutes"] = int(env_interval) except ValueError: logger.warning( f"Invalid ORKA_MEMORY_DECAY_CHECK_INTERVAL_MINUTES value: {env_interval}", ) # Log decay configuration if enabled if decay_config.get("enabled", False): logger.info( f"Memory decay enabled: short_term={decay_config['default_short_term_hours']}h, " f"long_term={decay_config['default_long_term_hours']}h, " f"check_interval={decay_config['check_interval_minutes']}min", ) else: logger.debug("Memory decay disabled") return decay_config