orka.fork_group_manager module
Fork Group Manager
The Fork Group Manager coordinates parallel execution branches in the OrKa orchestration framework. It creates, tracks, and manages groups of agents that run in parallel, and provides synchronization points for gathering their results.
Primary responsibilities:
Creating fork groups and registering agents within them
Tracking the completion status of parallel-executing agents
Determining when all branches of execution have completed
Managing the sequence of agents within each execution branch
Providing utilities for generating unique group IDs and Redis keys
This module enables complex workflow patterns like:
Parallel processing of the same input with different agents
Fan-out/fan-in patterns where work is distributed and then collected
Sequential chains of agents within parallel branches
Dynamic branching based on intermediate results
The implementation uses Redis as a coordination mechanism to ensure reliable operation in distributed environments.
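The fan-out/fan-in pattern can be sketched without OrKa or Redis at all. The example below is only an illustration of the idea: `fan_out_fan_in` and the `branches` mapping are hypothetical names, plain callables stand in for agents, and a local set plays the role of the pending-agent bookkeeping that the manager keeps in Redis.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out_fan_in(payload, branches):
    """Run every branch on the same payload and collect results by agent ID."""
    pending = set(branches)  # agent IDs that have not finished yet
    results = {}
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        # Fan-out: submit the same input to every branch.
        futures = {
            pool.submit(fn, payload): agent_id
            for agent_id, fn in branches.items()
        }
        # Fan-in: record each result and mark its branch as done.
        for future in as_completed(futures):
            agent_id = futures[future]
            results[agent_id] = future.result()
            pending.discard(agent_id)
    # The join point is reached only when no branch is still pending.
    assert not pending
    return results


# Hypothetical "agents": three plain callables applied to the same input.
branches = {
    "upper": str.upper,
    "length": len,
    "reverse": lambda text: text[::-1],
}
merged = fan_out_fan_in("orka", branches)
```

A thread pool is enough for a single process; coordinating branches across several orchestrator instances is where a shared store such as Redis becomes necessary.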
- class orka.fork_group_manager.ForkGroupManager(redis_client)[source]
Bases:
object
Manages fork groups in the OrKa orchestrator. Handles the creation, tracking, and cleanup of fork groups for parallel execution.
A fork group represents a set of agent execution paths that need to run in parallel and eventually be synchronized. This manager keeps track of which agents are part of each group and which ones have completed their execution.
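One way to picture this bookkeeping is a Redis set per group that holds the agents still pending. The sketch below is not OrKa's implementation: `ForkGroupSketch`, the `fork_group:` key prefix, and the set-based storage are assumptions made for illustration. The client calls (`sadd`, `srem`, `scard`, `smembers`, `delete`) are standard redis-py commands, and `FakeRedis` is a hypothetical in-memory substitute so the example runs without a server.

```python
class ForkGroupSketch:
    """Illustrative stand-in, not OrKa's ForkGroupManager."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def _key(self, fork_group_id):
        # Hypothetical key layout chosen for this sketch.
        return f"fork_group:{fork_group_id}"

    def create_group(self, fork_group_id, agent_ids):
        # The set holds every agent that has not finished yet.
        self.redis.sadd(self._key(fork_group_id), *agent_ids)

    def mark_agent_done(self, fork_group_id, agent_id):
        self.redis.srem(self._key(fork_group_id), agent_id)

    def is_group_done(self, fork_group_id):
        return self.redis.scard(self._key(fork_group_id)) == 0

    def list_pending_agents(self, fork_group_id):
        members = self.redis.smembers(self._key(fork_group_id))
        # redis-py returns bytes unless decode_responses=True is set.
        return sorted(m.decode() if isinstance(m, bytes) else m for m in members)

    def delete_group(self, fork_group_id):
        self.redis.delete(self._key(fork_group_id))


class FakeRedis:
    """Tiny in-memory substitute exposing only the set commands used above."""

    def __init__(self):
        self.sets = {}

    def sadd(self, name, *values):
        self.sets.setdefault(name, set()).update(values)

    def srem(self, name, *values):
        self.sets.get(name, set()).difference_update(values)

    def scard(self, name):
        return len(self.sets.get(name, set()))

    def smembers(self, name):
        return set(self.sets.get(name, set()))

    def delete(self, *names):
        for name in names:
            self.sets.pop(name, None)


manager = ForkGroupSketch(FakeRedis())
manager.create_group("g1", ["a", "b"])
manager.mark_agent_done("g1", "a")
pending = manager.list_pending_agents("g1")
manager.mark_agent_done("g1", "b")
done = manager.is_group_done("g1")
```

In this sketch a group that was never created also counts as done, because its set is empty; a caller that needs to tell the two cases apart would have to track group existence separately.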
- __init__(redis_client)[source]
Initialize the fork group manager with a Redis client.
- Parameters:
redis_client – The Redis client instance.
- create_group(fork_group_id, agent_ids)[source]
Create a new fork group with the given agent IDs.
- Parameters:
fork_group_id (str) – ID of the fork group.
agent_ids (list) – List of agent IDs to include in the group.
- mark_agent_done(fork_group_id, agent_id)[source]
Mark an agent as done in the fork group.
- Parameters:
fork_group_id (str) – ID of the fork group.
agent_id (str) – ID of the agent to mark as done.
- is_group_done(fork_group_id)[source]
Check if all agents in the fork group are done.
- Parameters:
fork_group_id (str) – ID of the fork group.
- Returns:
True if all agents are done, False otherwise.
- Return type:
bool
- list_pending_agents(fork_group_id)[source]
Get a list of agents still pending in the fork group.
- Parameters:
fork_group_id (str) – ID of the fork group.
- Returns:
List of pending agent IDs.
- Return type:
list
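A caller at the join point typically combines is_group_done and list_pending_agents: poll until the group completes, and report the stragglers if it does not. The helper below is a hypothetical sketch, not part of OrKa; `wait_for_group`, its `timeout` and `interval` parameters, and `StubManager` are invented here, and the helper relies only on the two documented methods.

```python
import time


def wait_for_group(manager, fork_group_id, timeout=5.0, interval=0.05):
    """Poll until the group is done; return the agents still pending on timeout.

    An empty list means every agent finished in time.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if manager.is_group_done(fork_group_id):
            return []
        time.sleep(interval)
    return list(manager.list_pending_agents(fork_group_id))


class StubManager:
    """Hypothetical stand-in that reports completion after a number of checks."""

    def __init__(self, agent_ids, checks_until_done):
        self.agent_ids = list(agent_ids)
        self.checks_left = checks_until_done

    def is_group_done(self, fork_group_id):
        self.checks_left -= 1
        return self.checks_left <= 0

    def list_pending_agents(self, fork_group_id):
        return [] if self.checks_left <= 0 else list(self.agent_ids)


finished = wait_for_group(StubManager(["a", "b"], checks_until_done=2), "g1")
stuck = wait_for_group(
    StubManager(["a", "b"], checks_until_done=10**9), "g1", timeout=0.2
)
```

Returning the pending IDs on timeout, rather than a bare boolean, lets the caller log or retry the specific branches that did not report back.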
- delete_group(fork_group_id)[source]
Delete the fork group from Redis.
- Parameters:
fork_group_id (str) – ID of the fork group to delete.
- generate_group_id(base_id)[source]
Generate a unique fork group ID based on the base ID and timestamp.
- Parameters:
base_id (str) – Base ID for the fork group.
- Returns:
A unique fork group ID.
- Return type:
str
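The exact ID format is not documented here. As a rough illustration of combining a base ID with a timestamp, the standalone function below assumes an underscore separator and whole-second Unix time; both are guesses, and the `now` parameter is a hypothetical addition that makes the result reproducible.

```python
import time


def generate_group_id(base_id, now=None):
    """Combine a base ID with a Unix timestamp (the format is an assumption)."""
    timestamp = int(time.time() if now is None else now)
    return f"{base_id}_{timestamp}"


example_id = generate_group_id("fork_reviewers", now=1700000000)
live_id = generate_group_id("fork_reviewers")
```

With whole-second resolution, as assumed in this sketch, two calls with the same base ID within the same second produce the same string, so a real scheme would need a finer clock or an extra random component to guarantee uniqueness.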
- class orka.fork_group_manager.SimpleForkGroupManager[source]
Bases:
object
A simple in-memory fork group manager for use with non-Redis backends like Kafka. Provides the same interface as ForkGroupManager but stores data in memory.
Note: This implementation is not distributed and will not work across multiple orchestrator instances. Use only for single-instance deployments with Kafka.
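The single-instance limitation follows directly from keeping state in process memory. The sketch below uses a hypothetical `InMemoryForkGroups` class (not OrKa's code) with a dictionary of pending-agent sets to show that a group created on one instance is invisible to another.

```python
class InMemoryForkGroups:
    """Illustrative stand-in, not OrKa's SimpleForkGroupManager."""

    def __init__(self):
        # fork_group_id -> set of agent IDs that have not finished yet
        self._groups = {}

    def create_group(self, fork_group_id, agent_ids):
        self._groups[fork_group_id] = set(agent_ids)

    def mark_agent_done(self, fork_group_id, agent_id):
        self._groups.get(fork_group_id, set()).discard(agent_id)

    def list_pending_agents(self, fork_group_id):
        return sorted(self._groups.get(fork_group_id, set()))


# Two objects stand in for two orchestrator instances.
orchestrator_a = InMemoryForkGroups()
orchestrator_b = InMemoryForkGroups()

orchestrator_a.create_group("g1", ["x", "y"])
seen_by_a = orchestrator_a.list_pending_agents("g1")
seen_by_b = orchestrator_b.list_pending_agents("g1")  # b never saw the group
```

Because each instance owns its own dictionary, an agent that finishes on one instance cannot be marked done on the other, which is why a shared store is needed for multi-instance deployments.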
- create_group(fork_group_id, agent_ids)[source]
Create a new fork group with the given agent IDs.
- Parameters:
fork_group_id (str) – ID of the fork group.
agent_ids (list) – List of agent IDs to include in the group.
- mark_agent_done(fork_group_id, agent_id)[source]
Mark an agent as done in the fork group.
- Parameters:
fork_group_id (str) – ID of the fork group.
agent_id (str) – ID of the agent to mark as done.
- is_group_done(fork_group_id)[source]
Check if all agents in the fork group are done.
- Parameters:
fork_group_id (str) – ID of the fork group.
- Returns:
True if all agents are done, False otherwise.
- Return type:
bool
- list_pending_agents(fork_group_id)[source]
Get a list of agents still pending in the fork group.
- Parameters:
fork_group_id (str) – ID of the fork group.
- Returns:
List of pending agent IDs.
- Return type:
list
- delete_group(fork_group_id)[source]
Delete the fork group from memory.
- Parameters:
fork_group_id (str) – ID of the fork group to delete.
- generate_group_id(base_id)[source]
Generate a unique fork group ID based on the base ID and timestamp.
- Parameters:
base_id (str) – Base ID for the fork group.
- Returns:
A unique fork group ID.
- Return type:
str
- track_branch_sequence(fork_group_id, agent_sequence)[source]
Track the sequence of agents in a branch.
- Parameters:
fork_group_id (str) – ID of the fork group.
agent_sequence (list) – List of agent IDs in sequence.
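How the tracked sequence is stored and read back is not described on this page. One plausible shape, shown purely as an assumption, is a mapping from each agent to its successor within the group. `BranchSequenceSketch` and its `next_in_sequence` lookup are hypothetical names introduced for this example.

```python
class BranchSequenceSketch:
    """Illustrative only: records which agent follows which within a branch."""

    def __init__(self):
        # (fork_group_id, agent_id) -> ID of the agent that runs next
        self._next = {}

    def track_branch_sequence(self, fork_group_id, agent_sequence):
        # Pair each agent with its successor: [a, b, c] -> a->b, b->c.
        for current, following in zip(agent_sequence, agent_sequence[1:]):
            self._next[(fork_group_id, current)] = following

    def next_in_sequence(self, fork_group_id, agent_id):
        # Hypothetical lookup; returns None for the last agent in a branch.
        return self._next.get((fork_group_id, agent_id))


tracker = BranchSequenceSketch()
tracker.track_branch_sequence("g1", ["fetch", "summarize", "rank"])
after_fetch = tracker.next_in_sequence("g1", "fetch")
after_rank = tracker.next_in_sequence("g1", "rank")
```

Storing only successor links keeps each lookup constant-time, at the cost of assuming an agent ID appears at most once per group.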