Source code for orka.orchestrator.metrics

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-reasoning

"""
Metrics Collection and Reporting
================================

Handles LLM metrics extraction, aggregation, and reporting.
"""

import logging
import os
import platform
import subprocess
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class MetricsCollector:
    """
    Handles metrics collection, aggregation, and reporting.
    """

    def _extract_llm_metrics(self, agent, result):
        """
        Extract LLM metrics from the agent result or agent state.

        Args:
            agent: The agent instance
            result: The agent's result

        Returns:
            dict or None: LLM metrics if found
        """
        # Check if result is a dict with _metrics
        if isinstance(result, dict) and "_metrics" in result:
            return result["_metrics"]

        # Check if the agent has stored metrics (for binary/classification agents)
        if hasattr(agent, "_last_metrics") and agent._last_metrics:
            return agent._last_metrics

        return None

    def _get_runtime_environment(self):
        """
        Get runtime environment information for debugging and reproducibility.
        """
        env_info = {
            "platform": platform.platform(),
            "python_version": platform.python_version(),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        # Get Git SHA if available
        try:
            git_sha = (
                subprocess.check_output(
                    ["git", "rev-parse", "HEAD"],
                    stderr=subprocess.DEVNULL,
                    cwd=os.getcwd(),
                    timeout=5,
                )
                .decode()
                .strip()
            )
            env_info["git_sha"] = git_sha[:12]  # Short SHA
        except Exception:
            env_info["git_sha"] = "unknown"

        # Check for Docker environment
        if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_CONTAINER"):
            env_info["docker_image"] = os.environ.get("DOCKER_IMAGE", "unknown")
        else:
            env_info["docker_image"] = None

        # GPU information
        try:
            import GPUtil

            gpus = GPUtil.getGPUs()
            if gpus:
                env_info["gpu_type"] = (
                    f"{gpus[0].name} ({len(gpus)} GPU{'s' if len(gpus) > 1 else ''})"
                )
            else:
                env_info["gpu_type"] = "none"
        except Exception:
            env_info["gpu_type"] = "unknown"

        # Pricing version (month-year of the pricing data in use)
        env_info["pricing_version"] = "2025-01"

        return env_info
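
    # Illustrative note: on a typical dev machine, `_get_runtime_environment`
    # returns a dict roughly like the one below. The keys match the code above;
    # the concrete values are invented for illustration.
    #
    #     {
    #         "platform": "Linux-6.1.0-x86_64-with-glibc2.36",
    #         "python_version": "3.11.4",
    #         "timestamp": "2025-01-15T12:00:00+00:00",
    #         "git_sha": "a1b2c3d4e5f6",
    #         "docker_image": None,
    #         "gpu_type": "none",
    #         "pricing_version": "2025-01",
    #     }
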
    def _generate_meta_report(self, logs):
        """
        Generate a meta report with aggregated metrics from execution logs.

        Args:
            logs: List of execution log entries

        Returns:
            dict: Meta report with aggregated metrics
        """
        total_duration = 0
        total_tokens = 0
        total_cost_usd = 0
        total_llm_calls = 0
        latencies = []

        agent_metrics = {}
        model_usage = {}

        # Track seen metrics so identical metrics objects are not double-counted
        seen_metrics = set()

        def extract_metrics_recursively(data, source_agent_id="unknown"):
            """Recursively extract _metrics from nested data structures, avoiding duplicates."""
            found_metrics = []

            if isinstance(data, dict):
                # Check if this dict has _metrics
                if "_metrics" in data:
                    metrics = data["_metrics"]
                    # Create a unique identifier for this metrics object
                    metrics_id = (
                        metrics.get("model", ""),
                        metrics.get("tokens", 0),
                        metrics.get("prompt_tokens", 0),
                        metrics.get("completion_tokens", 0),
                        metrics.get("latency_ms", 0),
                        metrics.get("cost_usd", 0),
                    )

                    # Only add if we haven't seen these exact metrics before
                    if metrics_id not in seen_metrics:
                        seen_metrics.add(metrics_id)
                        found_metrics.append((metrics, source_agent_id))

                # Recursively check all values
                for key, value in data.items():
                    if key != "_metrics":  # Avoid infinite recursion
                        sub_metrics = extract_metrics_recursively(value, source_agent_id)
                        found_metrics.extend(sub_metrics)

            elif isinstance(data, list):
                for item in data:
                    sub_metrics = extract_metrics_recursively(item, source_agent_id)
                    found_metrics.extend(sub_metrics)

            return found_metrics

        for log_entry in logs:
            # Aggregate execution duration
            duration = log_entry.get("duration", 0)
            total_duration += duration

            agent_id = log_entry.get("agent_id", "unknown")

            # Extract all LLM metrics from the log entry recursively
            all_metrics = []

            # First check for llm_metrics at root level (legacy format)
            if log_entry.get("llm_metrics"):
                all_metrics.append((log_entry["llm_metrics"], agent_id))

            # Then recursively search for _metrics in payload
            if log_entry.get("payload"):
                payload_metrics = extract_metrics_recursively(log_entry["payload"], agent_id)
                all_metrics.extend(payload_metrics)

            # Process all found metrics
            for llm_metrics, source_agent in all_metrics:
                if not llm_metrics:
                    continue

                total_llm_calls += 1
                total_tokens += llm_metrics.get("tokens", 0)

                # Handle null costs (real local LLM cost calculation may return None)
                cost = llm_metrics.get("cost_usd")
                if cost is not None:
                    total_cost_usd += cost
                else:
                    # Check if we should fail on null costs
                    if os.environ.get("ORKA_LOCAL_COST_POLICY") == "null_fail":
                        raise ValueError(
                            f"Pipeline failed due to null cost in agent '{source_agent}' "
                            f"(model: {llm_metrics.get('model', 'unknown')}). "
                            f"Configure real cost calculation or use cloud models.",
                        )
                    logger.warning(
                        f"Agent '{source_agent}' returned null cost - excluding from total",
                    )

                latency = llm_metrics.get("latency_ms", 0)
                if latency > 0:
                    latencies.append(latency)

                # Track per-agent metrics (use the source agent, which could be nested)
                if source_agent not in agent_metrics:
                    agent_metrics[source_agent] = {
                        "calls": 0,
                        "tokens": 0,
                        "cost_usd": 0,
                        "latencies": [],
                    }

                agent_metrics[source_agent]["calls"] += 1
                agent_metrics[source_agent]["tokens"] += llm_metrics.get("tokens", 0)
                if cost is not None:
                    agent_metrics[source_agent]["cost_usd"] += cost
                if latency > 0:
                    agent_metrics[source_agent]["latencies"].append(latency)

                # Track model usage
                model = llm_metrics.get("model", "unknown")
                if model not in model_usage:
                    model_usage[model] = {
                        "calls": 0,
                        "tokens": 0,
                        "cost_usd": 0,
                    }

                model_usage[model]["calls"] += 1
                model_usage[model]["tokens"] += llm_metrics.get("tokens", 0)
                if cost is not None:
                    model_usage[model]["cost_usd"] += cost

        # Calculate averages
        avg_latency_ms = sum(latencies) / len(latencies) if latencies else 0

        # Calculate per-agent average latencies and clean up the latencies list
        for agent_id in agent_metrics:
            agent_latencies = agent_metrics[agent_id]["latencies"]
            agent_metrics[agent_id]["avg_latency_ms"] = (
                sum(agent_latencies) / len(agent_latencies) if agent_latencies else 0
            )
            # Remove the temporary latencies list to clean up the output
            del agent_metrics[agent_id]["latencies"]

        # Get runtime environment information
        runtime_env = self._get_runtime_environment()

        return {
            "total_duration": round(total_duration, 3),
            "total_llm_calls": total_llm_calls,
            "total_tokens": total_tokens,
            "total_cost_usd": round(total_cost_usd, 6),
            "avg_latency_ms": round(avg_latency_ms, 2),
            "agent_breakdown": agent_metrics,
            "model_usage": model_usage,
            "runtime_environment": runtime_env,
            "execution_stats": {
                "total_agents_executed": len(logs),
                "run_id": self.run_id,
                "generated_at": datetime.now(timezone.utc).isoformat(),
            },
        }

    @staticmethod
    def build_previous_outputs(logs):
        """
        Build a dictionary of previous agent outputs from the execution logs.

        Used to provide context to downstream agents.
        """
        outputs = {}

        for log in logs:
            agent_id = log["agent_id"]
            payload = log.get("payload", {})

            # Case: regular agent output
            if "result" in payload:
                outputs[agent_id] = payload["result"]

            # Case: JoinNode with merged dict
            if "result" in payload and isinstance(payload["result"], dict):
                merged = payload["result"].get("merged")
                if isinstance(merged, dict):
                    outputs.update(merged)

        return outputs
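

# Minimal usage sketch (illustrative only). MetricsCollector appears to be
# used as a mixin: it reads `self.run_id` but never defines it, so the demo
# harness below supplies one. The sample log entry and its values are made up
# to show how the two entry points fit together.
if __name__ == "__main__":
    class _DemoCollector(MetricsCollector):
        run_id = "demo-run"  # normally provided by the orchestrator

    sample_logs = [
        {
            "agent_id": "summarizer",
            "duration": 0.8,
            "payload": {
                "result": {
                    "answer": "...",
                    "_metrics": {
                        "model": "gpt-4o-mini",
                        "tokens": 200,
                        "prompt_tokens": 150,
                        "completion_tokens": 50,
                        "latency_ms": 420.0,
                        "cost_usd": 0.0003,
                    },
                },
            },
        },
    ]

    report = _DemoCollector()._generate_meta_report(sample_logs)
    print(report["total_llm_calls"], report["total_tokens"])  # -> 1 200
    print(MetricsCollector.build_previous_outputs(sample_logs))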