Source code for orka.tui.components

"""
UI component builders for TUI interface panels and displays.
"""

import datetime

try:
    from rich.align import Align
    from rich.box import HEAVY, ROUNDED, SIMPLE
    from rich.layout import Layout
    from rich.markup import escape
    from rich.panel import Panel
    from rich.table import Table
    from rich.text import Text

    RICH_AVAILABLE = True
except ImportError:
    RICH_AVAILABLE = False


[docs] class ComponentBuilder: """Builds UI components for the TUI interface.""" def __init__(self, data_manager): self.data_manager = data_manager @property def stats(self): return self.data_manager.stats @property def memory_data(self): return self.data_manager.memory_data @property def performance_history(self): return self.data_manager.performance_history @property def memory_logger(self): return self.data_manager.memory_logger @property def backend(self): return self.data_manager.backend @property def running(self): return getattr(self.data_manager, "running", True)
[docs] def create_compact_header(self): """Create a compact header.""" current_time = datetime.datetime.now().strftime("%H:%M:%S") status_color = "green" if self.running else "red" header_text = Text() header_text.append("🚀 OrKa Monitor ", style="bold blue") header_text.append(f"| {self.backend.upper()} ", style="cyan") header_text.append(f"| {current_time} ", style="dim") header_text.append("●", style=f"bold {status_color}") return Panel(Align.center(header_text), box=SIMPLE, style="blue")
[docs] def create_compact_stats_panel(self): """Create a compact stats panel with comprehensive metrics.""" if not self.stats.current: return Panel("Loading...", title="📊 Memory Statistics") stats = self.stats.current # Create a detailed table with all metrics table = Table(show_header=False, box=None, padding=(0, 1)) table.add_column(style="cyan", width=14) table.add_column(style="white", width=8, justify="right") table.add_column(style="green", width=6) table.add_column(style="yellow", width=4) # Core metrics with trends core_metrics = [ ("Total Entries", stats.get("total_entries", 0), "entries"), ("Stored Memories", stats.get("stored_memories", 0), "mem"), ("Orchestration", stats.get("orchestration_logs", 0), "logs"), ("Active", stats.get("active_entries", 0), "act"), ("Expired", stats.get("expired_entries", 0), "exp"), ] for name, value, unit in core_metrics: # Get trend information key = name.lower().replace(" ", "_") trend = self.stats.get_trend(key) rate = self.stats.get_rate(key) # Trend icon if trend == "↗": trend_display = "[green]↗[/green]" elif trend == "↘": trend_display = "[red]↘[/red]" else: trend_display = "[dim]→[/dim]" table.add_row(f" {name}", f"[bold]{value:,}[/bold]", unit, trend_display) # Backend health with more details table.add_row("", "", "", "") # Separator decay_enabled = stats.get("decay_enabled", False) backend_status = "✅" if hasattr(self.memory_logger, "client") else "❌" table.add_row(" Backend", f"{self.backend.upper()}", backend_status, "") table.add_row(" Decay", "✅" if decay_enabled else "❌", "auto", "") # Performance if available if self.performance_history: latest_perf = self.performance_history[-1] avg_time = latest_perf.get("average_search_time", 0) perf_icon = "⚡" if avg_time < 0.1 else "⚠" if avg_time < 0.5 else "🐌" table.add_row(" Search", f"{avg_time:.3f}s", "time", f"[cyan]{perf_icon}[/cyan]") return Panel(table, title="📊 Memory Statistics & Health", box=ROUNDED)
[docs] def create_compact_memories_panel(self): """Create a compact memories panel with comprehensive details.""" if not self.memory_data: return Panel("No memories", title="🧠 Recent Memories") table = Table(show_header=True, header_style="bold magenta", box=None, padding=(0, 1)) table.add_column("Time", style="dim", width=5) table.add_column("Node", style="cyan", width=10) table.add_column("Type", style="green", width=8) table.add_column("Content", style="white", width=25) table.add_column("Score", style="yellow", width=4) table.add_column("TTL", style="red", width=8) # Show 6 memories with full details for i, mem in enumerate(self.memory_data[:6]): # Handle bytes content raw_content = mem.get("content", "") if isinstance(raw_content, bytes): raw_content = raw_content.decode("utf-8", errors="replace") content = raw_content[:22] + ("..." if len(raw_content) > 22 else "") # Handle bytes for node_id raw_node_id = mem.get("node_id", "unknown") node_id = ( raw_node_id.decode("utf-8", errors="replace") if isinstance(raw_node_id, bytes) else str(raw_node_id) )[:8] # Limit node_id length # Handle memory type raw_memory_type = mem.get("memory_type", "unknown") memory_type = ( raw_memory_type.decode("utf-8", errors="replace") if isinstance(raw_memory_type, bytes) else str(raw_memory_type) )[:6] # Shorten type # Handle importance score raw_importance = mem.get("importance_score", 0) if isinstance(raw_importance, bytes): try: importance = float(raw_importance.decode()) except: importance = 0.0 else: importance = float(raw_importance) if raw_importance else 0.0 # Handle timestamp try: raw_timestamp = mem.get("timestamp", 0) if isinstance(raw_timestamp, bytes): timestamp = int(raw_timestamp.decode()) else: timestamp = int(raw_timestamp) if raw_timestamp else 0 if timestamp > 1000000000000: # milliseconds dt = datetime.datetime.fromtimestamp(timestamp / 1000) else: # seconds dt = datetime.datetime.fromtimestamp(timestamp) time_str = dt.strftime("%H:%M") except: time_str = "??:??" # Handle TTL raw_ttl = mem.get("ttl_formatted", "?") ttl = ( raw_ttl.decode("utf-8", errors="replace") if isinstance(raw_ttl, bytes) else str(raw_ttl) )[:8] # Limit TTL length # Color code TTL if "h" in ttl and int(ttl.split("h")[0]) > 1: ttl_display = f"[green]{ttl}[/green]" elif "m" in ttl or ("h" in ttl and int(ttl.split("h")[0]) <= 1): ttl_display = f"[yellow]{ttl}[/yellow]" elif ttl == "Never": ttl_display = "[blue]∞[/blue]" else: ttl_display = f"[red]{ttl}[/red]" table.add_row( time_str, node_id, memory_type, content, f"{importance:.1f}", ttl_display, ) return Panel( table, title=f"🧠 Recent Memories ({len(self.memory_data)} total)", box=ROUNDED, )
[docs] def create_compact_performance_panel(self): """Create a compact performance panel with comprehensive metrics.""" table = Table(show_header=False, box=None, padding=(0, 1)) table.add_column(style="cyan", width=11) table.add_column(style="white", width=8, justify="right") table.add_column(style="green", width=6) table.add_column(style="yellow", width=4) if not self.performance_history: table.add_row(" Status", "Collecting", "data", "⏳") return Panel(table, title="⚡ Performance & System", box=ROUNDED) latest_perf = self.performance_history[-1] # Performance metrics with status indicators avg_search_time = latest_perf.get("average_search_time", 0) if avg_search_time < 0.1: perf_status = "⚡" elif avg_search_time < 0.5: perf_status = "⚠" else: perf_status = "🐌" table.add_row( " Search Speed", f"{avg_search_time:.3f}s", "time", f"[cyan]{perf_status}[/cyan]", ) # Vector search metrics for RedisStack if self.backend == "redisstack": try: if hasattr(self.memory_logger, "client"): # HNSW Index status index_info = self.memory_logger.client.ft("enhanced_memory_idx").info() docs = index_info.get("num_docs", 0) indexing = index_info.get("indexing", False) table.add_row(" Vector Docs", f"{docs:,}", "docs", "📊") table.add_row( " HNSW Index", "Active" if indexing else "Idle", "status", "✅" if indexing else "⏸", ) # Redis system info redis_info = self.memory_logger.client.info() memory_used = redis_info.get("used_memory_human", "N/A") clients = redis_info.get("connected_clients", 0) ops_per_sec = redis_info.get("instantaneous_ops_per_sec", 0) table.add_row(" Memory Used", memory_used, "mem", "💾") table.add_row(" Clients", f"{clients}", "conn", "🔗") table.add_row( " Ops/sec", f"{ops_per_sec}", "rate", "⚡" if ops_per_sec > 10 else "📈", ) # Module detection try: modules = self.memory_logger.client.execute_command("MODULE", "LIST") module_count = len(modules) if modules else 0 table.add_row(" Modules", f"{module_count}", "ext", "🔌") except: table.add_row(" Modules", "Unknown", "ext", "❓") except Exception as e: table.add_row(" Vector", "Error", "state", "❌") table.add_row(" Redis", str(e)[:6], "err", "💥") else: # Basic Redis metrics table.add_row(" Backend", self.backend.upper(), "type", "🗄️") table.add_row( " Status", "Connected", "conn", "✅" if hasattr(self.memory_logger, "client") else "❌", ) # Memory operations if available if hasattr(self.memory_logger, "get_performance_metrics"): try: perf = self.memory_logger.get_performance_metrics() writes = perf.get("memory_writes", 0) reads = perf.get("memory_reads", 0) table.add_row(" Writes/min", f"{writes}", "ops", "✏️") table.add_row(" Reads/min", f"{reads}", "ops", "👁️") except: pass return Panel(table, title="⚡ Performance & System Health", box=ROUNDED)
[docs] def create_header(self): """Create header with title and status.""" current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") status_color = "green" if self.running else "red" backend_info = f"Backend: [bold]{self.backend}[/bold]" header_text = Text() header_text.append("🚀 OrKa Memory Monitor ", style="bold blue") header_text.append(f"| {backend_info} ", style="dim") header_text.append(f"| {current_time} ", style="dim") header_text.append("● LIVE", style=f"bold {status_color}") return Panel( Align.center(header_text), box=HEAVY, style="blue", )
[docs] def create_stats_panel(self): """Create comprehensive memory statistics panel with trending.""" if not self.stats.current: return Panel("Loading statistics...", title="📊 Memory Statistics") stats = self.stats.current # Create statistics table with trending information table = Table(show_header=False, box=None, padding=(0, 1)) table.add_column(style="cyan", width=18) table.add_column(style="white", width=12) table.add_column(style="green", width=8) table.add_column(style="yellow", width=6) # Core metrics with trends core_metrics = [ ("Total Entries", stats.get("total_entries", 0), "entries"), ("Stored Memories", stats.get("stored_memories", 0), "memories"), ("Orchestration Logs", stats.get("orchestration_logs", 0), "logs"), ("Active Entries", stats.get("active_entries", 0), "active"), ("Expired Entries", stats.get("expired_entries", 0), "expired"), ] table.add_row("[bold]Core Metrics:[/bold]", "", "", "") for name, value, unit in core_metrics: # Get trend and rate information key = name.lower().replace(" ", "_") trend = self.stats.get_trend(key) rate = self.stats.get_rate(key) # Format rate display rate_text = "" if abs(rate) > 0.01: if rate > 0: rate_text = f"[green]+{rate:.1f}/s[/green]" else: rate_text = f"[red]{rate:.1f}/s[/red]" else: rate_text = "[dim]stable[/dim]" # Trend icon with color if trend == "↗": trend_display = "[green]↗[/green]" elif trend == "↘": trend_display = "[red]↘[/red]" else: trend_display = "[dim]→[/dim]" table.add_row( f" {name}", f"[bold]{value:,}[/bold]", unit, f"{trend_display} {rate_text}", ) # Backend health indicators table.add_row("", "", "", "") # Separator table.add_row("[bold]Backend Health:[/bold]", "", "", "") # Decay status decay_enabled = stats.get("decay_enabled", False) decay_status = "✅ Active" if decay_enabled else "❌ Inactive" table.add_row(" Memory Decay", decay_status, "", "") # Backend type with status backend_status = "✅ Online" if hasattr(self.memory_logger, "client") else "❌ Offline" table.add_row(" Backend", f"{self.backend.upper()}", backend_status, "") # Performance indicator if self.performance_history: latest_perf = self.performance_history[-1] avg_search_time = latest_perf.get("average_search_time", 0) if avg_search_time < 0.1: perf_status = "[green]⚡ Fast[/green]" elif avg_search_time < 0.5: perf_status = "[yellow]⚠ Moderate[/yellow]" else: perf_status = "[red]🐌 Slow[/red]" table.add_row(" Performance", perf_status, f"{avg_search_time:.3f}s", "") return Panel(table, title="📊 Memory Statistics & Health", box=ROUNDED)
[docs] def create_recent_memories_panel(self): """Create recent memories panel with full details.""" if not self.memory_data: return Panel("No memories found", title="🧠 Recent Memories") table = Table(show_header=True, header_style="bold magenta", box=ROUNDED) table.add_column("Time", style="dim", width=8) table.add_column("Node", style="cyan", width=15) table.add_column("Type", style="green", width=12) table.add_column("Content", style="white", width=40) table.add_column("Score", style="yellow", width=6) table.add_column("TTL", style="red", width=12) for i, mem in enumerate(self.memory_data[:8]): # Show top 8 for better detail # Handle bytes content with better decoding raw_content = mem.get("content", "") if isinstance(raw_content, bytes): raw_content = raw_content.decode("utf-8", errors="replace") content = raw_content[:35] + ("..." if len(raw_content) > 35 else "") # Handle bytes for node_id raw_node_id = mem.get("node_id", "unknown") node_id = ( raw_node_id.decode("utf-8", errors="replace") if isinstance(raw_node_id, bytes) else str(raw_node_id) ) # Handle memory type raw_memory_type = mem.get("memory_type", "unknown") memory_type = ( raw_memory_type.decode("utf-8", errors="replace") if isinstance(raw_memory_type, bytes) else str(raw_memory_type) ) # Handle importance score raw_importance = mem.get("importance_score", 0) if isinstance(raw_importance, bytes): try: importance = float(raw_importance.decode()) except: importance = 0.0 else: importance = float(raw_importance) if raw_importance else 0.0 # Handle timestamp with better formatting try: raw_timestamp = mem.get("timestamp", 0) if isinstance(raw_timestamp, bytes): timestamp = int(raw_timestamp.decode()) else: timestamp = int(raw_timestamp) if raw_timestamp else 0 if timestamp > 1000000000000: # milliseconds dt = datetime.datetime.fromtimestamp(timestamp / 1000) else: # seconds dt = datetime.datetime.fromtimestamp(timestamp) time_str = dt.strftime("%H:%M:%S") except: time_str = "??:??:??" # Handle TTL with full information raw_ttl = mem.get("ttl_formatted", "N/A") ttl = ( raw_ttl.decode("utf-8", errors="replace") if isinstance(raw_ttl, bytes) else str(raw_ttl) ) raw_expires = mem.get("expires_at_formatted", "") expires_at = ( raw_expires.decode("utf-8", errors="replace") if isinstance(raw_expires, bytes) else str(raw_expires) ) has_expiry = mem.get("has_expiry", False) # Enhanced TTL display with color coding if ttl == "0s" or "Expired" in ttl: ttl_style = f"[red]💀 {ttl}[/red]" elif "Never" in ttl: ttl_style = f"[green]♾️ {ttl}[/green]" elif any(unit in ttl for unit in ["s", "m", "h"]): if "h" in ttl: ttl_style = f"[green]⏰ {ttl}[/green]" elif "m" in ttl: ttl_style = f"[yellow]⏰ {ttl}[/yellow]" else: # seconds ttl_style = f"[red]⚠️ {ttl}[/red]" else: ttl_style = ttl # Memory type color coding type_color = ( "green" if memory_type == "long_term" else "yellow" if memory_type == "short_term" else "dim" ) table.add_row( time_str, node_id[:15], f"[{type_color}]{memory_type[:12]}[/{type_color}]", escape(content), f"{importance:.2f}", ttl_style, ) return Panel(table, title="🧠 Recent Stored Memories", box=ROUNDED)
[docs] def create_simple_chart(self, data, width=25, height=3): """Create a simple ASCII chart for trending data.""" if not data or len(data) < 2: return "[dim]No data[/dim]" # Normalize data to chart height max_val = max(data) if max(data) > 0 else 1 min_val = min(data) if max_val == min_val: return "[dim]Stable[/dim]" chart_lines = [] for row in range(height): line = "" for i, value in enumerate(data[-width:]): normalized = (value - min_val) / (max_val - min_val) * (height - 1) if normalized >= (height - 1 - row): line += "█" else: line += " " chart_lines.append(line) return "\n".join(chart_lines)
# Placeholder methods for missing components referenced in the original code
[docs] def create_memory_browser(self): """Create memory browser view (placeholder).""" return Panel("Memory browser not implemented yet", title="🧠 Memory Browser")
[docs] def create_performance_view(self): """Create performance view (placeholder).""" return Panel("Performance view not implemented yet", title="⚡ Performance View")
[docs] def create_performance_panel(self): """Create comprehensive performance metrics panel.""" if not self.performance_history and not hasattr( self.memory_logger, "get_performance_metrics", ): return Panel("No performance data available", title="🚀 Performance") # Create layout for performance view layout = Layout() layout.split_column( Layout(name="header", size=3), Layout(name="main"), Layout(name="footer", size=3), ) # Split main into performance sections layout["main"].split_row( Layout(name="left"), Layout(name="right"), ) layout["left"].split_column( Layout(name="performance_metrics", size=15), Layout(name="quality_metrics"), ) layout["right"].update( Panel( "[dim]Performance charts would go here[/dim]", title="📈 Performance Trends", box=ROUNDED, ), ) layout["header"].update( Panel( "🚀 Performance Monitoring - Real-time Memory & System Metrics", box=HEAVY, style="bold green", ), ) # Create performance table with comprehensive metrics table = Table(show_header=False, box=None, padding=(0, 1)) table.add_column(style="cyan", width=20) table.add_column(style="white", width=15) table.add_column(style="green", width=10) # Get latest performance metrics if hasattr(self.memory_logger, "get_performance_metrics"): try: latest = self.memory_logger.get_performance_metrics() # Core performance metrics table.add_row("[bold]Search Performance:[/bold]", "", "") table.add_row(" HNSW Searches", f"{latest.get('hybrid_searches', 0):,}", "ops") table.add_row(" Vector Searches", f"{latest.get('vector_searches', 0):,}", "ops") table.add_row( " Avg Search Time", f"{latest.get('average_search_time', 0):.3f}", "sec", ) table.add_row( " Cache Hit Rate", f"{(latest.get('cache_hits', 0) / max(1, latest.get('total_searches', 1)) * 100):.1f}", "%", ) table.add_row("", "", "") # Separator table.add_row("[bold]Memory Operations:[/bold]", "", "") table.add_row(" Memory Writes", f"{latest.get('memory_writes', 0):,}", "/min") table.add_row(" Memory Reads", f"{latest.get('memory_reads', 0):,}", "/min") table.add_row(" Total Memories", f"{latest.get('memory_count', 0):,}", "stored") # Index health (RedisStack specific) index_status = latest.get("index_status", {}) if index_status and index_status.get("status") != "unavailable": table.add_row("", "", "") # Separator table.add_row("[bold]HNSW Index Health:[/bold]", "", "") table.add_row( " Index Status", "✅ Active" if index_status.get("indexing", False) else "⏸️ Idle", "", ) table.add_row(" Documents", f"{index_status.get('num_docs', 0):,}", "docs") table.add_row( " Index Progress", f"{index_status.get('percent_indexed', 100):.1f}", "%", ) if index_status.get("index_options"): opts = index_status["index_options"] table.add_row(" HNSW M", str(opts.get("M", 16)), "") table.add_row( " EF Construction", str(opts.get("ef_construction", 200)), "", ) except Exception as e: table.add_row("Performance Error:", str(e)[:30], "") layout["performance_metrics"].update( Panel(table, title="⚡ Performance Metrics", box=ROUNDED), ) # Quality metrics quality_table = Table(show_header=False, box=None, padding=(0, 1)) quality_table.add_column(style="cyan", width=18) quality_table.add_column(style="white", width=12) quality_table.add_column(style="green", width=8) if hasattr(self.memory_logger, "get_performance_metrics"): try: perf = self.memory_logger.get_performance_metrics() quality_metrics = perf.get("memory_quality", {}) if quality_metrics: quality_table.add_row("[bold]Memory Quality:[/bold]", "", "") quality_table.add_row( " Avg Importance", f"{quality_metrics.get('avg_importance_score', 0):.2f}", "/5.0", ) quality_table.add_row( " Long-term %", f"{quality_metrics.get('long_term_percentage', 0):.1f}", "%", ) quality_table.add_row( " High Quality %", f"{quality_metrics.get('high_quality_percentage', 0):.1f}", "%", ) quality_table.add_row( " Avg Content Size", f"{quality_metrics.get('avg_content_length', 0):.0f}", "chars", ) quality_table.add_row("", "", "") # Separator quality_table.add_row("[bold]Quality Distribution:[/bold]", "", "") # Quality score distribution score_ranges = quality_metrics.get("score_distribution", {}) for range_name, count in score_ranges.items(): quality_table.add_row(f" {range_name}", f"{count:,}", "memories") else: quality_table.add_row("No quality metrics", "available", "") except Exception as e: quality_table.add_row("Quality Error:", str(e)[:15], "") layout["quality_metrics"].update( Panel(quality_table, title="⭐ Memory Quality", box=ROUNDED), ) layout["footer"].update(self.create_footer()) return layout
[docs] def create_config_view(self): """Create comprehensive configuration view with backend testing.""" import time layout = Layout() layout.split_column( Layout(name="header", size=3), Layout(name="main"), Layout(name="footer", size=3), ) # Split main for configuration sections layout["main"].split_row( Layout(name="left"), Layout(name="right"), ) layout["left"].split_column( Layout(name="backend_config", size=15), Layout(name="decay_config", size=12), Layout(name="connection_test"), ) layout["right"].split_column( Layout(name="system_info", size=18), Layout(name="module_info"), ) layout["header"].update( Panel( "🔧 Configuration & System Health - Backend Testing & Diagnostics", box=HEAVY, style="bold magenta", ), ) # Backend configuration testing backend_table = Table(show_header=False, box=None, padding=(0, 1)) backend_table.add_column(style="cyan", width=20) backend_table.add_column(style="white", width=15) backend_table.add_column(style="green", width=8) backend_table.add_row("[bold]Backend Configuration:[/bold]", "", "") backend_table.add_row(" Type", self.backend.upper(), "") # Test backend connectivity if hasattr(self.memory_logger, "client"): try: self.memory_logger.client.ping() backend_table.add_row(" Connection", "✅ Active", "") # Get Redis info redis_info = self.memory_logger.client.info() backend_table.add_row( " Redis Version", redis_info.get("redis_version", "Unknown"), "", ) backend_table.add_row(" Mode", redis_info.get("redis_mode", "standalone"), "") # Test memory operations try: test_key = "orka:tui:health_check" self.memory_logger.client.set(test_key, "test", ex=5) test_result = self.memory_logger.client.get(test_key) if test_result: backend_table.add_row(" Read/Write", "✅ Working", "") self.memory_logger.client.delete(test_key) else: backend_table.add_row(" Read/Write", "❌ Failed", "") except Exception: backend_table.add_row(" Read/Write", "❌ Error", "") except Exception as e: backend_table.add_row(" Connection", "❌ Failed", "") backend_table.add_row(" Error", str(e)[:15], "") else: backend_table.add_row(" Connection", "❌ No Client", "") # Backend-specific tests if self.backend == "redisstack": backend_table.add_row("", "", "") # Separator backend_table.add_row("[bold]RedisStack Tests:[/bold]", "", "") # Test vector search capabilities try: if hasattr(self.memory_logger, "client"): # Check for search module modules = self.memory_logger.client.execute_command("MODULE", "LIST") has_search = any("search" in str(module).lower() for module in modules) if has_search: backend_table.add_row(" Search Module", "✅ Loaded", "") # Test index existence try: index_info = self.memory_logger.client.ft("enhanced_memory_idx").info() backend_table.add_row(" HNSW Index", "✅ Available", "") backend_table.add_row( " Documents", f"{index_info.get('num_docs', 0):,}", "docs", ) except Exception: backend_table.add_row(" HNSW Index", "❌ Missing", "") else: backend_table.add_row(" Search Module", "❌ Missing", "") except Exception as e: backend_table.add_row(" Module Check", f"❌ {str(e)[:10]}", "") layout["backend_config"].update( Panel(backend_table, title="🔌 Backend Health", box=ROUNDED), ) # Decay configuration decay_table = Table(show_header=False, box=None, padding=(0, 1)) decay_table.add_column(style="cyan", width=18) decay_table.add_column(style="white", width=15) decay_table.add_column(style="green", width=8) if hasattr(self.memory_logger, "decay_config"): config = self.memory_logger.decay_config decay_table.add_row("[bold]Memory Decay:[/bold]", "", "") if config and config.get("enabled", False): decay_table.add_row(" Status", "✅ Enabled", "") decay_table.add_row( " Short-term TTL", f"{config.get('default_short_term_hours', 1)}", "hours", ) decay_table.add_row( " Long-term TTL", f"{config.get('default_long_term_hours', 24)}", "hours", ) decay_table.add_row( " Check Interval", f"{config.get('check_interval_minutes', 30)}", "min", ) # Test decay functionality try: test_result = self.memory_logger.cleanup_expired_memories(dry_run=True) decay_table.add_row(" Cleanup Test", "✅ Working", "") decay_table.add_row( " Last Check", test_result.get("duration_seconds", 0), "sec", ) except Exception: decay_table.add_row(" Cleanup Test", "❌ Error", "") else: decay_table.add_row(" Status", "❌ Disabled", "") decay_table.add_row(" Reason", "Not configured", "") else: decay_table.add_row(" Status", "❌ Unavailable", "") layout["decay_config"].update(Panel(decay_table, title="⏰ Memory Decay", box=ROUNDED)) # Connection testing conn_table = Table(show_header=False, box=None, padding=(0, 1)) conn_table.add_column(style="cyan", width=20) conn_table.add_column(style="white", width=12) conn_table.add_column(style="green", width=8) conn_table.add_row("[bold]Connection Testing:[/bold]", "", "") # Test different operations if hasattr(self.memory_logger, "client"): try: # Latency test start_time = time.time() self.memory_logger.client.ping() latency = (time.time() - start_time) * 1000 if latency < 5: latency_status = "[green]⚡ Excellent[/green]" elif latency < 20: latency_status = "[yellow]⚠ Good[/yellow]" else: latency_status = "[red]🐌 Slow[/red]" conn_table.add_row(" Ping Latency", f"{latency:.1f}ms", latency_status) # Memory stats test start_time = time.time() stats = self.memory_logger.get_memory_stats() stats_time = (time.time() - start_time) * 1000 conn_table.add_row(" Stats Query", f"{stats_time:.1f}ms", "✅") # Search test (if available) if hasattr(self.memory_logger, "search_memories"): start_time = time.time() search_results = self.memory_logger.search_memories(" ", num_results=1) search_time = (time.time() - start_time) * 1000 conn_table.add_row(" Search Test", f"{search_time:.1f}ms", "✅") except Exception as e: conn_table.add_row(" Test Failed", str(e)[:15], "❌") layout["connection_test"].update( Panel(conn_table, title="🔍 Connection Tests", box=ROUNDED), ) # System information (comprehensive) system_table = Table(show_header=False, box=None, padding=(0, 1)) system_table.add_column(style="cyan", width=18) system_table.add_column(style="white", width=15) system_table.add_column(style="green", width=8) if hasattr(self.memory_logger, "client"): try: redis_info = self.memory_logger.client.info() system_table.add_row("[bold]Redis System:[/bold]", "", "") system_table.add_row(" Version", redis_info.get("redis_version", "Unknown"), "") system_table.add_row( " Architecture", redis_info.get("arch_bits", "Unknown"), "bit", ) system_table.add_row(" OS", redis_info.get("os", "Unknown"), "") system_table.add_row("", "", "") # Separator system_table.add_row("[bold]Memory Usage:[/bold]", "", "") system_table.add_row( " Used Memory", redis_info.get("used_memory_human", "N/A"), "", ) system_table.add_row( " Peak Memory", redis_info.get("used_memory_peak_human", "N/A"), "", ) system_table.add_row( " Memory Ratio", f"{redis_info.get('used_memory_peak_perc', '0')}%", "", ) except Exception as e: system_table.add_row("System Error:", str(e)[:15], "") layout["system_info"].update(Panel(system_table, title="🖥️ System Information", box=ROUNDED)) # Module information module_table = Table(show_header=True, header_style="bold cyan", box=ROUNDED) module_table.add_column("Module", style="cyan", width=15) module_table.add_column("Version", style="white", width=12) module_table.add_column("Status", style="green", width=10) if hasattr(self.memory_logger, "client"): try: modules = self.memory_logger.client.execute_command("MODULE", "LIST") if modules: for module in modules: if isinstance(module, list) and len(module) >= 4: name = ( module[1].decode() if isinstance(module[1], bytes) else str(module[1]) ) version = ( module[3].decode() if isinstance(module[3], bytes) else str(module[3]) ) # Status based on module name if "search" in name.lower(): status = "✅ Vector Ready" elif "json" in name.lower(): status = "✅ JSON Ready" elif "timeseries" in name.lower(): status = "✅ TS Ready" else: status = "✅ Loaded" module_table.add_row(name, version, status) else: module_table.add_row("No modules", "N/A", "❌ Plain Redis") except Exception as e: module_table.add_row("Error", str(e)[:10], "❌ Failed") layout["module_info"].update(Panel(module_table, title="🔌 Redis Modules", box=ROUNDED)) layout["footer"].update(self.create_footer()) return layout