orka.fork_group_manager module

Fork Group Manager

The Fork Group Manager is responsible for coordinating parallel execution branches in the OrKa orchestration framework. It provides functionality to create, track, and manage groups of agents that need to run in parallel, with synchronization points for gathering their results.

Primary responsibilities:

  1. Creating fork groups and registering agents within them

  2. Tracking the completion status of parallel-executing agents

  3. Determining when all branches of execution have completed

  4. Managing the sequence of agents within each execution branch

  5. Providing utilities for generating unique group IDs and Redis keys

This module enables complex workflow patterns like: - Parallel processing of the same input with different agents - Fan-out/fan-in patterns where work is distributed and then collected - Sequential chains of agents within parallel branches - Dynamic branching based on intermediate results

The implementation uses Redis as a coordination mechanism to ensure reliable operation in distributed environments.

class orka.fork_group_manager.ForkGroupManager(redis_client)[source]

Bases: object

Manages fork groups in the OrKa orchestrator. Handles the creation, tracking, and cleanup of fork groups for parallel execution.

A fork group represents a set of agent execution paths that need to run in parallel and eventually be synchronized. This manager keeps track of which agents are part of each group and which ones have completed their execution.

__init__(redis_client)[source]

Initialize the fork group manager with a Redis client.

Parameters:

redis_client – The Redis client instance.

create_group(fork_group_id, agent_ids)[source]

Create a new fork group with the given agent IDs.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_ids (list) – List of agent IDs to include in the group.

mark_agent_done(fork_group_id, agent_id)[source]

Mark an agent as done in the fork group.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_id (str) – ID of the agent to mark as done.

is_group_done(fork_group_id)[source]

Check if all agents in the fork group are done.

Parameters:

fork_group_id (str) – ID of the fork group.

Returns:

True if all agents are done, False otherwise.

Return type:

bool

list_pending_agents(fork_group_id)[source]

Get a list of agents still pending in the fork group.

Parameters:

fork_group_id (str) – ID of the fork group.

Returns:

List of pending agent IDs.

Return type:

list

delete_group(fork_group_id)[source]

Delete the fork group from Redis.

Parameters:

fork_group_id (str) – ID of the fork group to delete.

generate_group_id(base_id)[source]

Generate a unique fork group ID based on the base ID and timestamp.

Parameters:

base_id (str) – Base ID for the fork group.

Returns:

A unique fork group ID.

Return type:

str

track_branch_sequence(fork_group_id, agent_sequence)[source]

Track the sequence of agents in a branch.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_sequence (list) – List of agent IDs in sequence.

next_in_sequence(fork_group_id, agent_id)[source]

Get the next agent in the sequence after the current agent.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_id (str) – ID of the current agent.

Returns:

ID of the next agent, or None if there is no next agent.

Return type:

str

class orka.fork_group_manager.SimpleForkGroupManager[source]

Bases: object

A simple in-memory fork group manager for use with non-Redis backends like Kafka. Provides the same interface as ForkGroupManager but stores data in memory.

Note: This implementation is not distributed and will not work across multiple orchestrator instances. Use only for single-instance deployments with Kafka.

__init__()[source]

Initialize the simple fork group manager with in-memory storage.

create_group(fork_group_id, agent_ids)[source]

Create a new fork group with the given agent IDs.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_ids (list) – List of agent IDs to include in the group.

mark_agent_done(fork_group_id, agent_id)[source]

Mark an agent as done in the fork group.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_id (str) – ID of the agent to mark as done.

is_group_done(fork_group_id)[source]

Check if all agents in the fork group are done.

Parameters:

fork_group_id (str) – ID of the fork group.

Returns:

True if all agents are done, False otherwise.

Return type:

bool

list_pending_agents(fork_group_id)[source]

Get a list of agents still pending in the fork group.

Parameters:

fork_group_id (str) – ID of the fork group.

Returns:

List of pending agent IDs.

Return type:

list

delete_group(fork_group_id)[source]

Delete the fork group from memory.

Parameters:

fork_group_id (str) – ID of the fork group to delete.

generate_group_id(base_id)[source]

Generate a unique fork group ID based on the base ID and timestamp.

Parameters:

base_id (str) – Base ID for the fork group.

Returns:

A unique fork group ID.

Return type:

str

track_branch_sequence(fork_group_id, agent_sequence)[source]

Track the sequence of agents in a branch.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_sequence (list) – List of agent IDs in sequence.

next_in_sequence(fork_group_id, agent_id)[source]

Get the next agent in the sequence after the current agent.

Parameters:
  • fork_group_id (str) – ID of the fork group.

  • agent_id (str) – ID of the current agent.

Returns:

ID of the next agent, or None if there is no next agent.

Return type:

str

remove_group(group_id)[source]

Remove a group (for compatibility with existing code).

Parameters:

group_id (str) – ID of the group to remove.

Raises:

KeyError – If the group doesn’t exist.