orka.memory.base_logger module

Base Memory Logger

Abstract base class that defines the interface and common functionality for all memory logger implementations in OrKa. This class provides the foundation for persistent memory storage across different backends.

Core Responsibilities

Interface Definition

  • Defines abstract methods that all memory backends must implement

  • Provides common initialization and configuration patterns

  • Establishes consistent behavior across different storage backends

Memory Lifecycle Management

  • Automatic memory decay based on configurable rules

  • Importance scoring for memory retention decisions

  • Memory type classification (short-term vs long-term)

  • Category-based memory organization (logs vs stored memories)

Data Optimization

  • Blob deduplication for large objects to reduce storage overhead

  • Serialization mixins for consistent data handling

  • File operation mixins for export/import functionality

  • Configurable thresholds for optimization decisions

Thread Safety

  • Thread-safe decay scheduling and management

  • Concurrent access patterns for multi-threaded environments

  • Proper resource cleanup and lifecycle management

Architecture Details

Memory Classification System

  • Categories: “log” (orchestration events) vs “stored” (persistent memories)

  • Types: “short_term” (temporary) vs “long_term” (persistent)

  • Importance Scoring: 0.0-1.0 scale based on event type and content

  • Decay Rules: Configurable retention policies per category/type
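A rough sketch of how such a classification decision could be derived from these rules follows. It is an illustrative helper only, assuming the decay_config layout shown under Configuration Options; it is not the actual BaseMemoryLogger internals.

```python
# Illustrative sketch; the real BaseMemoryLogger internals may differ.
def classify_memory(category: str, event_type: str, agent_type: str, rules: dict) -> tuple[str, float]:
    imp = rules["importance_rules"]
    score = imp["base_score"]
    score += imp["event_type_boosts"].get(event_type, 0.0)
    score += imp["agent_type_boosts"].get(agent_type, 0.0)
    score = max(0.0, min(1.0, score))  # clamp to the documented 0.0-1.0 scale

    # Orchestration logs stay short-term; only "stored" memories can be promoted.
    if category == "stored" and event_type in rules["memory_type_rules"]["long_term_events"]:
        return "long_term", score
    return "short_term", score
```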

Blob Deduplication

  • SHA256 hashing for content identification

  • Reference counting for cleanup decisions

  • Configurable size threshold (default: 200 characters)

  • Automatic cleanup of unused blobs
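Deduplication along these lines might look roughly as follows. This is a hypothetical helper whose names and structure are taken from the description above rather than the actual code.

```python
import hashlib
import json

_BLOB_THRESHOLD = 200             # minimum serialized size before deduplication kicks in
_blob_store: dict[str, str] = {}  # sha256 digest -> serialized blob
_blob_refs: dict[str, int] = {}   # sha256 digest -> reference count

def store_blob(payload: dict) -> dict:
    serialized = json.dumps(payload, sort_keys=True)
    if len(serialized) < _BLOB_THRESHOLD:
        return payload  # small payloads are kept inline
    digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
    _blob_store.setdefault(digest, serialized)
    _blob_refs[digest] = _blob_refs.get(digest, 0) + 1
    return {"blob_ref": digest}  # callers store only the reference

def release_blob(digest: str) -> None:
    # Drop the blob once no entries reference it any longer.
    _blob_refs[digest] -= 1
    if _blob_refs[digest] <= 0:
        _blob_store.pop(digest, None)
        _blob_refs.pop(digest, None)
```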

Decay Management

  • Background thread for automatic cleanup

  • Configurable check intervals (default: 30 minutes)

  • Dry-run support for testing cleanup operations

  • Graceful shutdown with proper thread cleanup
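A minimal sketch of such a scheduler is shown below; the thread/stop-event structure mirrors the description above, while the class and attribute names are illustrative.

```python
import threading

class DecaySchedulerSketch:
    """Illustrative background cleanup loop, not the actual OrKa implementation."""

    def __init__(self, memory_logger, check_interval_minutes: float = 30):
        self._logger = memory_logger
        self._interval_seconds = check_interval_minutes * 60
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # wait() acts as an interruptible sleep, so stop() takes effect promptly.
        while not self._stop_event.wait(self._interval_seconds):
            self._logger.cleanup_expired_memories(dry_run=False)

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join(timeout=5)
```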

Implementation Requirements

Required Abstract Methods

All concrete implementations must provide:

  • log() - Store orchestration events and memory entries

  • tail() - Retrieve recent entries for debugging

  • cleanup_expired_memories() - Remove expired entries

  • get_memory_stats() - Provide storage statistics

  • Redis-compatible methods: hset, hget, hkeys, hdel, get, set, delete

  • Set operations: smembers, sadd, srem

Optional Enhancements

Implementations may provide:

  • Vector search capabilities for semantic similarity

  • Advanced filtering and querying options

  • Performance optimizations for specific use cases

  • Integration with external systems (Redis, Kafka, etc.)

Configuration Options

Decay Configuration

```python
decay_config = {
    "enabled": True,
    "default_short_term_hours": 1.0,
    "default_long_term_hours": 24.0,
    "check_interval_minutes": 30,
    "memory_type_rules": {
        "long_term_events": ["success", "completion", "write", "result"],
        "short_term_events": ["debug", "processing", "start", "progress"],
    },
    "importance_rules": {
        "base_score": 0.5,
        "event_type_boosts": {"write": 0.3, "success": 0.2},
        "agent_type_boosts": {"memory": 0.2, "openai-answer": 0.1},
    },
}
```

Blob Deduplication

  • _blob_threshold: Minimum size for deduplication (default: 200 chars)

  • Automatic reference counting and cleanup

  • SHA256 hashing for content identification

Usage Patterns

Implementing a Custom Backend

```python
from orka.memory.base_logger import BaseMemoryLogger


class CustomMemoryLogger(BaseMemoryLogger):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._storage = {}  # Your storage implementation

    def log(self, agent_id, event_type, payload, **kwargs):
        # Implement storage logic
        pass

    def cleanup_expired_memories(self, dry_run=False):
        # Implement cleanup logic
        pass

    # ... implement other abstract methods
```
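Once the remaining abstract methods are implemented, a backend built this way could be used along these lines. This is a usage sketch; the keyword arguments follow the documented __init__ signature, and the decay values are illustrative.

```python
logger = CustomMemoryLogger(
    stream_key="orka:memory",
    decay_config={"enabled": True, "check_interval_minutes": 30},
)
logger.log("agent_1", "success", {"result": "done"})
preview = logger.cleanup_expired_memories(dry_run=True)  # report without deleting
```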

Memory Classification Logic

  • Orchestration logs are always classified as short-term

  • Only “stored” memories can be classified as long-term

  • Importance scoring influences retention decisions

  • Event types and agent types affect classification

Thread Safety Considerations

  • Decay scheduler runs in a background thread

  • Proper synchronization for concurrent access

  • Graceful shutdown handling with stop events

  • Resource cleanup on object destruction
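In practice this means stopping the scheduler explicitly when a logger is no longer needed, for example as below. Only stop_decay_scheduler() is part of the documented interface; the surrounding structure is just one possible pattern.

```python
logger = CustomMemoryLogger(decay_config={"enabled": True})
try:
    logger.log("agent_1", "start", {"message": "run began"})
finally:
    logger.stop_decay_scheduler()  # stop the background decay thread cleanly
```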

class orka.memory.base_logger.BaseMemoryLogger(stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None)[source]

Bases: ABC, SerializationMixin, FileOperationsMixin

Abstract base class for memory loggers. Defines the interface that must be implemented by all memory backends.

__init__(stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None) → None[source]

Initialize the memory logger.

Parameters:
  • stream_key – Key for the memory stream. Defaults to “orka:memory”.

  • debug_keep_previous_outputs – If True, keeps previous_outputs in log files for debugging.

  • decay_config – Configuration for memory decay functionality.

stop_decay_scheduler()[source]

Stop the automatic decay scheduler.

abstractmethod cleanup_expired_memories(dry_run: bool = False) → dict[str, Any][source]

Clean up expired memory entries based on decay configuration.

Parameters:

dry_run – If True, return what would be deleted without actually deleting

Returns:

Dictionary containing cleanup statistics
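For example (a minimal usage sketch; the exact keys of the returned statistics dictionary depend on the backend):

```python
preview = logger.cleanup_expired_memories(dry_run=True)  # report only
result = logger.cleanup_expired_memories()               # actually delete expired entries
```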

abstractmethod get_memory_stats() → dict[str, Any][source]

Get memory usage statistics.

Returns:

Dictionary containing memory statistics

abstractmethod log(agent_id: str, event_type: str, payload: dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: dict[str, Any] | None = None, agent_decay_config: dict[str, Any] | None = None, log_type: str = 'log') → None[source]

Log an event to the memory backend.

abstractmethod tail(count: int = 10) → list[dict[str, Any]][source]

Retrieve the most recent events.

abstractmethod hset(name: str, key: str, value: str | bytes | int | float) → int[source]

Set a field in a hash structure.

abstractmethod hget(name: str, key: str) → str | None[source]

Get a field from a hash structure.

abstractmethod hkeys(name: str) → list[str][source]

Get all keys in a hash structure.

abstractmethod hdel(name: str, *keys: str) → int[source]

Delete fields from a hash structure.

abstractmethod smembers(name: str) → list[str][source]

Get all members of a set.

abstractmethod sadd(name: str, *values: str) → int[source]

Add members to a set.

abstractmethod srem(name: str, *values: str) → int[source]

Remove members from a set.

abstractmethod get(key: str) → str | None[source]

Get a value by key.

abstractmethod set(key: str, value: str | bytes | int | float) → bool[source]

Set a value by key.

abstractmethod delete(*keys: str) → int[source]

Delete keys.