Skip to content

Latest commit

 

History

History
802 lines (673 loc) · 25.7 KB

File metadata and controls

802 lines (673 loc) · 25.7 KB

🏥 Healthcare Navigator - Technical Explanation

1. Agent Workflow (ReAct Pattern)

The Healthcare Navigator follows a sophisticated ReAct (Reasoning-Acting-Observation) pattern designed specifically for medical information processing with safety-first principles.

Step-by-Step Processing Flow

Phase 1: Input Reception & Sanitization

async def process_query(self, query_text: str, context: Dict[str, Any] = None) -> AgentResponse:
    # 1. Input sanitization and validation
    query_text = sanitize_input(query_text)
    
    # 2. Create structured medical query object
    medical_query = MedicalQuery(query_text=query_text)

Security Measures:

  • HTML/JavaScript injection prevention
  • SQL injection pattern detection
  • Medical query format validation
  • Length and content restrictions

Phase 2: Context Preparation & Memory Retrieval

    # 3. Retrieve conversation context and user preferences
    full_context = await self._prepare_context(context)
    # Includes:
    # - Last 10 conversation interactions
    # - User preferences (age, allergies, medical history)
    # - Session metadata and state

Memory Integration:

  • Conversation History: SQLite-backed persistent storage
  • User Preferences: Age, allergies, medical conditions
  • Session State: Current conversation context and flow
  • Cache Lookup: Previous similar queries for optimization

Phase 3: Reasoning - Query Analysis & Planning

    # 4. REASONING PHASE: Analyze query and create execution plan
    if self.context_planner and full_context.get('conversation_history'):
        plan = await self.context_planner.create_context_aware_plan(medical_query, full_context)
    else:
        plan = await self.query_planner.analyze_query(medical_query, full_context)

Planning Intelligence:

  • Query Classification: Medical term, drug info, symptoms, documents, entities
  • Intent Detection: Urgency assessment (low/moderate/high/emergency)
  • Tool Selection: Determine required APIs and processing tools
  • Context Integration: Merge conversation history and user preferences
  • Plan Optimization: Minimize API calls and maximize efficiency

Example Planning Output:

QueryPlan(
    query_id="symptom_analysis_001",
    query_type=QueryType.SYMPTOMS,
    urgency_level=UrgencyLevel.MODERATE,
    key_entities=["headache", "fever"],
    tools_required=["analyze_symptoms", "search_medical_term"],
    execution_steps=[
        {
            "step_number": 1,
            "tool_name": "analyze_symptoms",
            "input_data": {"symptoms": "headache and fever"},
            "confidence_threshold": 0.7
        }
    ]
)

Phase 4: Acting - Tool Execution & API Coordination

    # 5. ACTING PHASE: Execute the plan with healthcare tools
    medical_response = await self.executor.execute_plan(plan, full_context)

Execution Strategy:

  • Parallel Processing: Multiple API calls when beneficial
  • Retry Logic: Exponential backoff for transient failures
  • Timeout Management: Per-tool timeout configuration
  • Fallback Chains: Primary → Secondary → Cache → AI → Error
  • Result Synthesis: Combine multiple data sources coherently

Tool Execution Example:

async def execute_symptom_analysis(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
    # Extract symptoms and context
    symptoms = input_data.get('query_entities', [])
    user_context = input_data.get('context', {})
    
    # Create comprehensive analysis prompt
    analysis_prompt = f"""
    Analyze these symptoms: {symptoms}
    User context: Age {user_context.get('user_age', 'unknown')}
    
    Provide:
    1. Possible Causes
    2. Severity Assessment  
    3. Self-Care Recommendations
    4. When to Seek Care
    5. Red Flags
    """
    
    # Execute with Gemini API
    result = await self.api_manager.gemini_client.generate_response(analysis_prompt)
    
    # Process and structure response
    return self._structure_symptom_response(result, symptoms)

Phase 5: Observation - Result Processing & Memory Update

    # 6. OBSERVATION PHASE: Store results and update memory
    conversation_stored = self.conversation_memory.store_interaction(
        medical_query, medical_response, self.session_id
    )

Learning & Memory Update:

  • Conversation Storage: Query-response pairs with metadata
  • Performance Metrics: Processing times, confidence scores
  • User Feedback: Implicit feedback from continued interaction
  • Cache Updates: Store successful API responses for reuse

2. Key Modules Deep Dive

Planner Module (src/planner.py)

Core Responsibilities:

class QueryPlanner:
    def analyze_query(self, query: MedicalQuery, context: Dict) -> QueryPlan:
        """
        1. Classify query type using Gemini API
        2. Extract medical entities using NLP
        3. Assess urgency level based on keywords
        4. Select appropriate tools and APIs
        5. Generate step-by-step execution plan
        """

Query Classification Logic:

def _classify_query_type(self, query_text: str) -> QueryType:
    # Medical term patterns
    if re.search(r'\b(what is|define|explain)\b.*\b(disease|condition|syndrome)\b', query_text, re.I):
        return QueryType.MEDICAL_TERM
    
    # Drug information patterns  
    if re.search(r'\b(drug|medication|medicine|pill)\b', query_text, re.I):
        return QueryType.DRUG_INFO
    
    # Symptom analysis patterns
    if re.search(r'\b(symptoms?|feel|pain|ache|hurt)\b', query_text, re.I):
        return QueryType.SYMPTOMS
    
    # Document processing patterns
    if 'upload' in query_text.lower() or 'document' in query_text.lower():
        return QueryType.DOCUMENT_SUMMARY

Context-Aware Planning:

class ContextAwarePlanner:
    def create_context_aware_plan(self, query: MedicalQuery, context: Dict) -> QueryPlan:
        """
        Enhanced planning with conversation history integration:
        1. Analyze conversation flow and continuity
        2. Identify follow-up questions and clarifications
        3. Integrate user preferences (age, allergies, history)
        4. Optimize plan based on previous successful patterns
        5. Adjust urgency based on conversation context
        """

Executor Module (src/executor.py)

Tool Coordination Architecture:

class ToolExecutor:
    def __init__(self):
        self.tools = {
            'search_medical_term': {
                'function': self.execute_medical_term_lookup,
                'type': 'api_call',
                'timeout': 10.0,
                'retry_count': 3
            },
            'get_drug_information': {
                'function': self.execute_drug_info_lookup,
                'type': 'api_call', 
                'timeout': 15.0,
                'retry_count': 3
            },
            'analyze_symptoms': {
                'function': self.execute_symptom_analysis,
                'type': 'nlp_processing',
                'timeout': 20.0,
                'retry_count': 2
            }
        }

Execution Flow with Error Handling:

async def _execute_step(self, step: Dict, context: Dict, plan: QueryPlan) -> ExecutionResult:
    tool_name = step.get('tool_name')
    tool_config = self.tools[tool_name]
    
    try:
        # Execute with retry logic and timeout
        result_data = await self._execute_with_retry(
            tool_config['function'],
            step.get('input_data', {}),
            tool_config['retry_count'],
            tool_config['timeout']
        )
        
        return ExecutionResult(
            success=True,
            data=result_data,
            tool_name=tool_name,
            execution_time=execution_time,
            confidence_score=result_data.get('confidence_score', 0.7)
        )
        
    except Exception as e:
        # Graceful error handling with fallback
        return self._handle_tool_failure(tool_name, e, step)

Memory System (src/memory.py)

Multi-Tier Memory Architecture:

1. Conversation Memory (Persistent)

class ConversationMemory:
    def store_interaction(self, query: MedicalQuery, response: MedicalResponse, session_id: str):
        """
        SQLite storage with structured schema:
        - Query text and metadata
        - Response content and sources
        - Confidence scores and processing times
        - User context and preferences
        """
        
    def get_context(self, session_id: str, limit: int = 10) -> List[Dict]:
        """
        Retrieve relevant conversation history:
        - Recent interactions for continuity
        - Similar queries for pattern recognition
        - User preference evolution over time
        """

2. Cache Manager (Performance)

class CacheManager:
    def __init__(self):
        self.cache = TTLCache(maxsize=1000, ttl=3600)  # 1 hour TTL
        self.api_cache = {}  # API-specific caching
        
    def cache_api_response(self, key: str, response: Dict, ttl: int):
        """
        Intelligent caching strategy:
        - Medical terms: 24 hour TTL (stable information)
        - Drug recalls: 1 hour TTL (frequently updated)
        - Symptom analysis: 30 minutes TTL (context-dependent)
        """

3. User Preferences (Personalization)

class UserPreferences:
    def set_preference(self, key: str, value: Any, session_id: str):
        """
        Persistent user data:
        - Age and demographic information
        - Known allergies and medical conditions
        - Interaction preferences and history
        - Privacy and data retention settings
        """

3. Tool Integration Details

Google Gemini API Integration

Configuration & Safety:

class GeminiAPIClient:
    def __init__(self):
        self.model = genai.GenerativeModel('gemini-2.0-flash')
        self.system_instructions = self._get_healthcare_system_instructions()
        
    def _get_healthcare_system_instructions(self) -> str:
        return """
        You are an intelligent healthcare navigator AI assistant.
        
        CRITICAL GUIDELINES:
        - You do NOT provide medical diagnoses or treatment recommendations
        - Always include appropriate medical disclaimers
        - Recommend consulting healthcare professionals for medical concerns
        - For urgent symptoms, advise seeking immediate medical attention
        
        RESPONSE REQUIREMENTS:
        - Provide ONLY the final, user-friendly response
        - Do NOT show your reasoning process or internal thinking
        - Use plain language, avoiding unnecessary medical jargon
        - Structure information logically with clear sections
        """

Function Calling Integration:

async def generate_response(self, prompt: str, use_functions: bool = True) -> GeminiResponse:
    # Prepare function tools for structured API calls
    if use_functions and self.functions:
        function_declarations = self._create_function_declarations()
        tools = [genai.types.Tool(function_declarations=function_declarations)]
        
        response = self.model.generate_content(
            prompt,
            generation_config=generation_config,
            tools=tools
        )
        
        # Process function calls and generate follow-up response
        return await self._process_response(response, prompt)

WHO ICD-11 API Integration

OAuth2 Authentication Flow:

class WHOICDClient:
    async def authenticate(self) -> str:
        """
        OAuth2 Client Credentials flow:
        1. Request access token using client credentials
        2. Cache token with expiration tracking
        3. Automatic refresh before expiration
        4. Fallback handling for authentication failures
        """
        auth_data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'scope': 'icdapi_access',
            'grant_type': 'client_credentials'
        }
        
        response = await self._make_auth_request(auth_data)
        return response['access_token']

Medical Term Search:

async def search_medical_term(self, term: str) -> APIResult:
    """
    Comprehensive medical term lookup:
    1. Search ICD-11 database for exact matches
    2. Fuzzy matching for similar terms
    3. Category-based search for broader context
    4. Definition extraction and simplification
    """
    headers = {'Authorization': f'Bearer {await self.get_access_token()}'}
    
    search_url = f"{self.base_url}/entity/search"
    params = {
        'q': term,
        'subtreeFilterUsesFoundationDescendants': 'false',
        'includeKeywordResult': 'true'
    }
    
    response = await self._make_request('GET', search_url, headers=headers, params=params)
    return self._process_search_results(response)

OpenFDA API Integration

Drug Safety Information Retrieval:

class OpenFDAClient:
    async def get_drug_information(self, drug_name: str) -> APIResult:
        """
        Comprehensive drug safety analysis:
        1. Drug label information (official FDA data)
        2. Recall information (safety alerts)
        3. Adverse event reports (side effects)
        4. Enforcement actions (regulatory actions)
        """
        
        # Parallel API calls for comprehensive data
        tasks = [
            self._get_drug_labels(drug_name),
            self._get_drug_recalls(drug_name),
            self._get_adverse_events(drug_name)
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return self._combine_drug_data(results, drug_name)

Rate Limiting & Error Handling:

class RateLimiter:
    def __init__(self, requests_per_minute: int = 240):
        self.requests_per_minute = requests_per_minute
        self.requests = deque()
        
    async def acquire(self):
        """
        Token bucket rate limiting:
        - 240 requests per minute (FDA limit)
        - Exponential backoff on rate limit exceeded
        - Request queuing for burst handling
        """
        now = time.time()
        
        # Remove requests older than 1 minute
        while self.requests and self.requests[0] <= now - 60:
            self.requests.popleft()
            
        if len(self.requests) >= self.requests_per_minute:
            sleep_time = 60 - (now - self.requests[0])
            await asyncio.sleep(sleep_time)
            
        self.requests.append(now)

Medical NLP Processing

Entity Extraction Pipeline:

class MedicalEntityExtractor:
    def __init__(self):
        self.nlp = spacy.load("en_core_med7_lg")  # Medical NLP model
        self.drug_ner = DrugNER()  # Specialized drug recognition
        
    async def extract_entities(self, text: str) -> Dict[str, List[MedicalEntity]]:
        """
        Multi-stage entity extraction:
        1. Medical NER using MedSpaCy
        2. Drug-specific NER for medication names
        3. Symptom extraction using medical vocabularies
        4. Confidence scoring and validation
        """
        
        # Process with medical NLP model
        doc = self.nlp(text)
        entities = []
        
        for ent in doc.ents:
            entity = MedicalEntity(
                text=ent.text,
                entity_type=self._map_entity_type(ent.label_),
                confidence=ent._.confidence if hasattr(ent._, 'confidence') else 0.8,
                start_pos=ent.start_char,
                end_pos=ent.end_char
            )
            entities.append(entity)
            
        return self._group_entities_by_type(entities)

4. Observability & Testing

Structured Logging System

Log Configuration:

def setup_logging() -> logging.Logger:
    """
    Multi-level logging with structured output:
    - File logging: Daily rotation with detailed information
    - Console logging: Real-time monitoring during development
    - Structured format: JSON-compatible for log analysis
    """
    
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
    )
    
    # File handler with daily rotation
    file_handler = logging.handlers.TimedRotatingFileHandler(
        f'logs/healthcare_navigator_{datetime.now().strftime("%Y%m%d")}.log',
        when='midnight',
        interval=1,
        backupCount=30
    )
    
    return logger

Tracing & Decision Logging:

class AgentTracer:
    def log_query_processing(self, query: str, plan: QueryPlan):
        logger.info(f"QUERY_PROCESSING", extra={
            'query_text': query[:100],
            'query_type': plan.query_type.value,
            'urgency_level': plan.urgency_level.value,
            'tools_required': plan.tools_required,
            'execution_steps': len(plan.execution_steps)
        })
        
    def log_api_call(self, api_name: str, endpoint: str, response_time: float, success: bool):
        logger.info(f"API_CALL", extra={
            'api_name': api_name,
            'endpoint': endpoint,
            'response_time_ms': response_time * 1000,
            'success': success,
            'timestamp': datetime.utcnow().isoformat()
        })
        
    def log_tool_execution(self, tool_name: str, input_data: Dict, result: ExecutionResult):
        logger.info(f"TOOL_EXECUTION", extra={
            'tool_name': tool_name,
            'execution_time_ms': result.execution_time * 1000,
            'success': result.success,
            'confidence_score': result.confidence_score,
            'error': result.error if not result.success else None
        })

Performance Monitoring

Metrics Collection:

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'query_processing_times': [],
            'api_response_times': defaultdict(list),
            'cache_hit_rates': [],
            'error_rates': defaultdict(int),
            'user_satisfaction_scores': []
        }
        
    def record_query_processing(self, processing_time: float, success: bool):
        self.metrics['query_processing_times'].append(processing_time)
        if not success:
            self.metrics['error_rates']['query_processing'] += 1
            
    def get_performance_summary(self) -> Dict[str, Any]:
        return {
            'avg_query_time': np.mean(self.metrics['query_processing_times']),
            'p95_query_time': np.percentile(self.metrics['query_processing_times'], 95),
            'cache_hit_rate': np.mean(self.metrics['cache_hit_rates']),
            'overall_error_rate': sum(self.metrics['error_rates'].values()) / len(self.metrics['query_processing_times'])
        }

Testing Strategy

Unit Testing:

# tests/test_agent.py
class TestHealthcareNavigatorAgent:
    @pytest.mark.asyncio
    async def test_medical_term_query(self):
        agent = HealthcareNavigatorAgent("test_session")
        response = await agent.process_query("What is diabetes?")
        
        assert response.response.response_text is not None
        assert "diabetes" in response.response.response_text.lower()
        assert any("disclaimer" in disclaimer.lower() for disclaimer in response.response.disclaimers)
        assert response.response.confidence_score > 0.5

Integration Testing:

# tests/test_integration.py
class TestAPIIntegration:
    @pytest.mark.asyncio
    async def test_who_icd_integration(self):
        client = WHOICDClient()
        result = await client.search_medical_term("hypertension")
        
        assert result.success
        assert result.data['definition'] is not None
        assert result.data['icd_code'] is not None

End-to-End Testing:

#!/bin/bash
# test_setup.py - Comprehensive system testing

echo "🧪 Running Healthcare Navigator Tests..."

# Test CLI interface
python cli.py --query "What is diabetes?" > test_output.txt
if grep -q "diabetes" test_output.txt; then
    echo "✅ CLI medical term query test passed"
else
    echo "❌ CLI medical term query test failed"
fi

# Test web interface (headless)
python -c "
import asyncio
from src.agent import HealthcareNavigatorAgent

async def test_agent():
    agent = HealthcareNavigatorAgent('test')
    response = await agent.process_query('Tell me about aspirin side effects')
    assert 'aspirin' in response.response.response_text.lower()
    print('✅ Agent drug query test passed')

asyncio.run(test_agent())
"

# Test API health
python -c "
import asyncio
from src.agent import HealthcareNavigatorAgent

async def test_health():
    agent = HealthcareNavigatorAgent('test')
    status = await agent.get_system_status()
    assert status['system_healthy'] == True
    print('✅ System health check passed')

asyncio.run(test_health())
"

echo "🎉 All tests completed successfully!"

5. Known Limitations & Edge Cases

Performance Bottlenecks

1. API Response Times

Issue: External API calls can be slow, especially WHO ICD-11 OAuth flow

# Mitigation strategies implemented:
- Aggressive caching (1-24 hour TTL based on data type)
- Parallel API calls where possible
- Timeout management (10-20 seconds per API)
- Fallback chains for degraded performance

2. Memory Usage with Large Documents

Issue: Document processing can consume significant memory

# Current limitations:
- Maximum file size: 20MB
- Memory usage scales with document size
- No streaming processing for very large files

# Mitigation:
- File size validation before processing
- Chunked processing for large documents
- Memory cleanup after processing

Input Handling Edge Cases

1. Ambiguous Medical Queries

Issue: Queries that could have multiple interpretations

# Example problematic queries:
"I have a cold" - Could be symptom analysis or general information
"Tell me about depression" - Medical condition vs. mental health support

# Current handling:
- Context-aware disambiguation using conversation history
- User clarification prompts when ambiguity detected
- Default to safest interpretation (medical information vs. diagnosis)

2. Non-English Input

Issue: Limited support for non-English medical queries

# Current limitations:
- Primary support for English only
- Medical NLP models trained on English text
- API responses primarily in English

# Partial mitigation:
- Basic language detection
- Graceful error messages for unsupported languages
- Future: Multi-language medical NLP models

API Integration Limitations

1. WHO ICD-11 API Rate Limits

Issue: OAuth token refresh and rate limiting

# Limitations:
- Token expires every hour
- Rate limits not clearly documented
- Occasional authentication failures

# Mitigation:
- Proactive token refresh (55-minute intervals)
- Exponential backoff on rate limit errors
- Fallback to Gemini AI when WHO API unavailable

2. OpenFDA Data Completeness

Issue: Not all drugs have complete FDA data

# Data gaps:
- Newer drugs may lack comprehensive recall data
- Generic drugs may have limited information
- International drugs not in FDA database

# Handling:
- Clear indication when data is limited
- Fallback to AI-generated drug information
- Recommendation to consult healthcare providers

Medical Safety Limitations

1. Diagnostic Limitations

Issue: Cannot provide medical diagnoses despite user expectations

# User expectation management:
- Clear disclaimers in every medical response
- Explicit statements about not providing diagnoses
- Consistent referral to healthcare professionals
- Emergency detection and immediate care recommendations

2. Drug Interaction Checking

Issue: Limited ability to check complex drug interactions

# Current limitations:
- Basic allergy checking against user preferences
- No comprehensive drug interaction database
- Cannot account for all medical conditions

# Safety measures:
- Clear warnings about consulting pharmacists
- Emphasis on professional medical advice
- Allergy alerts when user data available

Scalability Considerations

1. Database Performance

Issue: SQLite limitations for high-concurrency scenarios

# Current setup:
- SQLite suitable for development and small deployments
- Single-file database with limited concurrent writes
- No built-in replication or clustering

# Production recommendations:
- PostgreSQL for production deployments
- Database connection pooling
- Read replicas for conversation history

2. Session Management

Issue: In-memory session state doesn't scale horizontally

# Current limitations:
- Session state stored in application memory
- No session sharing between instances
- Lost sessions on application restart

# Scaling solutions:
- Redis-based session storage
- Stateless session design
- Load balancer session affinity

Future Improvement Areas

1. Enhanced Medical NLP

  • Integration with more specialized medical NLP models
  • Support for medical abbreviations and terminology
  • Improved entity linking to medical ontologies

2. Personalization Engine

  • Machine learning-based user preference learning
  • Adaptive response formatting based on user expertise
  • Personalized medical information recommendations

3. Multi-Modal Support

  • Medical image analysis integration
  • Voice input and output capabilities
  • Integration with wearable device data

4. Advanced Safety Features

  • Real-time medical emergency detection
  • Integration with emergency services APIs
  • Advanced drug interaction checking

The Healthcare Navigator represents a robust foundation for medical information access while maintaining appropriate limitations and safety boundaries. The system is designed to be transparent about its capabilities and limitations, ensuring users understand both what it can and cannot provide in terms of medical guidance.