Source code for orka.memory.base_logger

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Base Memory Logger
=================

Abstract base class that defines the interface and common functionality for all
memory logger implementations in OrKa. This class provides the foundation for
persistent memory storage across different backends.

Core Responsibilities
--------------------

**Interface Definition**
- Defines abstract methods that all memory backends must implement
- Provides common initialization and configuration patterns
- Establishes consistent behavior across different storage backends

**Memory Lifecycle Management**
- Automatic memory decay based on configurable rules
- Importance scoring for memory retention decisions
- Memory type classification (short-term vs long-term)
- Category-based memory organization (logs vs stored memories)

**Data Optimization**
- Blob deduplication for large objects to reduce storage overhead
- Serialization mixins for consistent data handling
- File operation mixins for export/import functionality
- Configurable thresholds for optimization decisions

**Thread Safety**
- Thread-safe decay scheduling and management
- Concurrent access patterns for multi-threaded environments
- Proper resource cleanup and lifecycle management

Architecture Details
-------------------

**Memory Classification System**
- **Categories**: "log" (orchestration events) vs "stored" (persistent memories)
- **Types**: "short_term" (temporary) vs "long_term" (persistent)
- **Importance Scoring**: 0.0-1.0 scale based on event type and content
- **Decay Rules**: Configurable retention policies per category/type

**Blob Deduplication**
- SHA256 hashing for content identification
- Reference counting for cleanup decisions
- Configurable size threshold (default: 200 characters)
- Automatic cleanup of unused blobs

**Decay Management**
- Background thread for automatic cleanup
- Configurable check intervals (default: 30 minutes)
- Dry-run support for testing cleanup operations
- Graceful shutdown with proper thread cleanup

Implementation Requirements
--------------------------

**Required Abstract Methods**
All concrete implementations must provide:

- `log()` - Store orchestration events and memory entries
- `tail()` - Retrieve recent entries for debugging
- `cleanup_expired_memories()` - Remove expired entries
- `get_memory_stats()` - Provide storage statistics
- Redis-compatible methods: `hset`, `hget`, `hkeys`, `hdel`, `get`, `set`, `delete`
- Set operations: `smembers`, `sadd`, `srem`

**Optional Enhancements**
Implementations may provide:

- Vector search capabilities for semantic similarity
- Advanced filtering and querying options
- Performance optimizations for specific use cases
- Integration with external systems (Redis, Kafka, etc.)

Configuration Options
--------------------

**Decay Configuration**
```python
decay_config = {
    "enabled": True,
    "default_short_term_hours": 1.0,
    "default_long_term_hours": 24.0,
    "check_interval_minutes": 30,
    "memory_type_rules": {
        "long_term_events": ["success", "completion", "write", "result"],
        "short_term_events": ["debug", "processing", "start", "progress"]
    },
    "importance_rules": {
        "base_score": 0.5,
        "event_type_boosts": {"write": 0.3, "success": 0.2},
        "agent_type_boosts": {"memory": 0.2, "openai-answer": 0.1}
    }
}
```

**Blob Deduplication**
- `_blob_threshold`: Minimum size for deduplication (default: 200 chars)
- Automatic reference counting and cleanup
- SHA256 hashing for content identification

Usage Patterns
--------------

**Implementing a Custom Backend**
```python
from orka.memory.base_logger import BaseMemoryLogger

class CustomMemoryLogger(BaseMemoryLogger):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._storage = {}  # Your storage implementation

    def log(self, agent_id, event_type, payload, **kwargs):
        # Implement storage logic
        pass

    def cleanup_expired_memories(self, dry_run=False):
        # Implement cleanup logic
        pass

    # ... implement other abstract methods
```

**Memory Classification Logic**
- Orchestration logs are always classified as short-term
- Only "stored" memories can be classified as long-term
- Importance scoring influences retention decisions
- Event types and agent types affect classification

**Thread Safety Considerations**
- Decay scheduler runs in background thread
- Proper synchronization for concurrent access
- Graceful shutdown handling with stop events
- Resource cleanup on object destruction
"""

import hashlib
import json
import logging
import threading
from abc import ABC, abstractmethod
from datetime import UTC, datetime
from typing import Any

from .file_operations import FileOperationsMixin
from .serialization import SerializationMixin

logger = logging.getLogger(__name__)


[docs] class BaseMemoryLogger(ABC, SerializationMixin, FileOperationsMixin): """ Abstract base class for memory loggers. Defines the interface that must be implemented by all memory backends. """
[docs] def __init__( self, stream_key: str = "orka:memory", debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, ) -> None: """ Initialize the memory logger. Args: stream_key: Key for the memory stream. Defaults to "orka:memory". debug_keep_previous_outputs: If True, keeps previous_outputs in log files for debugging. decay_config: Configuration for memory decay functionality. """ self.stream_key = stream_key self.memory: list[dict[str, Any]] = [] # Local memory buffer self.debug_keep_previous_outputs = debug_keep_previous_outputs # Initialize decay configuration self.decay_config = self._init_decay_config(decay_config or {}) # Decay state management self._decay_thread = None self._decay_stop_event = threading.Event() self._last_decay_check = datetime.now(UTC) # Initialize automatic decay if enabled if self.decay_config.get("enabled", False): self._start_decay_scheduler() # Blob deduplication storage: SHA256 -> actual blob content self._blob_store: dict[str, Any] = {} # Track blob usage count for potential cleanup self._blob_usage: dict[str, int] = {} # Minimum size threshold for blob deduplication (in chars) self._blob_threshold = 200
def _init_decay_config(self, decay_config: dict[str, Any]) -> dict[str, Any]: """ Initialize decay configuration with defaults. Args: decay_config: Raw decay configuration Returns: Processed decay configuration with defaults applied """ default_config = { "enabled": False, # Disable by default to prevent logs from disappearing "default_short_term_hours": 1.0, "default_long_term_hours": 24.0, "check_interval_minutes": 30, "memory_type_rules": { "long_term_events": ["success", "completion", "write", "result"], "short_term_events": ["debug", "processing", "start", "progress"], }, "importance_rules": { "base_score": 0.5, "event_type_boosts": { "write": 0.3, "success": 0.2, "completion": 0.2, "result": 0.1, }, "agent_type_boosts": { "memory": 0.2, "openai-answer": 0.1, }, }, } # Deep merge with defaults merged_config = default_config.copy() for key, value in decay_config.items(): if isinstance(value, dict) and key in merged_config: merged_config[key].update(value) else: merged_config[key] = value return merged_config def _calculate_importance_score( self, event_type: str, agent_id: str, payload: dict[str, Any], ) -> float: """ Calculate importance score for a memory entry. Args: event_type: Type of the event agent_id: ID of the agent generating the event payload: Event payload Returns: Importance score between 0.0 and 1.0 """ rules = self.decay_config["importance_rules"] score = rules["base_score"] # Apply event type boosts event_boost = rules["event_type_boosts"].get(event_type, 0.0) score += event_boost # Apply agent type boosts for agent_type, boost in rules["agent_type_boosts"].items(): if agent_type in agent_id: score += boost break # Check payload for result indicators if isinstance(payload, dict): if payload.get("result") or payload.get("response"): score += 0.1 if payload.get("error"): score -= 0.1 # Clamp score between 0.0 and 1.0 return max(0.0, min(1.0, score)) def _classify_memory_type( self, event_type: str, importance_score: float, category: str = "log", ) -> str: """ Classify memory entry as short-term or long-term. Args: event_type: Type of the event importance_score: Calculated importance score category: Memory category ("stored" or "log") Returns: "short_term" or "long_term" """ # CRITICAL: Only "stored" memories should be classified as long-term # Orchestration logs should always be short-term to avoid confusion if category == "log": return "short_term" rules = self.decay_config["memory_type_rules"] # Check explicit rules first (only for stored memories) if event_type in rules["long_term_events"]: return "long_term" if event_type in rules["short_term_events"]: return "short_term" # Fallback to importance score (only for stored memories) return "long_term" if importance_score >= 0.7 else "short_term" def _classify_memory_category( self, event_type: str, agent_id: str, payload: dict[str, Any], log_type: str = "log", ) -> str: """ Classify memory entry category for separation between logs and stored memories. Args: event_type: Type of the event agent_id: ID of the agent generating the event payload: Event payload log_type: Explicit log type ("log" or "memory") Returns: "stored" for memory writer outputs, "log" for other events """ # 🎯 CRITICAL: Use explicit log_type parameter first if log_type == "memory": return "stored" elif log_type == "log": return "log" # Fallback to legacy detection (for backward compatibility) # Memory writes from memory writer nodes should be categorized as "stored" if event_type == "write" and ("memory" in agent_id.lower() or "writer" in agent_id.lower()): return "stored" # Check payload for memory content indicators if isinstance(payload, dict): # If payload contains content field, it's likely stored memory if payload.get("content") and payload.get("metadata"): return "stored" # If it's a memory operation result if payload.get("memory_object") or payload.get("memories"): return "stored" # Default to log for orchestration events return "log" def _start_decay_scheduler(self): """Start the automatic decay scheduler thread.""" if self._decay_thread is not None: return # Already running def decay_scheduler(): interval_seconds = self.decay_config["check_interval_minutes"] * 60 while not self._decay_stop_event.wait(interval_seconds): try: self.cleanup_expired_memories() except Exception as e: logger.error(f"Error during automatic memory decay: {e}") self._decay_thread = threading.Thread(target=decay_scheduler, daemon=True) self._decay_thread.start() logger.info( f"Started automatic memory decay scheduler (interval: {self.decay_config['check_interval_minutes']} minutes)", )
[docs] def stop_decay_scheduler(self): """Stop the automatic decay scheduler.""" if self._decay_thread is not None: self._decay_stop_event.set() self._decay_thread.join(timeout=5) self._decay_thread = None logger.info("Stopped automatic memory decay scheduler")
[docs] @abstractmethod def cleanup_expired_memories(self, dry_run: bool = False) -> dict[str, Any]: """ Clean up expired memory entries based on decay configuration. Args: dry_run: If True, return what would be deleted without actually deleting Returns: Dictionary containing cleanup statistics """
[docs] @abstractmethod def get_memory_stats(self) -> dict[str, Any]: """ Get memory usage statistics. Returns: Dictionary containing memory statistics """
[docs] @abstractmethod def log( self, agent_id: str, event_type: str, payload: dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: dict[str, Any] | None = None, agent_decay_config: dict[str, Any] | None = None, log_type: str = "log", # 🎯 NEW: "log" for orchestration, "memory" for stored memories ) -> None: """Log an event to the memory backend."""
[docs] @abstractmethod def tail(self, count: int = 10) -> list[dict[str, Any]]: """Retrieve the most recent events."""
[docs] @abstractmethod def hset(self, name: str, key: str, value: str | bytes | int | float) -> int: """Set a field in a hash structure."""
[docs] @abstractmethod def hget(self, name: str, key: str) -> str | None: """Get a field from a hash structure."""
[docs] @abstractmethod def hkeys(self, name: str) -> list[str]: """Get all keys in a hash structure."""
[docs] @abstractmethod def hdel(self, name: str, *keys: str) -> int: """Delete fields from a hash structure."""
[docs] @abstractmethod def smembers(self, name: str) -> list[str]: """Get all members of a set."""
[docs] @abstractmethod def sadd(self, name: str, *values: str) -> int: """Add members to a set."""
[docs] @abstractmethod def srem(self, name: str, *values: str) -> int: """Remove members from a set."""
[docs] @abstractmethod def get(self, key: str) -> str | None: """Get a value by key."""
[docs] @abstractmethod def set(self, key: str, value: str | bytes | int | float) -> bool: """Set a value by key."""
[docs] @abstractmethod def delete(self, *keys: str) -> int: """Delete keys."""
def _compute_blob_hash(self, obj: Any) -> str: """ Compute SHA256 hash of a JSON-serializable object. Args: obj: Object to hash Returns: SHA256 hash as hex string """ try: # Convert to canonical JSON string for consistent hashing json_str = json.dumps(obj, sort_keys=True, separators=(",", ":")) return hashlib.sha256(json_str.encode("utf-8")).hexdigest() except Exception: # If object can't be serialized, return hash of string representation return hashlib.sha256(str(obj).encode("utf-8")).hexdigest() def _should_deduplicate_blob(self, obj: Any) -> bool: """ Determine if an object should be deduplicated as a blob. Args: obj: Object to check Returns: True if object should be deduplicated """ try: # Only deduplicate JSON responses and large payloads if not isinstance(obj, dict): return False # Check if it looks like a JSON response has_response = "response" in obj has_result = "result" in obj if not (has_response or has_result): return False # Check size threshold json_str = json.dumps(obj, separators=(",", ":")) return len(json_str) >= self._blob_threshold except Exception: return False def _store_blob(self, obj: Any) -> str: """ Store a blob and return its reference hash. Args: obj: Object to store as blob Returns: SHA256 hash reference """ blob_hash = self._compute_blob_hash(obj) # Store the blob if not already present if blob_hash not in self._blob_store: self._blob_store[blob_hash] = obj self._blob_usage[blob_hash] = 0 # Increment usage count self._blob_usage[blob_hash] += 1 return blob_hash def _create_blob_reference( self, blob_hash: str, original_keys: list[str] = None, ) -> dict[str, Any]: """ Create a blob reference object. Args: blob_hash: SHA256 hash of the blob original_keys: List of keys that were in the original object (for reference) Returns: Blob reference dictionary """ ref = { "ref": blob_hash, "_type": "blob_reference", } if original_keys: ref["_original_keys"] = original_keys return ref def _deduplicate_object(self, obj: Any) -> Any: """ Recursively deduplicate an object, replacing large blobs with references. Args: obj: Object to deduplicate Returns: Deduplicated object with blob references """ if not isinstance(obj, dict): return obj # Check if this object should be stored as a blob if self._should_deduplicate_blob(obj): blob_hash = self._store_blob(obj) return self._create_blob_reference(blob_hash, list(obj.keys())) # Recursively deduplicate nested objects deduplicated = {} for key, value in obj.items(): if isinstance(value, dict): deduplicated[key] = self._deduplicate_object(value) elif isinstance(value, list): deduplicated[key] = [ self._deduplicate_object(item) if isinstance(item, dict) else item for item in value ] else: deduplicated[key] = value return deduplicated