The Healthcare Navigator follows a sophisticated ReAct (Reasoning-Acting-Observation) pattern designed specifically for medical information processing with safety-first principles.
async def process_query(self, query_text: str, context: Dict[str, Any] = None) -> AgentResponse:
# 1. Input sanitization and validation
query_text = sanitize_input(query_text)
# 2. Create structured medical query object
medical_query = MedicalQuery(query_text=query_text)Security Measures:
- HTML/JavaScript injection prevention
- SQL injection pattern detection
- Medical query format validation
- Length and content restrictions
# 3. Retrieve conversation context and user preferences
full_context = await self._prepare_context(context)
# Includes:
# - Last 10 conversation interactions
# - User preferences (age, allergies, medical history)
# - Session metadata and stateMemory Integration:
- Conversation History: SQLite-backed persistent storage
- User Preferences: Age, allergies, medical conditions
- Session State: Current conversation context and flow
- Cache Lookup: Previous similar queries for optimization
# 4. REASONING PHASE: Analyze query and create execution plan
if self.context_planner and full_context.get('conversation_history'):
plan = await self.context_planner.create_context_aware_plan(medical_query, full_context)
else:
plan = await self.query_planner.analyze_query(medical_query, full_context)Planning Intelligence:
- Query Classification: Medical term, drug info, symptoms, documents, entities
- Intent Detection: Urgency assessment (low/moderate/high/emergency)
- Tool Selection: Determine required APIs and processing tools
- Context Integration: Merge conversation history and user preferences
- Plan Optimization: Minimize API calls and maximize efficiency
Example Planning Output:
QueryPlan(
query_id="symptom_analysis_001",
query_type=QueryType.SYMPTOMS,
urgency_level=UrgencyLevel.MODERATE,
key_entities=["headache", "fever"],
tools_required=["analyze_symptoms", "search_medical_term"],
execution_steps=[
{
"step_number": 1,
"tool_name": "analyze_symptoms",
"input_data": {"symptoms": "headache and fever"},
"confidence_threshold": 0.7
}
]
) # 5. ACTING PHASE: Execute the plan with healthcare tools
medical_response = await self.executor.execute_plan(plan, full_context)Execution Strategy:
- Parallel Processing: Multiple API calls when beneficial
- Retry Logic: Exponential backoff for transient failures
- Timeout Management: Per-tool timeout configuration
- Fallback Chains: Primary → Secondary → Cache → AI → Error
- Result Synthesis: Combine multiple data sources coherently
Tool Execution Example:
async def execute_symptom_analysis(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
# Extract symptoms and context
symptoms = input_data.get('query_entities', [])
user_context = input_data.get('context', {})
# Create comprehensive analysis prompt
analysis_prompt = f"""
Analyze these symptoms: {symptoms}
User context: Age {user_context.get('user_age', 'unknown')}
Provide:
1. Possible Causes
2. Severity Assessment
3. Self-Care Recommendations
4. When to Seek Care
5. Red Flags
"""
# Execute with Gemini API
result = await self.api_manager.gemini_client.generate_response(analysis_prompt)
# Process and structure response
return self._structure_symptom_response(result, symptoms) # 6. OBSERVATION PHASE: Store results and update memory
conversation_stored = self.conversation_memory.store_interaction(
medical_query, medical_response, self.session_id
)Learning & Memory Update:
- Conversation Storage: Query-response pairs with metadata
- Performance Metrics: Processing times, confidence scores
- User Feedback: Implicit feedback from continued interaction
- Cache Updates: Store successful API responses for reuse
Core Responsibilities:
class QueryPlanner:
def analyze_query(self, query: MedicalQuery, context: Dict) -> QueryPlan:
"""
1. Classify query type using Gemini API
2. Extract medical entities using NLP
3. Assess urgency level based on keywords
4. Select appropriate tools and APIs
5. Generate step-by-step execution plan
"""Query Classification Logic:
def _classify_query_type(self, query_text: str) -> QueryType:
# Medical term patterns
if re.search(r'\b(what is|define|explain)\b.*\b(disease|condition|syndrome)\b', query_text, re.I):
return QueryType.MEDICAL_TERM
# Drug information patterns
if re.search(r'\b(drug|medication|medicine|pill)\b', query_text, re.I):
return QueryType.DRUG_INFO
# Symptom analysis patterns
if re.search(r'\b(symptoms?|feel|pain|ache|hurt)\b', query_text, re.I):
return QueryType.SYMPTOMS
# Document processing patterns
if 'upload' in query_text.lower() or 'document' in query_text.lower():
return QueryType.DOCUMENT_SUMMARYContext-Aware Planning:
class ContextAwarePlanner:
def create_context_aware_plan(self, query: MedicalQuery, context: Dict) -> QueryPlan:
"""
Enhanced planning with conversation history integration:
1. Analyze conversation flow and continuity
2. Identify follow-up questions and clarifications
3. Integrate user preferences (age, allergies, history)
4. Optimize plan based on previous successful patterns
5. Adjust urgency based on conversation context
"""Tool Coordination Architecture:
class ToolExecutor:
def __init__(self):
self.tools = {
'search_medical_term': {
'function': self.execute_medical_term_lookup,
'type': 'api_call',
'timeout': 10.0,
'retry_count': 3
},
'get_drug_information': {
'function': self.execute_drug_info_lookup,
'type': 'api_call',
'timeout': 15.0,
'retry_count': 3
},
'analyze_symptoms': {
'function': self.execute_symptom_analysis,
'type': 'nlp_processing',
'timeout': 20.0,
'retry_count': 2
}
}Execution Flow with Error Handling:
async def _execute_step(self, step: Dict, context: Dict, plan: QueryPlan) -> ExecutionResult:
tool_name = step.get('tool_name')
tool_config = self.tools[tool_name]
try:
# Execute with retry logic and timeout
result_data = await self._execute_with_retry(
tool_config['function'],
step.get('input_data', {}),
tool_config['retry_count'],
tool_config['timeout']
)
return ExecutionResult(
success=True,
data=result_data,
tool_name=tool_name,
execution_time=execution_time,
confidence_score=result_data.get('confidence_score', 0.7)
)
except Exception as e:
# Graceful error handling with fallback
return self._handle_tool_failure(tool_name, e, step)Multi-Tier Memory Architecture:
class ConversationMemory:
def store_interaction(self, query: MedicalQuery, response: MedicalResponse, session_id: str):
"""
SQLite storage with structured schema:
- Query text and metadata
- Response content and sources
- Confidence scores and processing times
- User context and preferences
"""
def get_context(self, session_id: str, limit: int = 10) -> List[Dict]:
"""
Retrieve relevant conversation history:
- Recent interactions for continuity
- Similar queries for pattern recognition
- User preference evolution over time
"""class CacheManager:
def __init__(self):
self.cache = TTLCache(maxsize=1000, ttl=3600) # 1 hour TTL
self.api_cache = {} # API-specific caching
def cache_api_response(self, key: str, response: Dict, ttl: int):
"""
Intelligent caching strategy:
- Medical terms: 24 hour TTL (stable information)
- Drug recalls: 1 hour TTL (frequently updated)
- Symptom analysis: 30 minutes TTL (context-dependent)
"""class UserPreferences:
def set_preference(self, key: str, value: Any, session_id: str):
"""
Persistent user data:
- Age and demographic information
- Known allergies and medical conditions
- Interaction preferences and history
- Privacy and data retention settings
"""Configuration & Safety:
class GeminiAPIClient:
def __init__(self):
self.model = genai.GenerativeModel('gemini-2.0-flash')
self.system_instructions = self._get_healthcare_system_instructions()
def _get_healthcare_system_instructions(self) -> str:
return """
You are an intelligent healthcare navigator AI assistant.
CRITICAL GUIDELINES:
- You do NOT provide medical diagnoses or treatment recommendations
- Always include appropriate medical disclaimers
- Recommend consulting healthcare professionals for medical concerns
- For urgent symptoms, advise seeking immediate medical attention
RESPONSE REQUIREMENTS:
- Provide ONLY the final, user-friendly response
- Do NOT show your reasoning process or internal thinking
- Use plain language, avoiding unnecessary medical jargon
- Structure information logically with clear sections
"""Function Calling Integration:
async def generate_response(self, prompt: str, use_functions: bool = True) -> GeminiResponse:
# Prepare function tools for structured API calls
if use_functions and self.functions:
function_declarations = self._create_function_declarations()
tools = [genai.types.Tool(function_declarations=function_declarations)]
response = self.model.generate_content(
prompt,
generation_config=generation_config,
tools=tools
)
# Process function calls and generate follow-up response
return await self._process_response(response, prompt)OAuth2 Authentication Flow:
class WHOICDClient:
async def authenticate(self) -> str:
"""
OAuth2 Client Credentials flow:
1. Request access token using client credentials
2. Cache token with expiration tracking
3. Automatic refresh before expiration
4. Fallback handling for authentication failures
"""
auth_data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'scope': 'icdapi_access',
'grant_type': 'client_credentials'
}
response = await self._make_auth_request(auth_data)
return response['access_token']Medical Term Search:
async def search_medical_term(self, term: str) -> APIResult:
"""
Comprehensive medical term lookup:
1. Search ICD-11 database for exact matches
2. Fuzzy matching for similar terms
3. Category-based search for broader context
4. Definition extraction and simplification
"""
headers = {'Authorization': f'Bearer {await self.get_access_token()}'}
search_url = f"{self.base_url}/entity/search"
params = {
'q': term,
'subtreeFilterUsesFoundationDescendants': 'false',
'includeKeywordResult': 'true'
}
response = await self._make_request('GET', search_url, headers=headers, params=params)
return self._process_search_results(response)Drug Safety Information Retrieval:
class OpenFDAClient:
async def get_drug_information(self, drug_name: str) -> APIResult:
"""
Comprehensive drug safety analysis:
1. Drug label information (official FDA data)
2. Recall information (safety alerts)
3. Adverse event reports (side effects)
4. Enforcement actions (regulatory actions)
"""
# Parallel API calls for comprehensive data
tasks = [
self._get_drug_labels(drug_name),
self._get_drug_recalls(drug_name),
self._get_adverse_events(drug_name)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return self._combine_drug_data(results, drug_name)Rate Limiting & Error Handling:
class RateLimiter:
def __init__(self, requests_per_minute: int = 240):
self.requests_per_minute = requests_per_minute
self.requests = deque()
async def acquire(self):
"""
Token bucket rate limiting:
- 240 requests per minute (FDA limit)
- Exponential backoff on rate limit exceeded
- Request queuing for burst handling
"""
now = time.time()
# Remove requests older than 1 minute
while self.requests and self.requests[0] <= now - 60:
self.requests.popleft()
if len(self.requests) >= self.requests_per_minute:
sleep_time = 60 - (now - self.requests[0])
await asyncio.sleep(sleep_time)
self.requests.append(now)Entity Extraction Pipeline:
class MedicalEntityExtractor:
def __init__(self):
self.nlp = spacy.load("en_core_med7_lg") # Medical NLP model
self.drug_ner = DrugNER() # Specialized drug recognition
async def extract_entities(self, text: str) -> Dict[str, List[MedicalEntity]]:
"""
Multi-stage entity extraction:
1. Medical NER using MedSpaCy
2. Drug-specific NER for medication names
3. Symptom extraction using medical vocabularies
4. Confidence scoring and validation
"""
# Process with medical NLP model
doc = self.nlp(text)
entities = []
for ent in doc.ents:
entity = MedicalEntity(
text=ent.text,
entity_type=self._map_entity_type(ent.label_),
confidence=ent._.confidence if hasattr(ent._, 'confidence') else 0.8,
start_pos=ent.start_char,
end_pos=ent.end_char
)
entities.append(entity)
return self._group_entities_by_type(entities)Log Configuration:
def setup_logging() -> logging.Logger:
"""
Multi-level logging with structured output:
- File logging: Daily rotation with detailed information
- Console logging: Real-time monitoring during development
- Structured format: JSON-compatible for log analysis
"""
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
)
# File handler with daily rotation
file_handler = logging.handlers.TimedRotatingFileHandler(
f'logs/healthcare_navigator_{datetime.now().strftime("%Y%m%d")}.log',
when='midnight',
interval=1,
backupCount=30
)
return loggerTracing & Decision Logging:
class AgentTracer:
def log_query_processing(self, query: str, plan: QueryPlan):
logger.info(f"QUERY_PROCESSING", extra={
'query_text': query[:100],
'query_type': plan.query_type.value,
'urgency_level': plan.urgency_level.value,
'tools_required': plan.tools_required,
'execution_steps': len(plan.execution_steps)
})
def log_api_call(self, api_name: str, endpoint: str, response_time: float, success: bool):
logger.info(f"API_CALL", extra={
'api_name': api_name,
'endpoint': endpoint,
'response_time_ms': response_time * 1000,
'success': success,
'timestamp': datetime.utcnow().isoformat()
})
def log_tool_execution(self, tool_name: str, input_data: Dict, result: ExecutionResult):
logger.info(f"TOOL_EXECUTION", extra={
'tool_name': tool_name,
'execution_time_ms': result.execution_time * 1000,
'success': result.success,
'confidence_score': result.confidence_score,
'error': result.error if not result.success else None
})Metrics Collection:
class PerformanceMonitor:
def __init__(self):
self.metrics = {
'query_processing_times': [],
'api_response_times': defaultdict(list),
'cache_hit_rates': [],
'error_rates': defaultdict(int),
'user_satisfaction_scores': []
}
def record_query_processing(self, processing_time: float, success: bool):
self.metrics['query_processing_times'].append(processing_time)
if not success:
self.metrics['error_rates']['query_processing'] += 1
def get_performance_summary(self) -> Dict[str, Any]:
return {
'avg_query_time': np.mean(self.metrics['query_processing_times']),
'p95_query_time': np.percentile(self.metrics['query_processing_times'], 95),
'cache_hit_rate': np.mean(self.metrics['cache_hit_rates']),
'overall_error_rate': sum(self.metrics['error_rates'].values()) / len(self.metrics['query_processing_times'])
}Unit Testing:
# tests/test_agent.py
class TestHealthcareNavigatorAgent:
@pytest.mark.asyncio
async def test_medical_term_query(self):
agent = HealthcareNavigatorAgent("test_session")
response = await agent.process_query("What is diabetes?")
assert response.response.response_text is not None
assert "diabetes" in response.response.response_text.lower()
assert any("disclaimer" in disclaimer.lower() for disclaimer in response.response.disclaimers)
assert response.response.confidence_score > 0.5Integration Testing:
# tests/test_integration.py
class TestAPIIntegration:
@pytest.mark.asyncio
async def test_who_icd_integration(self):
client = WHOICDClient()
result = await client.search_medical_term("hypertension")
assert result.success
assert result.data['definition'] is not None
assert result.data['icd_code'] is not NoneEnd-to-End Testing:
#!/bin/bash
# test_setup.py - Comprehensive system testing
echo "🧪 Running Healthcare Navigator Tests..."
# Test CLI interface
python cli.py --query "What is diabetes?" > test_output.txt
if grep -q "diabetes" test_output.txt; then
echo "✅ CLI medical term query test passed"
else
echo "❌ CLI medical term query test failed"
fi
# Test web interface (headless)
python -c "
import asyncio
from src.agent import HealthcareNavigatorAgent
async def test_agent():
agent = HealthcareNavigatorAgent('test')
response = await agent.process_query('Tell me about aspirin side effects')
assert 'aspirin' in response.response.response_text.lower()
print('✅ Agent drug query test passed')
asyncio.run(test_agent())
"
# Test API health
python -c "
import asyncio
from src.agent import HealthcareNavigatorAgent
async def test_health():
agent = HealthcareNavigatorAgent('test')
status = await agent.get_system_status()
assert status['system_healthy'] == True
print('✅ System health check passed')
asyncio.run(test_health())
"
echo "🎉 All tests completed successfully!"Issue: External API calls can be slow, especially WHO ICD-11 OAuth flow
# Mitigation strategies implemented:
- Aggressive caching (1-24 hour TTL based on data type)
- Parallel API calls where possible
- Timeout management (10-20 seconds per API)
- Fallback chains for degraded performanceIssue: Document processing can consume significant memory
# Current limitations:
- Maximum file size: 20MB
- Memory usage scales with document size
- No streaming processing for very large files
# Mitigation:
- File size validation before processing
- Chunked processing for large documents
- Memory cleanup after processingIssue: Queries that could have multiple interpretations
# Example problematic queries:
"I have a cold" - Could be symptom analysis or general information
"Tell me about depression" - Medical condition vs. mental health support
# Current handling:
- Context-aware disambiguation using conversation history
- User clarification prompts when ambiguity detected
- Default to safest interpretation (medical information vs. diagnosis)Issue: Limited support for non-English medical queries
# Current limitations:
- Primary support for English only
- Medical NLP models trained on English text
- API responses primarily in English
# Partial mitigation:
- Basic language detection
- Graceful error messages for unsupported languages
- Future: Multi-language medical NLP modelsIssue: OAuth token refresh and rate limiting
# Limitations:
- Token expires every hour
- Rate limits not clearly documented
- Occasional authentication failures
# Mitigation:
- Proactive token refresh (55-minute intervals)
- Exponential backoff on rate limit errors
- Fallback to Gemini AI when WHO API unavailableIssue: Not all drugs have complete FDA data
# Data gaps:
- Newer drugs may lack comprehensive recall data
- Generic drugs may have limited information
- International drugs not in FDA database
# Handling:
- Clear indication when data is limited
- Fallback to AI-generated drug information
- Recommendation to consult healthcare providersIssue: Cannot provide medical diagnoses despite user expectations
# User expectation management:
- Clear disclaimers in every medical response
- Explicit statements about not providing diagnoses
- Consistent referral to healthcare professionals
- Emergency detection and immediate care recommendationsIssue: Limited ability to check complex drug interactions
# Current limitations:
- Basic allergy checking against user preferences
- No comprehensive drug interaction database
- Cannot account for all medical conditions
# Safety measures:
- Clear warnings about consulting pharmacists
- Emphasis on professional medical advice
- Allergy alerts when user data availableIssue: SQLite limitations for high-concurrency scenarios
# Current setup:
- SQLite suitable for development and small deployments
- Single-file database with limited concurrent writes
- No built-in replication or clustering
# Production recommendations:
- PostgreSQL for production deployments
- Database connection pooling
- Read replicas for conversation historyIssue: In-memory session state doesn't scale horizontally
# Current limitations:
- Session state stored in application memory
- No session sharing between instances
- Lost sessions on application restart
# Scaling solutions:
- Redis-based session storage
- Stateless session design
- Load balancer session affinity- Integration with more specialized medical NLP models
- Support for medical abbreviations and terminology
- Improved entity linking to medical ontologies
- Machine learning-based user preference learning
- Adaptive response formatting based on user expertise
- Personalized medical information recommendations
- Medical image analysis integration
- Voice input and output capabilities
- Integration with wearable device data
- Real-time medical emergency detection
- Integration with emergency services APIs
- Advanced drug interaction checking
The Healthcare Navigator represents a robust foundation for medical information access while maintaining appropriate limitations and safety boundaries. The system is designed to be transparent about its capabilities and limitations, ensuring users understand both what it can and cannot provide in terms of medical guidance.