A high-performance, real-time Publisher-Subscriber system built with Flask, Flask-SocketIO, and SQLite.
-
Real-time Communication: WebSocket-based pub/sub messaging with instant delivery
-
Multiple Topics: Support for subscribing to multiple topics simultaneously
-
Persistent Storage: SQLite database for message history and consumption tracking
-
Web Interface: Interactive web client for testing and monitoring
-
Python Client Library: Easy-to-use Python client for integration
-
RESTful API: HTTP endpoints for publishing messages
-
Live Monitoring: Real-time monitoring of connected clients and message consumption
-
Docker Support: Ready-to-deploy Docker configuration
-
Comprehensive Testing: Extensive test suite with pytest
-
Production Ready: Health checks, logging, and error handling
- Python 3.8 or higher
- pip package manager
- SQLite3
# Clone the repository
git clone https://github.com/venantvr/Python.Publisher.Subscriber.git
cd Python.Publisher.Subscriber
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install in development mode
pip install -r requirements-dev.txt
pip install -e .# Using Docker Compose
docker-compose up -d
# Or build and run manually
docker build -t python.publisher.subscriber:latest .
docker run -p 5000:5000 python.publisher.subscriber:latest# Run directly
python src/pubsub_ws.pyThe server will start on http://localhost:5000
curl -X POST http://localhost:5000/publish \
-H "Content-Type: application/json" \
-d '{"topic": "sports", "message": "Goal scored!"}'import requests
response = requests.post(
"http://localhost:5000/publish",
json={"topic": "sports", "message": "Goal scored!"}
)from pubsub import PubSubClient
def handle_sports_message(message):
print(f"Sports update: {message}")
def handle_news_message(message):
print(f"News update: {message}")
# Create client and connect
client = PubSubClient(
consumer_name="alice",
topics=["sports", "news"]
)
# Register message handlers
client.register_handler("sports", handle_sports_message)
client.register_handler("news", handle_news_message)
# Start listening
client.start()Open your browser at http://localhost:5000/client.html
Python.Publisher.Subscriber/
├── src/ # Source code
│ ├── pubsub/ # Core library modules
│ │ ├── __init__.py
│ │ ├── pubsub_client.py
│ │ └── pubsub_message.py
│ ├── pubsub_ws.py # Main server application
│ └── client.py # Client implementation
├── tests/ # Test suite
│ ├── test_pubsub_ws.py
│ └── test_pubsub_client.py
├── config/ # Configuration files
├── docs/ # Documentation
├── migrations/ # Database migrations
├── static/ # Static web files
├── .github/ # GitHub Actions workflows
│ └── workflows/
│ ├── ci.yml
│ └── release.yml
├── Dockerfile # Docker configuration
├── docker-compose.yml # Docker Compose setup
├── Makefile # Development commands
├── pyproject.toml # Python project configuration
├── setup.py # Package setup
├── requirements.txt # Python dependencies
└── README.md # This file
# Run all tests
make test
# Run specific test file
pytest tests/test_pubsub_ws.py -v
# Run in watch mode
pytest-watch tests/ -v# Install development dependencies
make installmake help # Show all available commands
make test # Run tests
make clean # Clean generated files
make install # Install dependencies
make update # Update dependenciesThe application uses SQLite with the following schema:
id: Primary keytopic: Message topicmessage: Message contenttimestamp: Creation time
id: Primary keyconsumer: Consumer nametopic: Subscribed topictimestamp: Subscription time
id: Primary keyconsumer: Consumer namemessage_id: Reference to messageconsumed_at: Consumption timestamp
Publish a message to a topic.
{
"topic": "string",
"message": "string"
}Health check endpoint.
subscribe: Subscribe to topics
{
"consumer": "string",
"topics": [
"string"
]
}publish: Publish message via WebSocket
{
"topic": "string",
"message": "string"
}message: Receive subscribed messagesclient_list: Updated list of connected clientsconsumption_update: Message consumption notifications
# Start all services
docker-compose up -d
# View logs
docker-compose logs -f
# Stop services
docker-compose down# Build image
docker build -t python.publisher.subscriber:latest .
# Run container
docker run -d \
-p 5000:5000 \
-v $(pwd)/pubsub.db:/app/pubsub.db \
--name pubsub-server \
python.publisher.subscriber:latestcurl http://localhost:5000/healthThe application provides real-time metrics through the web interface:
- Connected clients count
- Messages per topic
- Consumption rate
- Active subscriptions
- Input validation on all endpoints
- SQL injection prevention via parameterized queries
- XSS protection in web interface
- Rate limiting support
- CORS configuration available
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
- Follow PEP 8 style guide
- Add tests for new features
- Update documentation as needed
- Use type hints
This project is licensed under the MIT License - see the LICENSE file for details.
- Flask team for the excellent web framework
- Socket.IO team for real-time communication
- All contributors and users of this project
Full documentation is available in the docs/ directory.
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: venantvr@gmail.com
- Redis backend support
- Message persistence options
- Authentication and authorization
- Message encryption
- Horizontal scaling support
- GraphQL API
- Admin dashboard
- Message replay functionality
- Dead letter queue
- Prometheus metrics export