orka.memory.base_logger module
Base Memory Logger
Abstract base class that defines the interface and common functionality for all memory logger implementations in OrKa. This class provides the foundation for persistent memory storage across different backends.
Core Responsibilities
Interface Definition
- Defines abstract methods that all memory backends must implement
- Provides common initialization and configuration patterns
- Establishes consistent behavior across different storage backends

Memory Lifecycle Management
- Automatic memory decay based on configurable rules
- Importance scoring for memory retention decisions
- Memory type classification (short-term vs long-term)
- Category-based memory organization (logs vs stored memories)

Data Optimization
- Blob deduplication for large objects to reduce storage overhead
- Serialization mixins for consistent data handling
- File operation mixins for export/import functionality
- Configurable thresholds for optimization decisions

Thread Safety
- Thread-safe decay scheduling and management
- Concurrent access patterns for multi-threaded environments
- Proper resource cleanup and lifecycle management
Architecture Details
Memory Classification System
- Categories: "log" (orchestration events) vs "stored" (persistent memories)
- Types: "short_term" (temporary) vs "long_term" (persistent)
- Importance Scoring: 0.0-1.0 scale based on event type and content
- Decay Rules: Configurable retention policies per category/type
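The importance scale can be illustrated with the `importance_rules` keys from the Decay Configuration example in this document. The following is a minimal sketch, not OrKa's actual scoring code: it assumes the boosts are additive and that the result is clamped to the 0.0-1.0 range. The function name `importance_score` is hypothetical.

```python
from typing import Any

# Rule values copied from the Decay Configuration example in this document.
IMPORTANCE_RULES: dict = {
    "base_score": 0.5,
    "event_type_boosts": {"write": 0.3, "success": 0.2},
    "agent_type_boosts": {"memory": 0.2, "openai-answer": 0.1},
}


def importance_score(event_type: str, agent_type: str, rules: Any = None) -> float:
    """Hypothetical helper: base score plus additive boosts (assumed), clamped to [0.0, 1.0]."""
    rules = IMPORTANCE_RULES if rules is None else rules
    score = rules.get("base_score", 0.5)
    # Unknown event or agent types contribute no boost.
    score += rules.get("event_type_boosts", {}).get(event_type, 0.0)
    score += rules.get("agent_type_boosts", {}).get(agent_type, 0.0)
    return max(0.0, min(1.0, score))
```

Under these assumptions, an event with no matching boost keeps the base score of 0.5, while a "write" event from a "memory" agent reaches the top of the scale.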
Blob Deduplication
- SHA256 hashing for content identification
- Reference counting for cleanup decisions
- Configurable size threshold (default: 200 characters)
- Automatic cleanup of unused blobs
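The deduplication scheme described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the library's implementation: the `BlobStore` class, the `_blob_ref` marker key, and the choice of JSON as the serialization are all hypothetical. Only the SHA256 hashing, the reference counting, and the 200-character threshold come from the description.

```python
import hashlib
import json
from typing import Any

BLOB_THRESHOLD = 200  # characters; the documented default


class BlobStore:
    """Hypothetical in-memory blob store keyed by SHA256 digest."""

    def __init__(self) -> None:
        self.blobs: dict = {}      # digest -> serialized content
        self.refcounts: dict = {}  # digest -> number of live references

    def put(self, payload: Any) -> Any:
        """Return the payload unchanged if small, else a reference to a stored blob."""
        serialized = json.dumps(payload, sort_keys=True)
        if len(serialized) < BLOB_THRESHOLD:
            return payload
        digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        self.blobs.setdefault(digest, serialized)  # store identical content once
        self.refcounts[digest] = self.refcounts.get(digest, 0) + 1
        return {"_blob_ref": digest}  # assumed marker key

    def release(self, digest: str) -> None:
        """Drop one reference and delete the blob when none remain."""
        remaining = self.refcounts.get(digest, 0) - 1
        if remaining <= 0:
            self.refcounts.pop(digest, None)
            self.blobs.pop(digest, None)
        else:
            self.refcounts[digest] = remaining
```

Storing the same large payload twice yields the same reference and a single stored blob. The blob disappears only after both references are released.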
Decay Management
- Background thread for automatic cleanup
- Configurable check intervals (default: 30 minutes)
- Dry-run support for testing cleanup operations
- Graceful shutdown with proper thread cleanup
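A background decay loop with graceful shutdown is commonly built on `threading.Event`. The sketch below shows that pattern. The `DecayScheduler` class and its method names are hypothetical and are not taken from OrKa's source. The 30-minute default mirrors the documented check interval.

```python
import threading
from typing import Callable, Optional


class DecayScheduler:
    """Hypothetical helper that runs a cleanup callable at a fixed interval."""

    def __init__(self, cleanup: Callable[[], object], interval_seconds: float = 30 * 60) -> None:
        self._cleanup = cleanup
        self._interval = interval_seconds
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event,
        # so the loop sleeps for one interval and exits promptly on shutdown.
        while not self._stop_event.wait(self._interval):
            try:
                self._cleanup()
            except Exception:
                pass  # a failed cleanup pass should not kill the scheduler

    def stop(self, timeout: float = 5.0) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)
            self._thread = None
```

Waiting on the event rather than calling `time.sleep` lets `stop()` interrupt a long interval immediately.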
Implementation Requirements
Required Abstract Methods
All concrete implementations must provide:
- log() - Store orchestration events and memory entries
- tail() - Retrieve recent entries for debugging
- cleanup_expired_memories() - Remove expired entries
- get_memory_stats() - Provide storage statistics
- Redis-compatible methods: hset, hget, hkeys, hdel, get, set, delete
- Set operations: smembers, sadd, srem
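For a backend that is not Redis, the Redis-compatible methods can be emulated with plain dictionaries. The sketch below is one way to do it. The signatures and return values are assumptions modelled on common Redis client conventions, since this page does not document them, and the `InMemoryKV` class is hypothetical.

```python
from typing import Any, Optional


class InMemoryKV:
    """Hypothetical dict-backed stand-in for the Redis-style methods (assumed signatures)."""

    def __init__(self) -> None:
        self._hashes: dict = {}   # name -> {field: value}
        self._strings: dict = {}  # key -> value
        self._sets: dict = {}     # name -> set of members

    def hset(self, name: str, key: str, value: Any) -> int:
        bucket = self._hashes.setdefault(name, {})
        is_new = key not in bucket
        bucket[key] = value
        return int(is_new)  # 1 if the field was created, 0 if overwritten

    def hget(self, name: str, key: str) -> Optional[Any]:
        return self._hashes.get(name, {}).get(key)

    def hkeys(self, name: str) -> list:
        return list(self._hashes.get(name, {}))

    def hdel(self, name: str, *keys: str) -> int:
        bucket = self._hashes.get(name, {})
        removed = 0
        for key in keys:
            if key in bucket:
                del bucket[key]
                removed += 1
        return removed

    def get(self, key: str) -> Optional[Any]:
        return self._strings.get(key)

    def set(self, key: str, value: Any) -> bool:
        self._strings[key] = value
        return True

    def delete(self, *keys: str) -> int:
        removed = 0
        for key in keys:
            if key in self._strings:
                del self._strings[key]
                removed += 1
        return removed

    def smembers(self, name: str) -> set:
        return set(self._sets.get(name, set()))  # copy, so callers cannot mutate state

    def sadd(self, name: str, *values: Any) -> int:
        members = self._sets.setdefault(name, set())
        before = len(members)
        members.update(values)
        return len(members) - before  # number of newly added members

    def srem(self, name: str, *values: Any) -> int:
        members = self._sets.get(name, set())
        before = len(members)
        members.difference_update(values)
        return before - len(members)  # number of members actually removed
```

A concrete logger could delegate to a helper like this, or implement the same methods directly on the subclass.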
Optional Enhancements
Implementations may provide:
- Vector search capabilities for semantic similarity
- Advanced filtering and querying options
- Performance optimizations for specific use cases
- Integration with external systems (Redis, Kafka, etc.)
Configuration Options
Decay Configuration
```python
decay_config = {
    "enabled": True,
    "default_short_term_hours": 1.0,
    "default_long_term_hours": 24.0,
    "check_interval_minutes": 30,
    "memory_type_rules": {
        "long_term_events": ["success", "completion", "write", "result"],
        "short_term_events": ["debug", "processing", "start", "progress"],
    },
    "importance_rules": {
        "base_score": 0.5,
        "event_type_boosts": {"write": 0.3, "success": 0.2},
        "agent_type_boosts": {"memory": 0.2, "openai-answer": 0.1},
    },
}
```
Blob Deduplication
- _blob_threshold: Minimum size for deduplication (default: 200 chars)
- Automatic reference counting and cleanup
- SHA256 hashing for content identification
Usage Patterns
Implementing a Custom Backend
```python
from orka.memory.base_logger import BaseMemoryLogger


class CustomMemoryLogger(BaseMemoryLogger):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._storage = {}  # Your storage implementation

    def log(self, agent_id, event_type, payload, **kwargs):
        # Implement storage logic
        pass

    def cleanup_expired_memories(self, dry_run=False):
        # Implement cleanup logic
        pass

    # … implement other abstract methods
```
Memory Classification Logic
- Orchestration logs are always classified as short-term
- Only "stored" memories can be classified as long-term
- Importance scoring influences retention decisions
- Event types and agent types affect classification
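The first two rules above can be sketched as a small function. This is a simplified illustration, not OrKa's classifier: it ignores importance scores and agent types, and it assumes that unlisted event types fall back to short-term. The function name `classify_memory_type` is hypothetical. The event list comes from the `memory_type_rules` example in this document.

```python
# Event types taken from the "long_term_events" list in the decay configuration example.
LONG_TERM_EVENTS = {"success", "completion", "write", "result"}


def classify_memory_type(category: str, event_type: str) -> str:
    """Hypothetical helper returning "short_term" or "long_term"."""
    # Orchestration logs are always short-term, whatever the event type.
    if category != "stored":
        return "short_term"
    # Only stored memories are eligible for long-term retention.
    # Assumption: event types outside the long-term list default to short-term.
    return "long_term" if event_type in LONG_TERM_EVENTS else "short_term"
```

With this reading, a "write" event is long-term only when its category is "stored". The same event logged as an orchestration entry stays short-term.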
Thread Safety Considerations
- Decay scheduler runs in background thread
- Proper synchronization for concurrent access
- Graceful shutdown handling with stop events
- Resource cleanup on object destruction
class orka.memory.base_logger.BaseMemoryLogger(stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None)
Bases: ABC, SerializationMixin, FileOperationsMixin
Abstract base class for memory loggers. Defines the interface that must be implemented by all memory backends.
__init__(stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None) -> None
Initialize the memory logger.
Parameters:
stream_key – Key for the memory stream. Defaults to "orka:memory".
debug_keep_previous_outputs – If True, keeps previous_outputs in log files for debugging.
decay_config – Configuration for memory decay functionality.
abstractmethod cleanup_expired_memories(dry_run: bool = False) -> dict[str, Any]
Clean up expired memory entries based on decay configuration.
Parameters:
dry_run – If True, return what would be deleted without actually deleting.
Returns:
Dictionary containing cleanup statistics
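The dry-run contract can be illustrated with a standalone function. This is a sketch under assumptions: the `cleanup_expired` helper, the shape of `entries` (entry id mapped to expiry time), and the statistics keys are all hypothetical, since this page does not specify what the returned dictionary contains.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def cleanup_expired(entries: dict, now: Optional[datetime] = None, dry_run: bool = False) -> Any:
    """Hypothetical helper: `entries` maps an entry id to its expiry datetime."""
    now = now or datetime.now(timezone.utc)
    total_checked = len(entries)
    expired_ids = [entry_id for entry_id, expires_at in entries.items() if expires_at <= now]
    if not dry_run:
        for entry_id in expired_ids:
            del entries[entry_id]
    # Assumed statistics keys; a real backend may report different fields.
    return {
        "dry_run": dry_run,
        "total_checked": total_checked,
        "expired_ids": expired_ids,
        "deleted_count": 0 if dry_run else len(expired_ids),
    }
```

A dry run reports which entries would be removed and leaves the store untouched, which makes it safe to test a decay policy against live data.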
abstractmethod get_memory_stats() -> dict[str, Any]
Get memory usage statistics.
Returns:
Dictionary containing memory statistics
abstractmethod log(agent_id: str, event_type: str, payload: dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: dict[str, Any] | None = None, agent_decay_config: dict[str, Any] | None = None, log_type: str = 'log') -> None
Log an event to the memory backend.
abstractmethod tail(count: int = 10) -> list[dict[str, Any]]
Retrieve the most recent events.