The validation worker service processes votes from the RabbitMQ validation queue, validates them against Redis, and routes them appropriately.
- Vote Validation: Checks if voter hash exists in valid_hashes Redis set
- Duplicate Detection: Prevents duplicate votes using voted_hashes Redis set
- Audit Logging: Records all votes in PostgreSQL for compliance
- Error Handling: Graceful error handling with message requeuing
- Metrics: Prometheus metrics for monitoring
- Graceful Shutdown: Handles SIGTERM/SIGINT signals properly
- Consume from
votes.validationqueue - Validate Hash: Check if hash exists in
valid_hashesRedis SET- If invalid → publish to
votes.reviewqueue with status='invalid'
- If invalid → publish to
- Check Duplicate: Check if hash exists in
voted_hashesRedis SET- If duplicate:
- Increment
duplicate_count:{hash}counter - Publish to
votes.reviewqueue with status='duplicate' and attempt count
- Increment
- If duplicate:
- Process Valid Vote:
- Add hash to
voted_hashesSET - Insert audit log into PostgreSQL
vote_audittable - Publish to
votes.aggregationqueue - ACK message
- Add hash to
- worker.py: Main worker logic and message processing
- redis_client.py: Redis operations (validation, duplicate checking)
- rabbitmq_client.py: RabbitMQ consumer/publisher with auto-reconnect
- database.py: PostgreSQL client with connection pooling
- config.py: Environment configuration management
Copy .env.example to .env and configure:
# RabbitMQ
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASS=guest
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=election_db
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
# Worker
WORKER_ID=worker-1
PREFETCH_COUNT=10
METRICS_PORT=8001- Python 3.11+
- RabbitMQ running
- Redis running
- PostgreSQL running with
vote_audittable
pip install -r requirements.txtpython worker.pydocker build -t validation-worker:latest .docker run -d \
--name validation-worker \
--env-file .env \
-p 8001:8001 \
validation-worker:latestPrometheus metrics available at http://localhost:8001/metrics:
validation_votes_processed_total{status}: Total votes processed by statusvalidation_processing_latency_seconds: Processing time histogramvalidation_errors_total{error_type}: Total errors by typevalidation_queue_size: Current queue sizeredis_operations_total{operation,status}: Redis operation countsdatabase_operations_total{operation,status}: Database operation counts
The worker expects the following PostgreSQL table:
CREATE TABLE vote_audit (
id SERIAL PRIMARY KEY,
voter_hash VARCHAR(64) NOT NULL,
candidate_id INTEGER NOT NULL,
status VARCHAR(20) NOT NULL,
vote_timestamp TIMESTAMP NOT NULL,
processed_timestamp TIMESTAMP NOT NULL,
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_vote_audit_voter_hash ON vote_audit(voter_hash);
CREATE INDEX idx_vote_audit_status ON vote_audit(status);
CREATE INDEX idx_vote_audit_timestamp ON vote_audit(vote_timestamp);- Invalid JSON: Message rejected (not requeued)
- Redis Errors: Message NACKed and requeued
- Database Errors: Redis operation rolled back, message NACKed
- Connection Errors: Automatic reconnection with retry logic
The worker handles SIGTERM/SIGINT signals:
- Stops consuming new messages
- Completes processing current message
- Closes RabbitMQ connection
- Closes Redis connection
- Closes database connections
- Exits cleanly
Run multiple workers for horizontal scaling:
# Worker 1
WORKER_ID=worker-1 METRICS_PORT=8001 python worker.py
# Worker 2
WORKER_ID=worker-2 METRICS_PORT=8002 python worker.py
# Worker 3
WORKER_ID=worker-3 METRICS_PORT=8003 python worker.pyEach worker processes messages independently from the shared queue.
Check if worker is processing messages:
curl http://localhost:8001/metrics | grep validation_votes_processed_totalMonitor queue depth:
curl http://localhost:8001/metrics | grep validation_queue_sizeMonitor error rate:
curl http://localhost:8001/metrics | grep validation_errors_total- Check RabbitMQ connection
- Verify queue exists and has messages
- Check Redis connectivity
- Review worker logs
- Check Redis availability
- Verify database connectivity
- Check message format
- Review error metrics by type
- Increase
PREFETCH_COUNT - Scale horizontally (add more workers)
- Check database connection pool size
- Monitor Redis latency
MIT