Source code for orka.orchestrator.error_handling

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Error Handling
==============

Comprehensive error tracking, reporting, and recovery mechanisms.
"""

import json
import os
import traceback
from datetime import datetime, timezone


class ErrorHandler:
    """
    Handles error tracking, reporting, and recovery mechanisms.

    The class defines no ``__init__``; its methods read the following
    attributes from ``self``, which must be supplied by whatever object these
    methods are bound to:

    * ``error_telemetry`` -- dict with the keys ``errors``,
      ``recovery_actions``, ``partial_successes``, ``silent_degradations``,
      ``critical_failures`` (lists), ``retry_counters``, ``status_codes``
      (dicts) and ``execution_status``.
    * ``step_index`` and ``run_id`` -- copied into error entries and reports.
    * ``memory`` -- object used for the trace dump and the memory snapshot.
    * ``_generate_meta_report(logs)`` -- called while building the report.
    """

    def _record_error(
        self,
        error_type,
        agent_id,
        error_msg,
        exception=None,
        step=None,
        status_code=None,
        recovery_action=None,
    ) -> None:
        """
        Record an error in the error telemetry system.

        Args:
            error_type: Type of error (agent_failure, json_parsing, api_error, etc.)
            agent_id: ID of the agent that failed
            error_msg: Human readable error message
            exception: The actual exception object (optional)
            step: Step number where error occurred; defaults to
                ``self.step_index`` when omitted (``0`` is a valid step)
            status_code: HTTP status code if applicable
            recovery_action: Action taken to recover (retry, fallback, etc.)
        """
        error_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "type": error_type,
            "agent_id": agent_id,
            "message": error_msg,
            # Explicit None check so that an explicit step of 0 is preserved.
            "step": step if step is not None else self.step_index,
            "run_id": self.run_id,
        }

        if exception:
            # str() of a traceback object is only its repr, so render the
            # real stack trace. Non-exception objects have nothing to format.
            if isinstance(exception, BaseException):
                formatted_tb = "".join(
                    traceback.format_exception(
                        type(exception),
                        exception,
                        exception.__traceback__,
                    ),
                )
            else:
                formatted_tb = None
            error_entry["exception"] = {
                "type": type(exception).__name__,
                "message": str(exception),
                "traceback": formatted_tb,
            }

        if status_code:
            error_entry["status_code"] = status_code
            # Only the most recent status code per agent is retained.
            self.error_telemetry["status_codes"][agent_id] = status_code

        if recovery_action:
            error_entry["recovery_action"] = recovery_action
            self.error_telemetry["recovery_actions"].append(
                {
                    "timestamp": error_entry["timestamp"],
                    "agent_id": agent_id,
                    "action": recovery_action,
                },
            )

        self.error_telemetry["errors"].append(error_entry)

        # Log error to console
        print(f"🚨 [ORKA-ERROR] {error_type} in {agent_id}: {error_msg}")

    def _record_retry(self, agent_id) -> None:
        """Record a retry attempt for an agent."""
        counters = self.error_telemetry["retry_counters"]
        counters[agent_id] = counters.get(agent_id, 0) + 1

    def _record_partial_success(self, agent_id, retry_count) -> None:
        """Record that an agent succeeded after retries."""
        self.error_telemetry["partial_successes"].append(
            {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "agent_id": agent_id,
                "retry_count": retry_count,
            },
        )

    def _record_silent_degradation(self, agent_id, degradation_type, details) -> None:
        """Record silent degradations like JSON parsing failures."""
        self.error_telemetry["silent_degradations"].append(
            {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "agent_id": agent_id,
                "type": degradation_type,
                "details": details,
            },
        )

    def _save_error_report(self, logs, final_error=None) -> str:
        """
        Save comprehensive error report with all logged data up to the failure point.

        Args:
            logs: Execution logs collected so far; embedded in the report and
                passed to ``self._generate_meta_report``.
            final_error: The error that aborted execution, if any. When given,
                the execution status becomes ``"failed"`` and a critical
                failure entry is appended.

        Returns:
            The path the error report was written to. The path is returned
            even when writing the file failed (the failure is only printed).
        """
        # NOTE(review): file-name timestamp uses naive local time, whereas the
        # telemetry entries use UTC -- confirm this difference is intended.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_dir = os.getenv("ORKA_LOG_DIR", "logs")
        os.makedirs(log_dir, exist_ok=True)

        telemetry = self.error_telemetry

        # Determine final execution status
        if final_error:
            telemetry["execution_status"] = "failed"
            telemetry["critical_failures"].append(
                {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "error": str(final_error),
                    "step": self.step_index,
                },
            )
        elif telemetry["errors"]:
            telemetry["execution_status"] = "partial"
        else:
            telemetry["execution_status"] = "completed"

        # Generate meta report even on failure
        try:
            meta_report = self._generate_meta_report(logs)
        except Exception as e:
            # Best effort: the report must still be written, so record the
            # failure and fall back to a minimal placeholder.
            self._record_error(
                "meta_report_generation",
                "meta_report",
                f"Failed to generate meta report: {e}",
                e,
            )
            meta_report = {
                "error": "Failed to generate meta report",
                "partial_data": {
                    "total_agents_executed": len(logs),
                    "run_id": self.run_id,
                },
            }

        # De-duplicate agent ids while keeping first-seen order so the report
        # is deterministic between runs.
        agents_with_errors = list(
            dict.fromkeys(error["agent_id"] for error in telemetry["errors"]),
        )

        # Create comprehensive error report
        error_report = {
            "orka_execution_report": {
                "run_id": self.run_id,
                "timestamp": timestamp,
                "execution_status": telemetry["execution_status"],
                "error_telemetry": telemetry,
                "meta_report": meta_report,
                "execution_logs": logs,
                "total_steps_attempted": self.step_index,
                "total_errors": len(telemetry["errors"]),
                "total_retries": sum(telemetry["retry_counters"].values()),
                "agents_with_errors": agents_with_errors,
                "memory_snapshot": self._capture_memory_snapshot(),
            },
        }

        # Save error report
        error_report_path = os.path.join(log_dir, f"orka_error_report_{timestamp}.json")
        try:
            with open(error_report_path, "w", encoding="utf-8") as f:
                # default=str stringifies anything json cannot encode natively.
                json.dump(error_report, f, indent=2, default=str)
            print(f"📋 Error report saved: {error_report_path}")
        except Exception as e:
            print(f"❌ Failed to save error report: {e}")

        # Also save to memory backend
        try:
            trace_path = os.path.join(log_dir, f"orka_trace_{timestamp}.json")
            self.memory.save_to_file(trace_path)
            print(f"📋 Execution trace saved: {trace_path}")
        except Exception as e:
            print(f"⚠️ Failed to save trace to memory backend: {e}")

        return error_report_path

    def _capture_memory_snapshot(self) -> dict:
        """
        Capture current state of memory backend for debugging.

        Returns:
            A dict with ``total_entries``, ``last_10_entries`` and
            ``backend_type`` when ``self.memory.memory`` is present and
            non-empty; ``{"status": "no_memory_data"}`` when it is missing or
            empty; ``{"error": ...}`` if reading it raised.
        """
        try:
            entries = getattr(self.memory, "memory", None)
            if entries:
                return {
                    "total_entries": len(entries),
                    # A negative slice already copes with fewer than 10 items.
                    "last_10_entries": entries[-10:],
                    "backend_type": type(self.memory).__name__,
                }
        except Exception as e:
            return {"error": f"Failed to capture memory snapshot: {e}"}
        return {"status": "no_memory_data"}