# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning
"""
Agent Factory
=============
Factory for creating and initializing agents and nodes based on configuration.
"""
import logging
from datetime import datetime
from ..agents import (
agents,
llm_agents,
local_llm_agents,
validation_and_structuring_agent,
)
from ..nodes import failing_node, failover_node, fork_node, join_node, router_node
from ..nodes.memory_reader_node import MemoryReaderNode
from ..nodes.memory_writer_node import MemoryWriterNode
from ..tools.search_tools import DuckDuckGoTool
logger = logging.getLogger(__name__)
AGENT_TYPES = {
"binary": agents.BinaryAgent,
"classification": agents.ClassificationAgent,
"local_llm": local_llm_agents.LocalLLMAgent,
"openai-answer": llm_agents.OpenAIAnswerBuilder,
"openai-binary": llm_agents.OpenAIBinaryAgent,
"openai-classification": llm_agents.OpenAIClassificationAgent,
"validate_and_structure": validation_and_structuring_agent.ValidationAndStructuringAgent,
"duckduckgo": DuckDuckGoTool,
"router": router_node.RouterNode,
"failover": failover_node.FailoverNode,
"failing": failing_node.FailingNode,
"join": join_node.JoinNode,
"fork": fork_node.ForkNode,
"memory": "special_handler", # This will be handled specially in init_single_agent
}
[docs]
class AgentFactory:
"""
Factory class for creating and initializing agents based on configuration.
"""
def _init_agents(self):
"""
Instantiate all agents/nodes as defined in the YAML config.
Returns a dict mapping agent IDs to their instances.
"""
print(self.orchestrator_cfg)
print(self.agent_cfgs)
instances = {}
def init_single_agent(cfg):
agent_cls = AGENT_TYPES.get(cfg["type"])
if not agent_cls:
raise ValueError(f"Unsupported agent type: {cfg['type']}")
agent_type = cfg["type"].strip().lower()
agent_id = cfg["id"]
# Remove fields not needed for instantiation
clean_cfg = cfg.copy()
clean_cfg.pop("id", None)
clean_cfg.pop("type", None)
clean_cfg.pop("prompt", None)
clean_cfg.pop("queue", None)
print(
f"{datetime.now()} > [ORKA][INIT] Instantiating agent {agent_id} of type {agent_type}",
)
# Special handling for node types with unique constructor signatures
if agent_type in ("router"):
# RouterNode expects node_id and params
prompt = cfg.get("prompt", None)
queue = cfg.get("queue", None)
return agent_cls(node_id=agent_id, **clean_cfg)
if agent_type in ("fork", "join"):
# Fork/Join nodes need memory_logger for group management
prompt = cfg.get("prompt", None)
queue = cfg.get("queue", None)
return agent_cls(
node_id=agent_id,
prompt=prompt,
queue=queue,
memory_logger=self.memory,
**clean_cfg,
)
if agent_type == "failover":
# FailoverNode takes a list of child agent instances
queue = cfg.get("queue", None)
child_instances = [
init_single_agent(child_cfg) for child_cfg in cfg.get("children", [])
]
return agent_cls(
node_id=agent_id,
children=child_instances,
queue=queue,
)
if agent_type == "failing":
prompt = cfg.get("prompt", None)
queue = cfg.get("queue", None)
return agent_cls(
node_id=agent_id,
prompt=prompt,
queue=queue,
**clean_cfg,
)
# Special handling for memory agent type
if agent_type == "memory" or agent_cls == "special_handler":
# Special handling for memory nodes based on operation
operation = cfg.get("config", {}).get("operation", "read")
prompt = cfg.get("prompt", None)
queue = cfg.get("queue", None)
namespace = cfg.get("namespace", "default")
# Extract agent-level decay configuration and merge with global config
agent_decay_config = cfg.get("decay", {})
merged_decay_config = {}
if hasattr(self, "memory") and hasattr(self.memory, "decay_config"):
# Start with global decay config as base
merged_decay_config = self.memory.decay_config.copy()
if agent_decay_config:
# Deep merge agent-specific decay config
for key, value in agent_decay_config.items():
if (
key in merged_decay_config
and isinstance(merged_decay_config[key], dict)
and isinstance(value, dict)
):
# Deep merge nested dictionaries
merged_decay_config[key].update(value)
else:
# Direct override for non-dict values
merged_decay_config[key] = value
else:
# No global config available, use agent config as-is (with defaults)
merged_decay_config = agent_decay_config
# Clean the config to remove any already processed fields
memory_cfg = clean_cfg.copy()
memory_cfg.pop(
"decay",
None,
) # Remove decay from clean_cfg as it's handled separately
if operation == "write":
# Use memory writer node for write operations
vector_enabled = memory_cfg.get("vector", False)
return MemoryWriterNode(
node_id=agent_id,
prompt=prompt,
queue=queue,
namespace=namespace,
vector=vector_enabled,
key_template=cfg.get("key_template"),
metadata=cfg.get("metadata", {}),
decay_config=merged_decay_config,
memory_logger=self.memory,
)
else: # default to read
# Use memory reader node for read operations
return MemoryReaderNode(
node_id=agent_id,
prompt=prompt,
queue=queue,
namespace=namespace,
limit=memory_cfg.get("limit", 10),
similarity_threshold=memory_cfg.get(
"similarity_threshold",
0.6,
),
decay_config=merged_decay_config,
memory_logger=self.memory,
)
# Special handling for search tools
if agent_type in ("duckduckgo"):
prompt = cfg.get("prompt", None)
queue = cfg.get("queue", None)
return agent_cls(
tool_id=agent_id,
prompt=prompt,
queue=queue,
**clean_cfg,
)
# Special handling for validation agent
if agent_type == "validate_and_structure":
# Create params dictionary with all configuration
params = {
"agent_id": agent_id,
"prompt": cfg.get("prompt", ""),
"queue": cfg.get("queue", None),
"store_structure": cfg.get("store_structure"),
**clean_cfg,
}
return agent_cls(params=params)
# Default agent instantiation
prompt = cfg.get("prompt", None)
queue = cfg.get("queue", None)
return agent_cls(agent_id=agent_id, prompt=prompt, queue=queue, **clean_cfg)
for cfg in self.agent_cfgs:
agent = init_single_agent(cfg)
instances[cfg["id"]] = agent
return instances