A simple, robust, and production-ready WebSocket Pub/Sub client for Python, built on Socket.IO.
- Automatic reconnection with exponential backoff
- Message queuing for reliable sequential processing
- Topic-based subscription with custom handlers
- Both publish and subscribe capabilities
- Comprehensive logging for debugging
- Type hints for better IDE support
- Async message processing with threading
- Error handling with graceful degradation
- Idempotence support to prevent duplicate message processing
- Idempotence Guide - Learn about idempotence classes and preventing duplicate message processing
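The reconnection feature listed above relies on exponential backoff. As a rough illustration of the idea (not the client's actual implementation), the sketch below doubles the delay between attempts until it reaches a ceiling. The function name `backoff_delays` and the parameters `base`, `factor`, `cap`, and `attempts` are assumptions made for this example.

```python
from typing import Iterator


def backoff_delays(base: float = 1.0, factor: float = 2.0,
                   cap: float = 30.0, attempts: int = 6) -> Iterator[float]:
    """Yield the wait time (in seconds) before each reconnection attempt.

    Hypothetical helper for illustration only; the real client may use
    different defaults or add random jitter.
    """
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)  # never wait longer than the cap
        delay *= factor


delays = list(backoff_delays())
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

Capping the delay keeps a long outage from pushing the retry interval to impractical values.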
# Install with pip
pip install python-pubsub-client

# Or install from source
git clone https://github.com/venantvr/Python.PubSub.Client.git
cd Python.PubSub.Client
pip install .

git clone https://github.com/venantvr/Python.PubSub.Client.git
cd Python.PubSub.Client
# Install in editable mode with development dependencies
make dev
# Or manually:
pip install -e ".[dev]"

The client can be configured using environment variables. Copy .env.example to .env and adjust the values:
cp .env.example .env
# Edit .env with your configuration

Key configuration options:
- PUBSUB_SERVER_URL: WebSocket server URL (default: http://localhost:5000)
- PUBSUB_CONSUMER_NAME: Client identifier (default: demo-client)
- PUBSUB_TOPICS: Comma-separated list of topics to subscribe to
- PUBSUB_LOG_LEVEL: Logging level (DEBUG, INFO, WARNING, ERROR)
See .env.example for all available options.
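To make the options above concrete, here is a minimal sketch of how they might be read in Python. It assumes the variables are already present in the process environment (for example, exported by the shell or loaded from .env by a separate tool). The helper `load_config` is hypothetical, and the fallbacks for `PUBSUB_TOPICS` (empty list) and `PUBSUB_LOG_LEVEL` (`INFO`) are assumptions, since the text above states defaults only for the URL and consumer name.

```python
import os
from typing import Dict, List, Mapping, Union

ConfigValue = Union[str, List[str]]


def load_config(env: Mapping[str, str] = os.environ) -> Dict[str, ConfigValue]:
    """Read the PUBSUB_* variables, falling back to defaults."""
    raw_topics = env.get("PUBSUB_TOPICS", "")  # assumed default: no topics
    return {
        "url": env.get("PUBSUB_SERVER_URL", "http://localhost:5000"),
        "consumer": env.get("PUBSUB_CONSUMER_NAME", "demo-client"),
        # Split the comma-separated list and drop empty entries.
        "topics": [t.strip() for t in raw_topics.split(",") if t.strip()],
        "log_level": env.get("PUBSUB_LOG_LEVEL", "INFO"),  # assumed default
    }


config = load_config({"PUBSUB_TOPICS": "orders, inventory"})
print(config["topics"])  # ['orders', 'inventory']
```

Passing a plain dictionary instead of `os.environ`, as in the last lines, makes the parsing easy to check without touching the real environment.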
from python_pubsub_client import PubSubClient  # Simple import thanks to __init__.py

# Create client
client = PubSubClient(
    url="http://localhost:5000",
    consumer="alice",
    topics=["notifications", "updates"],
)

# Register message handlers
def handle_notification(message):
    print(f"Received notification: {message}")

client.register_handler("notifications", handle_notification)

# Start the client
client.start()

# Publish a message to a topic
# noinspection PyUnresolvedReferences
client.publish(
    topic="notifications",
    message={"type": "alert", "content": "Hello World!"},
    producer="alice",
    message_id="msg-001",
)

import logging
from python_pubsub_client import PubSubClient

# Configure logging
logging.basicConfig(level=logging.INFO)

# Create client with multiple topics
client = PubSubClient(
    url="http://localhost:5000",
    consumer="service-a",
    topics=["orders", "inventory", "shipping"],
)

# Define custom handlers for each topic
def process_order(message):
    order_id = message.get("order_id")
    print(f"Processing order: {order_id}")
    # Your order processing logic here

def update_inventory(message):
    item_id = message.get("item_id")
    quantity = message.get("quantity")
    print(f"Updating inventory for item {item_id}: {quantity}")
    # Your inventory logic here

def track_shipping(message):
    tracking_number = message.get("tracking_number")
    print(f"Tracking shipment: {tracking_number}")
    # Your shipping logic here

# Register handlers
client.register_handler("orders", process_order)
client.register_handler("inventory", update_inventory)
client.register_handler("shipping", track_shipping)

# Start client (blocking)
try:
    client.start()
except KeyboardInterrupt:
    print("Shutting down client...")

- Python 3.9 or higher
- pip and virtualenv
- Make (optional, for using Makefile commands)
# Clone the repository
git clone https://github.com/venantvr/Python.PubSub.Client.git
cd Python.PubSub.Client
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install development dependencies
make dev
# Or manually:
pip install -e ".[dev]"

# Run all tests
make test
# Run specific test file
pytest tests/test_client.py -v

Python.PubSub.Client/
├── src/
│   └── pubsub/
│       ├── __init__.py
│       ├── pubsub_client.py    # Main client implementation
│       └── pubsub_message.py   # Message model
├── tests/
│   ├── __init__.py
│   ├── test_client.py
│   └── test_message.py
├── examples/
│   └── simple_client.py        # Example usage
├── Makefile                    # Development commands
├── pyproject.toml              # Project configuration (single source of truth)
└── README.md                   # This file

The project uses pytest for testing. Tests are located in the tests/ directory.
# Run tests
pytest
# Run with verbose output
pytest -v
# Run with coverage report
pytest --cov=pubsub --cov-report=html

PubSubClient(url: str, consumer: str, topics: List[str])
- url: WebSocket server URL
- consumer: Consumer identifier
- topics: List of topics to subscribe to

register_handler: Register a callback function for a specific topic.
publish: Publish a message to a topic via HTTP POST.
start: Start the client and begin listening for messages (blocking).
Stop the client gracefully and clean up resources.
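
The operations described above (register a handler per topic, start processing, stop cleanly) can be pictured with a small queue-and-worker sketch. `MiniDispatcher` and its `enqueue` method are hypothetical stand-ins written for this illustration, not the library's real classes; unlike the real client, `start()` here returns immediately because the work runs in a background thread. A single worker pulling from one queue is what gives sequential, in-order processing.

```python
import queue
import threading
from typing import Any, Callable, Dict, List

Handler = Callable[[Any], None]


class MiniDispatcher:
    """Hypothetical stand-in showing topic dispatch; not the real PubSubClient."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self._queue = queue.Queue()  # holds (topic, message) tuples or None
        self._worker = threading.Thread(target=self._run, daemon=True)

    def register_handler(self, topic: str, handler: Handler) -> None:
        self._handlers[topic] = handler

    def enqueue(self, topic: str, message: Any) -> None:
        self._queue.put((topic, message))

    def start(self) -> None:
        self._worker.start()

    def stop(self) -> None:
        self._queue.put(None)  # sentinel: tells the worker to exit
        self._worker.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:
                break
            topic, message = item
            handler = self._handlers.get(topic)
            if handler is None:
                continue  # no handler registered for this topic
            try:
                handler(message)
            except Exception as exc:  # keep the worker alive on handler errors
                print(f"Handler for {topic!r} failed: {exc}")


received: List[str] = []
dispatcher = MiniDispatcher()
dispatcher.register_handler("orders", lambda m: received.append(m["order_id"]))
dispatcher.start()
dispatcher.enqueue("orders", {"order_id": "A1"})
dispatcher.enqueue("unknown", {"ignored": True})
dispatcher.enqueue("orders", {"order_id": "A2"})
dispatcher.stop()
print(received)  # ['A1', 'A2']
```

Because `stop()` places its sentinel behind any pending messages, everything already queued is handled before the worker exits.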
PubSubMessage.new(topic: str, message: Any, producer: str, message_id: str)
Create a new message instance.
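
The `message_id` field is a natural key for idempotent handling. The sketch below shows one possible approach, assuming handlers receive a dictionary that contains a `message_id` key. The `Message` dataclass is a hypothetical stand-in that only mirrors the fields listed above, and the `idempotent` wrapper is an illustration rather than the library's idempotence mechanism.

```python
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict, List, Set

PayloadHandler = Callable[[Dict[str, Any]], None]


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in mirroring the fields of PubSubMessage.new."""
    topic: str
    message: Any
    producer: str
    message_id: str


def idempotent(handler: PayloadHandler) -> PayloadHandler:
    """Wrap a handler so each message_id is processed at most once."""
    seen: Set[str] = set()

    def wrapper(payload: Dict[str, Any]) -> None:
        message_id = payload["message_id"]
        if message_id in seen:
            return  # duplicate delivery: skip
        seen.add(message_id)
        handler(payload)

    return wrapper


processed: List[str] = []
handle = idempotent(lambda p: processed.append(p["message_id"]))

first = asdict(Message("notifications", {"type": "alert"}, "alice", "msg-001"))
handle(first)
handle(first)  # same message_id, ignored
handle(asdict(Message("notifications", {"type": "alert"}, "alice", "msg-002")))
print(processed)  # ['msg-001', 'msg-002']
```

This version keeps seen IDs in memory without any limit and marks an ID as seen before the handler runs, so a failed handler is not retried; a production setup would likely need bounded or persistent storage and a deliberate retry policy.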
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Please ensure:
- All tests pass
- Coverage remains above 80%
- Documentation is updated as needed
This project is licensed under the MIT License.
- Built with python-socketio
- Inspired by various Pub/Sub patterns and implementations
- Thanks to all contributors!
- Author: venantvr
- Email: venantvr@gmail.com
- GitHub: @venantvr
Please report bugs via GitHub Issues.
- Add support for message persistence
- Implement message encryption
- Add metrics and monitoring
- Support for multiple server URLs (failover)
- GraphQL subscription support
- WebRTC data channel support
Made with ❤️ by venantvr