# Source code for orka.cli.memory.watch

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Memory Watch Functionality
==========================

This module contains memory watch functionality with TUI interface support.
"""

import json
import os
import sys
import time

from orka.memory_logger import create_memory_logger


def memory_watch(args):
    """Start the memory watch UI, preferring the modern TUI over the basic one.

    When ``args.fallback`` is truthy the basic terminal interface is used
    directly. Otherwise ``ModernTUIInterface`` from ``orka.tui_interface`` is
    imported and run; if an ``ImportError`` is raised while importing or
    running it, the basic terminal interface is used instead.

    Status and diagnostic messages are written to stderr so that they never
    mix with the JSON documents the fallback's JSON mode prints on stdout.

    Args:
        args: Parsed CLI namespace. Only the optional ``fallback`` attribute
            is read here; the namespace is passed through unchanged to the
            selected interface.

    Returns:
        Whatever the selected interface returns (the fallback returns an int
        exit code), or 1 if starting the TUI raised a non-import error.
    """
    # Explicit opt-out of the TUI.
    if getattr(args, "fallback", False):
        print("ℹ️ Using basic terminal interface as requested", file=sys.stderr)
        return _memory_watch_fallback(args)

    try:
        # Imported lazily so that missing optional TUI dependencies only
        # surface here, where they can be handled by falling back.
        from orka.tui_interface import ModernTUIInterface

        tui = ModernTUIInterface()
        return tui.run(args)
    except ImportError as e:
        print(f"❌ Could not import TUI interface: {e}", file=sys.stderr)
        print("Falling back to basic terminal interface...", file=sys.stderr)
        return _memory_watch_fallback(args)
    except Exception as e:
        print(f"❌ Error starting memory watch: {e}", file=sys.stderr)
        import traceback

        traceback.print_exc()
        return 1
# Defaults shared by the JSON and display watch modes.
_DEFAULT_WATCH_INTERVAL = 5  # seconds between refreshes
_RECENT_MEMORY_LIMIT = 5  # number of recent memories shown per refresh
_CONTENT_PREVIEW_CHARS = 100  # max characters of memory content displayed


def _watch_interval(args):
    """Return the refresh interval in seconds from ``args.interval`` (default 5)."""
    interval = getattr(args, "interval", None)
    return _DEFAULT_WATCH_INTERVAL if interval is None else interval


def _recent_stored_memories(memory, limit=_RECENT_MEMORY_LIMIT):
    """Fetch up to ``limit`` recent stored memories using whatever API ``memory`` offers."""
    if hasattr(memory, "get_recent_stored_memories"):
        return memory.get_recent_stored_memories(limit)
    if hasattr(memory, "search_memories"):
        return memory.search_memories(query=" ", num_results=limit, log_type="memory")
    return []


def _to_text(value):
    """Return ``value`` as ``str``, decoding bytes leniently so bad bytes cannot raise."""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


def _memory_watch_fallback(args):
    """Run the basic (non-TUI) memory watch.

    The backend is taken from ``args.backend`` or, failing that, the
    ``ORKA_MEMORY_BACKEND`` environment variable (default ``"redisstack"``).
    A memory logger is created for it and the watch is dispatched to JSON
    mode when ``args.json`` is truthy, otherwise to display mode.

    Args:
        args: Parsed CLI namespace. Reads the optional attributes ``backend``
            and ``json`` here; the watch modes read ``interval`` and
            ``no_clear``.

    Returns:
        int: The exit code of the selected mode, or 1 if setup or the watch
        raised an exception.
    """
    try:
        backend = getattr(args, "backend", None) or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")

        # REDIS_URL always wins; otherwise redisstack defaults to port 6380
        # and every other backend to port 6379.
        if backend == "redisstack":
            default_url = "redis://localhost:6380/0"
        else:
            default_url = "redis://localhost:6379/0"
        redis_url = os.getenv("REDIS_URL", default_url)

        memory = create_memory_logger(backend=backend, redis_url=redis_url)

        if getattr(args, "json", False):
            return _memory_watch_json(memory, backend, args)
        return _memory_watch_display(memory, backend, args)
    except Exception as e:
        print(f"❌ Error in fallback memory watch: {e}", file=sys.stderr)
        return 1


def _memory_watch_json(memory, backend: str, args):
    """Print memory statistics as JSON documents until interrupted.

    Each refresh prints one indented JSON document to stdout containing the
    stats timestamp, the backend name, the full stats mapping and the recent
    stored memories. For the ``redisstack`` backend, performance metrics are
    included when the memory object provides ``get_performance_metrics``.
    Failures of the optional sections are reported inside the document as
    ``recent_memories_error`` / ``performance_error`` rather than dropped.
    A failure of the refresh itself is printed as a JSON error object on
    stderr and the loop continues.

    Args:
        memory: Memory logger exposing ``get_memory_stats()``.
        backend: Name of the active backend.
        args: Parsed CLI namespace; the optional ``interval`` attribute sets
            the seconds between refreshes (default 5).

    Returns:
        int: 0 once the loop is stopped with Ctrl+C.
    """
    interval = _watch_interval(args)

    try:
        while True:
            try:
                stats = memory.get_memory_stats()
                output = {
                    "timestamp": stats.get("timestamp"),
                    "backend": backend,
                    "stats": stats,
                }

                try:
                    output["recent_stored_memories"] = _recent_stored_memories(memory)
                except Exception as e:
                    output["recent_memories_error"] = str(e)

                if backend == "redisstack" and hasattr(memory, "get_performance_metrics"):
                    try:
                        output["performance"] = memory.get_performance_metrics()
                    except Exception as e:
                        # Metrics are optional, but surface the failure
                        # instead of silently omitting the section.
                        output["performance_error"] = str(e)

                # default=str keeps non-JSON values (e.g. bytes) printable.
                print(json.dumps(output, indent=2, default=str))
            except Exception as e:
                print(json.dumps({"error": str(e), "backend": backend}), file=sys.stderr)

            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to leave the watch loop.
        pass

    return 0


def _memory_watch_display(memory, backend: str, args):
    """Show a continuously refreshing plain-text memory dashboard.

    Each refresh optionally clears the screen, prints a header, the basic
    counters from ``memory.get_memory_stats()`` and a preview of the recent
    stored memories. Errors while listing memories are shown inline; any
    other refresh error is printed to stderr and the loop continues.

    Args:
        memory: Memory logger exposing ``get_memory_stats()``.
        backend: Name of the active backend (shown in the header).
        args: Parsed CLI namespace; reads the optional ``interval`` (seconds
            between refreshes, default 5) and ``no_clear`` (skip clearing the
            screen) attributes.

    Returns:
        int: 0 once the loop is stopped with Ctrl+C.
    """
    interval = _watch_interval(args)

    try:
        while True:
            try:
                if not getattr(args, "no_clear", False):
                    os.system("cls" if os.name == "nt" else "clear")

                print("=== OrKa Memory Watch ===")
                print(f"Backend: {backend} | Interval: {interval}s | Press Ctrl+C to exit")
                print("-" * 60)

                stats = memory.get_memory_stats()

                print("📊 Memory Statistics:")
                print(f" Total Entries: {stats.get('total_entries', 0)}")
                print(f" Active Entries: {stats.get('active_entries', 0)}")
                print(f" Expired Entries: {stats.get('expired_entries', 0)}")
                print(f" Stored Memories: {stats.get('stored_memories', 0)}")
                print(f" Orchestration Logs: {stats.get('orchestration_logs', 0)}")

                print("\n🧠 Recent Stored Memories:")
                try:
                    recent_memories = _recent_stored_memories(memory)
                    if recent_memories:
                        for i, mem in enumerate(recent_memories, 1):
                            # Fields may arrive as bytes when the client was
                            # created with decode_responses=False.
                            text = _to_text(mem.get("content") or "")
                            content = text[:_CONTENT_PREVIEW_CHARS]
                            if len(text) > _CONTENT_PREVIEW_CHARS:
                                content += "..."
                            node_id = _to_text(mem.get("node_id") or "unknown")
                            print(f" [{i}] {node_id}: {content}")
                    else:
                        print(" No stored memories found")
                except Exception as e:
                    print(f" ❌ Error retrieving memories: {e}")
            except Exception as e:
                print(f"❌ Error in memory watch: {e}", file=sys.stderr)

            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to leave the watch loop.
        pass

    return 0