orka.nodes.rag_node module

class orka.nodes.rag_node.RAGNode(node_id: str, registry: Registry, prompt: str = '', queue: str = 'default', timeout: float | None = 30.0, max_concurrency: int = 10, top_k: int = 5, score_threshold: float = 0.7)

Bases: BaseNode

RAG Node Implementation

A specialized node that performs Retrieval-Augmented Generation (RAG) operations by combining semantic search with language model generation.

Core Functionality

RAG Process:
1. Query Processing: Extract and prepare the input query
2. Embedding Generation: Convert query to vector representation
3. Memory Search: Find relevant documents using semantic similarity
4. Context Formatting: Structure retrieved documents for LLM consumption
5. Answer Generation: Use LLM to generate response based on context
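The five steps can be sketched end to end with toy stand-ins. This is an illustration only, not the node's actual implementation: `Doc`, `embed`, `search`, `format_context`, `generate`, and `rag` are hypothetical names, the keyword-count embedder and echo-style generator replace the real embedder and LLM services, and cosine similarity is assumed as the similarity measure.

```python
from __future__ import annotations

import math
import re
from dataclasses import dataclass


@dataclass
class Doc:
    content: str
    vector: list[float]


def embed(text: str) -> list[float]:
    """Toy embedder (assumption): keyword counts, not a real embedding model."""
    vocab = ["orka", "memory", "rag", "node"]
    words = re.findall(r"[a-z]+", text.lower())
    return [float(words.count(term)) for term in vocab]


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(query_vec, docs, top_k=5, score_threshold=0.7):
    """Keep documents at or above the threshold, best first, at most top_k."""
    hits = [(cosine(query_vec, d.vector), d) for d in docs]
    hits = [(score, d) for score, d in hits if score >= score_threshold]
    hits.sort(key=lambda pair: pair[0], reverse=True)
    return hits[:top_k]


def format_context(hits) -> str:
    return "\n".join(f"[{i}] {d.content}" for i, (_, d) in enumerate(hits, 1))


def generate(prompt: str) -> str:
    """Stand-in for an LLM call (assumption): reports the prompt size."""
    return f"(stub answer built from {len(prompt)} prompt characters)"


def rag(query: str, docs: list[Doc]) -> dict:
    query = query.strip()                              # 1. query processing
    if not query:
        raise ValueError("Query is required for RAG operation")
    query_vec = embed(query)                           # 2. embedding generation
    hits = search(query_vec, docs)                     # 3. memory search
    if not hits:
        return {"answer": "I couldn't find any relevant information "
                          "to answer your question.", "sources": []}
    context = format_context(hits)                     # 4. context formatting
    answer = generate(f"Context:\n{context}\n\nQuestion: {query}")  # 5. generation
    sources = [{"content": d.content, "score": round(s, 2)} for s, d in hits]
    return {"answer": answer, "sources": sources}
```

The empty-hits branch mirrors the "No Results" response documented for this node; everything else about how the real node wires these steps together is assumed.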

Integration Points:
- Memory Backend: Searches for relevant documents using vector similarity
- Embedder Service: Generates query embeddings for semantic search
- LLM Service: Generates final answers based on retrieved context
- Registry System: Accesses shared resources through dependency injection

Architecture Details

Node Configuration:
- top_k: Number of documents to retrieve (default: 5)
- score_threshold: Minimum similarity score for relevance (default: 0.7)
- timeout: Maximum execution time for the operation
- max_concurrency: Limit on parallel executions
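One common way to combine a `timeout` with a `max_concurrency` limit in asyncio code is a semaphore around `asyncio.wait_for`. The sketch below shows that pattern under stated assumptions: `BoundedRunner`, `slow`, and `main` are hypothetical names, and nothing here is taken from the node's source.

```python
from __future__ import annotations

import asyncio


class BoundedRunner:
    """Hypothetical helper: caps parallel calls and applies a per-call timeout."""

    def __init__(self, max_concurrency: int = 10, timeout: float | None = 30.0):
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._timeout = timeout

    async def run(self, coro_fn, *args):
        # At most max_concurrency callers get past this point at once.
        async with self._semaphore:
            try:
                result = await asyncio.wait_for(coro_fn(*args), timeout=self._timeout)
                return {"result": result, "status": "success", "error": None}
            except asyncio.TimeoutError:
                return {"result": None, "status": "error", "error": "timed out"}


async def slow(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def main():
    # Create the runner inside the running loop so the semaphore binds to it.
    runner = BoundedRunner(max_concurrency=2, timeout=0.05)
    return await asyncio.gather(runner.run(slow, 0.0), runner.run(slow, 1.0))
```

In this design the timeout covers only the call itself, not the time spent waiting for a semaphore slot; whether the node counts queueing time against its timeout is not stated in the documentation.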

Resource Management:
- Lazy initialization of expensive resources (memory, embedder, LLM)
- Registry-based dependency injection for shared services
- Automatic resource cleanup and lifecycle management
- Thread-safe execution for concurrent operations
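Lazy initialization backed by a registry of factories might look like the following. This is a minimal sketch, assuming a hypothetical `LazyResources` holder; the actual `Registry` interface is not shown in this documentation and is not reproduced here.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class LazyResources:
    """Hypothetical holder that builds each resource on first access."""

    def __init__(self, factories: dict[str, Callable[[], Any]]):
        self._factories = factories
        self._instances: dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, name: str) -> Any:
        # The lock ensures a factory runs once even under concurrent access.
        with self._lock:
            if name not in self._instances:
                self._instances[name] = self._factories[name]()
            return self._instances[name]
```

Nothing is constructed until `get("embedder")` (or another registered name) is first called, so startup stays cheap when a resource is never used.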

Error Handling:
- Graceful handling of missing or invalid queries
- Fallback responses when no relevant documents are found
- Structured error reporting with context preservation
- Automatic retry logic for transient failures
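The documentation does not say which retry policy the node uses. As an illustration only, retrying transient failures with exponential backoff could be sketched as below; `TransientError` and `with_retry` are hypothetical names.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


async def with_retry(fn, attempts: int = 3, base_delay: float = 0.1):
    """Await fn(), retrying on TransientError with doubling delays."""
    for attempt in range(1, attempts + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Only exceptions marked as transient are retried; anything else propagates immediately, which keeps errors such as a missing query from being retried pointlessly.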

Implementation Features

Search Capabilities:
- Vector similarity search using embeddings
- Configurable relevance thresholds
- Top-k result limiting for performance
- Metadata filtering and namespace support

Context Management:
- Intelligent document formatting for LLM consumption
- Source attribution and reference tracking
- Context length optimization for model limits
- Structured output with sources and confidence scores
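Source attribution and a length budget can be combined in one formatting pass. The sketch below is an assumption about how that might be done, not the node's formatter: `build_context` is a hypothetical name, and a character count stands in for a real token count.

```python
def build_context(sources: list[dict], max_chars: int = 2000) -> str:
    """Number each source and stop before the character budget is exceeded."""
    parts: list[str] = []
    used = 0
    for index, source in enumerate(sources, start=1):
        entry = f"[{index}] (score {source['score']:.2f}) {source['content']}"
        if used + len(entry) > max_chars:
            break  # drop this and all lower-ranked sources
        parts.append(entry)
        used += len(entry) + 1  # +1 for the joining newline
    return "\n".join(parts)
```

Because sources are expected in descending score order, truncation removes the least relevant documents first, and the `[n]` markers give the model something to cite.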

LLM Integration:
- Dynamic prompt construction with retrieved context
- Configurable model parameters and settings
- Response quality validation and filtering
- Token usage tracking and optimization

Usage Examples

Basic Configuration:

```yaml
agents:
  - id: rag_assistant
    type: rag
    top_k: 5
    score_threshold: 0.7
    timeout: 30.0
```

Advanced Configuration:

```yaml
agents:
  - id: specialized_rag
    type: rag
    top_k: 10
    score_threshold: 0.8
    max_concurrency: 5
    llm_config:
      model: "gpt-4"
      temperature: 0.1
      max_tokens: 500
```

Integration with Memory:

```python
# The node automatically integrates with the memory system
# Memory backend provides semantic search capabilities
# Embedder service generates query vectors
# LLM service generates final responses
```

Response Format

Successful Response:

```json
{
  "result": {
    "answer": "Generated response based on retrieved context",
    "sources": [
      {
        "content": "Source document content",
        "score": 0.85,
        "metadata": {…}
      }
    ]
  },
  "status": "success",
  "error": null,
  "metadata": {"node_id": "rag_assistant"}
}
```

Error Response:

```json
{
  "result": null,
  "status": "error",
  "error": "Query is required for RAG operation",
  "metadata": {"node_id": "rag_assistant"}
}
```

No Results Response:

```json
{
  "result": {
    "answer": "I couldn't find any relevant information to answer your question.",
    "sources": []
  },
  "status": "success",
  "error": null,
  "metadata": {"node_id": "rag_assistant"}
}
```

Performance Considerations

Optimization Features:
- Lazy resource initialization to reduce startup time
- Configurable concurrency limits for resource management
- Efficient context formatting to minimize token usage
- Caching strategies for frequently accessed documents

Scalability:
- Supports high-throughput query processing
- Memory-efficient document handling
- Parallel processing capabilities
- Resource pooling for external services

Monitoring:
- Execution timing and performance metrics
- Search quality and relevance tracking
- LLM usage and cost monitoring
- Error rate and pattern analysis

async initialize() → None

Initialize the node and its resources.

async run(context: Context) → dict[str, Any]

Run the RAG node with the given context.
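A caller can branch on the documented response fields (`status`, `error`, `result.answer`, `result.sources`) once `run()` returns. The helper below is a hypothetical consumer written against that format; it does not call the node itself, and `summarize_response` is not part of the library.

```python
from __future__ import annotations

from typing import Any


def summarize_response(response: dict[str, Any]) -> str:
    """Turn a RAG response dict into a one-line summary for logging."""
    if response["status"] != "success":
        return f"error: {response['error']}"
    result = response["result"]
    if not result["sources"]:
        # Successful call, but nothing met the relevance threshold.
        return f"no sources: {result['answer']}"
    best = max(result["sources"], key=lambda source: source["score"])
    return f"{result['answer']} (best source score {best['score']:.2f})"
```

Checking `status` before touching `result` matters because `result` is `null` in the error response.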