Source code for orka.registry

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Resource Registry
===============

The Resource Registry is a central dependency injection and resource management
system for the OrKa framework. It provides a consistent interface for initializing,
accessing, and managing shared resources such as language models, embedding models,
databases, and custom tools.

Key responsibilities:
-------------------
1. Lazy initialization of resources based on configuration
2. Centralized access to shared dependencies
3. Resource lifecycle management (initialization and cleanup)
4. Consistent error handling for resource failures
5. Support for custom resource types through dynamic imports

Supported resource types:
-----------------------
- sentence_transformer: Text embedding models
- redis: Redis database client
- openai: OpenAI API client
- custom: Dynamically loaded custom resources

The registry pattern helps maintain clean separation of concerns by isolating
resource initialization logic and providing a single source of truth for shared
dependencies across the framework.
"""

import importlib
import logging
from typing import Any, Dict

import redis.asyncio as redis
from openai import AsyncOpenAI

from .contracts import ResourceConfig

logger = logging.getLogger(__name__)

# Optional dependencies
try:
    from sentence_transformers import SentenceTransformer

    HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
    SentenceTransformer = None
    HAS_SENTENCE_TRANSFORMERS = False
    logger.warning(
        "sentence_transformers not available. Install with: pip install sentence-transformers"
    )


[docs] class ResourceRegistry: """ Manages resource initialization, access, and lifecycle. The ResourceRegistry is responsible for lazily initializing resources when needed, providing access to them via a simple interface, and ensuring proper cleanup when the application shuts down. It acts as a central dependency provider for all OrKa components. """
[docs] def __init__(self, config: Dict[str, ResourceConfig]): """ Initialize the registry with resource configurations. Args: config: Dictionary mapping resource names to their configurations """ self._resources: Dict[str, Any] = {} self._config = config self._initialized = False
[docs] async def initialize(self) -> None: """ Initialize all resources based on their configurations. This method lazily initializes all configured resources, handling any initialization errors and ensuring resources are only initialized once. Raises: Exception: If any resource fails to initialize """ if self._initialized: return for name, resource_config in self._config.items(): try: resource = await self._init_resource(resource_config) self._resources[name] = resource except Exception as e: logger.error(f"Failed to initialize resource {name}: {e}") raise self._initialized = True
async def _init_resource(self, config: ResourceConfig) -> Any: """ Initialize a single resource based on its type and configuration. Args: config: Resource configuration containing type and parameters Returns: Initialized resource instance Raises: ValueError: If the resource type is unknown Exception: If resource initialization fails """ resource_type = config["type"] resource_config = config["config"] if resource_type == "sentence_transformer": if not HAS_SENTENCE_TRANSFORMERS: raise ImportError( "sentence_transformers is required for sentence_transformer resource. Install with: pip install sentence-transformers" ) return SentenceTransformer(resource_config["model_name"]) elif resource_type == "redis": return redis.from_url(resource_config["url"]) elif resource_type == "openai": return AsyncOpenAI(api_key=resource_config["api_key"]) elif resource_type == "custom": module_path = resource_config["module"] class_name = resource_config["class"] module = importlib.import_module(module_path) cls = getattr(module, class_name) return cls(**resource_config.get("init_args", {})) else: raise ValueError(f"Unknown resource type: {resource_type}")
[docs] def get(self, name: str) -> Any: """ Get a resource by name. Args: name: Name of the resource to retrieve Returns: The requested resource instance Raises: RuntimeError: If the registry has not been initialized KeyError: If the requested resource does not exist """ if not self._initialized: raise RuntimeError("Registry not initialized") if name not in self._resources: raise KeyError(f"Resource not found: {name}") return self._resources[name]
[docs] async def close(self) -> None: """ Clean up and close all resources. This method ensures proper cleanup of all resources when the application is shutting down, closing connections and releasing system resources. """ for name, resource in self._resources.items(): try: if hasattr(resource, "close"): await resource.close() elif hasattr(resource, "__aexit__"): await resource.__aexit__(None, None, None) except Exception as e: logger.error(f"Error closing resource {name}: {e}")
[docs] def init_registry(config: Dict[str, ResourceConfig]) -> ResourceRegistry: """ Create and initialize a new resource registry. Args: config: Dictionary mapping resource names to their configurations Returns: Initialized ResourceRegistry instance """ registry = ResourceRegistry(config) return registry