Source code for orka.startup.cleanup

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Service Cleanup
===============

This module handles cleanup and shutdown of OrKa services.
"""

import logging
import subprocess
from typing import Dict

from .infrastructure.kafka import cleanup_kafka_docker
from .infrastructure.redis import terminate_redis_process

logger = logging.getLogger(__name__)


[docs] def cleanup_services(backend: str, processes: Dict[str, subprocess.Popen] = None) -> None: """ Clean up and stop services for the specified backend. Args: backend: The backend type ('redis', 'redisstack', 'kafka', or 'dual') processes: Dictionary of running processes to terminate """ try: # Terminate native processes if processes: for name, proc in processes.items(): if name == "redis": terminate_redis_process(proc) # Generic process termination elif proc and proc.poll() is None: # Process is still running print(f"🛑 Stopping {name} process...") proc.terminate() try: proc.wait(timeout=5) print(f"✅ {name} stopped gracefully") except subprocess.TimeoutExpired: print(f"⚠️ Force killing {name} process...") proc.kill() proc.wait() # Stop Docker services for Kafka if needed if backend in ["kafka", "dual"]: cleanup_kafka_docker() logger.info("All services stopped.") except Exception as e: logger.error(f"Error stopping services: {e}")
[docs] def terminate_all_processes(processes: Dict[str, subprocess.Popen]) -> None: """ Terminate all managed processes gracefully. Args: processes: Dictionary of process names to process objects """ for name, proc in processes.items(): if proc and proc.poll() is None: # Process is still running try: print(f"🛑 Stopping {name} process...") proc.terminate() proc.wait(timeout=5) print(f"✅ {name} stopped gracefully") except subprocess.TimeoutExpired: print(f"⚠️ Force killing {name} process...") proc.kill() proc.wait() except Exception as e: print(f"⚠️ Error stopping {name}: {e}")
[docs] def force_kill_processes(processes: Dict[str, subprocess.Popen]) -> None: """ Force kill all managed processes. Args: processes: Dictionary of process names to process objects """ for name, proc in processes.items(): if proc and proc.poll() is None: # Process is still running try: print(f"⚠️ Force killing {name} process...") proc.kill() proc.wait() except Exception as e: print(f"⚠️ Error force killing {name}: {e}")
[docs] def cleanup_specific_backend(backend: str) -> None: """ Clean up services specific to a backend type. Args: backend: The backend type ('redis', 'redisstack', 'kafka', or 'dual') """ if backend in ["kafka", "dual"]: cleanup_kafka_docker()
# Redis cleanup is handled by process termination # since it's managed as a native process