A production-ready, scalable WebSocket package for Go with support for rooms, broadcasting, multi-node clustering, middleware, hooks, and extensibility.
- Production-Ready: Proper concurrency, graceful shutdown & drain, error handling
- Horizontally Scalable: Multi-node support via adapter pattern (Redis, NATS, or custom)
- Pluggable: Bring your own logger, metrics
- Middleware System: Chain handlers with custom logic
- Lifecycle Hooks: Hook into connection, message, room, and backpressure events
- Room Support: Group clients into rooms for targeted broadcasting
- Metrics & Logging: Built-in interfaces for observability; official Prometheus subpackage (`wshub/prometheus`)
- Configurable: Extensive configuration with builder pattern
- Limits & Rate Limiting: Control connections, rooms, and message rates
- Backpressure Control: Configurable drop policies with notification hooks
- Write Coalescing: Opt-in batching of text messages into single frames for reduced syscalls
- Health Probes: Built-in `/healthz` and `/readyz` handlers with JSON responses for Kubernetes
- Global Counts: Cluster-wide client and room counts via presence gossip
- Zero Business Logic: Pure infrastructure, bring your own logic
Zero-allocation broadcasting, nanosecond lookups — built for scale. (Full benchmarks)
| Operation | Scale | Time | Allocs |
|---|---|---|---|
Broadcast |
100,000 clients | 22.0 ms | 0 |
Broadcast |
1,000,000 clients | 263 ms | 0 |
BroadcastToRoom |
1,000,000 clients | 260 ms | 0 |
BroadcastParallel |
50,000 clients | 5.5 ms | 1 |
SendToClient |
1,000,000 clients | 130 ns | 0 |
SendToUser |
1,000,000 users | 192 ns | 1 |
GetClient |
1,000 clients | 17.7 ns | 0 |
GlobalClientCount |
500 nodes | 4.2 μs | 0 |
| Middleware chain (built) | 3 middlewares | 14.3 ns | 0 |
Message size has no impact on dispatch — 64 B and 64 KB both take ~5.7 μs for 100 clients.
go get github.com/KARTIKrocks/wshub

package main
import (
"context"
"log"
"net/http"
"time"
"github.com/KARTIKrocks/wshub"
)
func main() {
// Create hub with configuration
config := wshub.DefaultConfig().
WithMaxMessageSize(1024 * 1024).
WithCompression(true)
hub := wshub.NewHub(
wshub.WithConfig(config),
wshub.WithMessageHandler(func(client *wshub.Client, msg *wshub.Message) error {
log.Printf("Message from %s: %s", client.ID, msg.Text())
return client.Send(msg.Data)
}),
)
// Start the hub
go hub.Run()
// Set up HTTP handler
http.HandleFunc("/ws", hub.HandleHTTP())
log.Println("Server starting on :8080")
// Run the server in a goroutine so the graceful-shutdown code below
// is reachable (with a blocking ListenAndServe followed by log.Fatal,
// the Drain and Shutdown calls would never execute).
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
// ... wait for a shutdown signal (e.g. SIGTERM) here ...
// Graceful drain + shutdown
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
hub.Drain(ctx) // stop new connections, wait for existing ones
hub.Shutdown(ctx) // force-close anything remaining
}

config := wshub.DefaultConfig()
// Or customize
config := wshub.Config{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
WriteWait: 10 * time.Second,
PongWait: 60 * time.Second,
PingPeriod: 54 * time.Second,
MaxMessageSize: 1024 * 1024,
SendChannelSize: 512,
EnableCompression: true,
CheckOrigin: wshub.AllowAllOrigins,
}

config := wshub.DefaultConfig().
WithBufferSizes(4096, 4096).
WithMaxMessageSize(1024 * 1024).
WithCompression(true).
WithCheckOrigin(wshub.AllowOrigins("https://example.com"))

// Allow all origins (default)
config.CheckOrigin = wshub.AllowAllOrigins
// Allow same origin only
config.CheckOrigin = wshub.AllowSameOrigin
// Allow specific origins
config.CheckOrigin = wshub.AllowOrigins("https://example.com", "https://app.example.com")
// Custom checker
config.CheckOrigin = func(r *http.Request) bool {
return strings.HasSuffix(r.Header.Get("Origin"), ".example.com")
}

// Get all clients
clients := hub.Clients()
count := hub.ClientCount()
// Find client
client, ok := hub.GetClient(clientID)
client, ok := hub.GetClientByUserID(userID)
clients := hub.GetClientsByUserID(userID)

// Broadcast to all
hub.Broadcast([]byte("Hello everyone"))
hub.BroadcastText("Hello everyone")
hub.BroadcastJSON(map[string]string{"message": "Hello"})
// Broadcast pre-encoded JSON (zero-alloc, ideal for fan-out)
data, _ := json.Marshal(map[string]string{"message": "Hello"})
hub.BroadcastRawJSON(data)
// Broadcast with context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
hub.BroadcastWithContext(ctx, data)
// Broadcast except one client
hub.BroadcastExcept(data, excludeClient)
// Send to specific client
hub.SendToClient(clientID, data)
// Send to all connections of a user
hub.SendToUser(userID, data)

// Join/leave rooms
hub.JoinRoom(client, "general")
hub.LeaveRoom(client, "general")
hub.LeaveAllRooms(client)
// Broadcast to room
hub.BroadcastToRoom("general", data)
hub.BroadcastToRoomExcept("general", data, exceptClient)
// Room info
clients := hub.RoomClients("general")
count := hub.RoomCount("general")
rooms := hub.RoomNames()
exists := hub.RoomExists("general")

// Client properties
client.ID // Unique client ID
// Set user ID
client.SetUserID("user-123")
userID := client.GetUserID()
// Metadata
client.SetMetadata("role", "admin")
role, ok := client.GetMetadata("role")
client.DeleteMetadata("role")
// Send messages
client.Send([]byte("Hello"))
client.SendText("Hello")
client.SendJSON(map[string]string{"message": "Hello"})
client.SendRawJSON(preEncodedJSON) // skip marshaling
client.SendBinary(data)
client.SendWithContext(ctx, data)
// Close connection
client.Close()
client.CloseWithCode(websocket.CloseNormalClosure, "Goodbye")
// Room membership
rooms := client.Rooms()
inRoom := client.InRoom("general")
count := client.RoomCount()
// Status
closed := client.IsClosed()
closedAt := client.ClosedAt()
// Client-specific handlers
client.OnMessage(func(c *wshub.Client, msg *wshub.Message) {
// Handle message
})
client.OnClose(func(c *wshub.Client) {
// Handle close
})
client.OnError(func(c *wshub.Client, err error) {
// Handle error
})

hub := wshub.NewHub(
wshub.WithHooks(wshub.Hooks{
// Before connection upgrade
BeforeConnect: func(r *http.Request) error {
token := r.Header.Get("Authorization")
if !validateToken(token) {
return wshub.ErrAuthenticationFailed
}
return nil
},
// After successful connection
AfterConnect: func(client *wshub.Client) {
log.Printf("Client connected: %s", client.ID)
},
// Before message processing
BeforeMessage: func(client *wshub.Client, msg *wshub.Message) (*wshub.Message, error) {
if len(msg.Data) > 1000 {
return nil, errors.New("message too large")
}
return msg, nil
},
// After message processing
AfterMessage: func(client *wshub.Client, msg *wshub.Message, err error) {
if err != nil {
log.Printf("Message error: %v", err)
}
},
// Before room join
BeforeRoomJoin: func(client *wshub.Client, room string) error {
if !canJoinRoom(client, room) {
return wshub.ErrUnauthorized
}
return nil
},
// After room join
AfterRoomJoin: func(client *wshub.Client, room string) {
hub.BroadcastToRoomExcept(room,
[]byte(fmt.Sprintf("%s joined", client.ID)),
client,
)
},
// On error
OnError: func(client *wshub.Client, err error) {
log.Printf("Client error: %v", err)
},
}),
)

// Create middleware chain
chain := wshub.NewMiddlewareChain(handleMessage).
Use(wshub.RecoveryMiddleware(logger)).
Use(wshub.LoggingMiddleware(logger)).
Use(wshub.MetricsMiddleware(metrics)).
Build()
// Use in message handler
hub := wshub.NewHub(
wshub.WithMessageHandler(chain.Execute),
)

// Logging
wshub.LoggingMiddleware(logger)
// Panic recovery
wshub.RecoveryMiddleware(logger)
// Metrics
wshub.MetricsMiddleware(metrics)

func RateLimitMiddleware(limiter RateLimiter) wshub.Middleware {
return func(next wshub.HandlerFunc) wshub.HandlerFunc {
return func(client *wshub.Client, msg *wshub.Message) error {
if !limiter.Allow(client.ID) {
return wshub.ErrRateLimitExceeded
}
return next(client, msg)
}
}
}
func AuthMiddleware(auth AuthService) wshub.Middleware {
return func(next wshub.HandlerFunc) wshub.HandlerFunc {
return func(client *wshub.Client, msg *wshub.Message) error {
if client.GetUserID() == "" {
return wshub.ErrUnauthorized
}
return next(client, msg)
}
}
}

// Implement the Logger interface
type ZapLogger struct {
logger *zap.Logger
}
func (l *ZapLogger) Debug(msg string, args ...any) {
l.logger.Sugar().Debugw(msg, args...)
}
func (l *ZapLogger) Info(msg string, args ...any) {
l.logger.Sugar().Infow(msg, args...)
}
func (l *ZapLogger) Warn(msg string, args ...any) {
l.logger.Sugar().Warnw(msg, args...)
}
func (l *ZapLogger) Error(msg string, args ...any) {
l.logger.Sugar().Errorw(msg, args...)
}
// Use it
hub := wshub.NewHub(wshub.WithLogger(&ZapLogger{logger}))

Use the official Prometheus subpackage for production metrics:
import (
wshubprom "github.com/KARTIKrocks/wshub/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
reg := prometheus.NewRegistry()
collector := wshubprom.New(wshubprom.WithRegistry(reg))
hub := wshub.NewHub(wshub.WithMetrics(collector))
go hub.Run()
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

Or implement the MetricsCollector interface yourself (e.g. for StatsD):
type MyMetrics struct{}
func (m *MyMetrics) IncrementConnections() {}
func (m *MyMetrics) DecrementConnections() {}
func (m *MyMetrics) IncrementMessagesReceived() {}
func (m *MyMetrics) IncrementMessagesSent(count int) {}
func (m *MyMetrics) IncrementMessagesDropped() {}
func (m *MyMetrics) RecordMessageSize(size int) {}
func (m *MyMetrics) RecordLatency(d time.Duration) {}
func (m *MyMetrics) RecordBroadcastDuration(d time.Duration) {}
func (m *MyMetrics) IncrementErrors(errorType string) {}
func (m *MyMetrics) IncrementRoomJoins() {}
func (m *MyMetrics) IncrementRoomLeaves() {}
func (m *MyMetrics) IncrementRooms() {}
func (m *MyMetrics) DecrementRooms() {}
hub := wshub.NewHub(wshub.WithMetrics(&MyMetrics{}))

For development, use DebugMetrics for an in-memory snapshot:
metrics := wshub.NewDebugMetrics()
hub := wshub.NewHub(wshub.WithMetrics(metrics))
// Later
stats := metrics.Stats()
fmt.Println(stats.ActiveConnections, stats.TotalMessagesRecv, stats.AvgBroadcast)

limits := wshub.DefaultLimits().
WithMaxConnections(10000).
WithMaxConnectionsPerUser(5).
WithMaxRoomsPerClient(10).
WithMaxClientsPerRoom(100).
WithMaxMessageRate(100)
hub := wshub.NewHub(wshub.WithLimits(limits))

Scale horizontally by connecting multiple hub instances through a shared message bus. All broadcasts and targeted sends are automatically relayed across nodes.
import wshubredis "github.com/KARTIKrocks/wshub/adapter/redis"
rdb := goredis.NewClient(&goredis.Options{Addr: "localhost:6379"})
adapter := wshubredis.New(rdb)
hub := wshub.NewHub(
wshub.WithAdapter(adapter),
wshub.WithNodeID("pod-web-1"), // optional: stable ID for debugging
)
go hub.Run()

| Adapter | Install | Best For |
|---|---|---|
| Redis | go get github.com/KARTIKrocks/wshub/adapter/redis |
Most deployments, easy setup |
| NATS | go get github.com/KARTIKrocks/wshub/adapter/nats |
Low-latency, high-throughput |
| Custom | Implement wshub.Adapter interface |
Any message bus |
Adapters are separate Go modules -- importing the core wshub package never pulls in Redis or NATS dependencies.
| Operation | Cross-Node |
|---|---|
Broadcast, BroadcastBinary, BroadcastText, BroadcastJSON, BroadcastRawJSON |
Yes |
BroadcastExcept |
Yes |
BroadcastToRoom, BroadcastToRoomExcept |
Yes |
SendToUser |
Yes |
SendToClient |
Yes |
JoinRoom, LeaveRoom |
No (local per hub) |
GetClient, ClientCount |
No (local per hub) |
Enable presence gossip to get cluster-wide totals:
hub := wshub.NewHub(
wshub.WithAdapter(adapter),
wshub.WithPresence(5 * time.Second), // publish stats every 5s
)
hub.GlobalClientCount() // total across all nodes
hub.GlobalRoomCount("general") // room members across all nodes

Nodes that miss 3 consecutive heartbeats are automatically evicted from the totals.
For zero-downtime rolling deploys (e.g. Kubernetes), call Drain before Shutdown. Drain stops accepting new connections (HTTP 503) while letting existing connections finish their in-flight messages. Idle connections are proactively closed after the drain timeout.
// preStop / SIGTERM handler
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
hub.Drain(ctx) // stop new connections, wait for existing ones
hub.Shutdown(ctx) // force-close anything remaining

hub := wshub.NewHub(
// Configure idle connection reaper timeout (default: 30s).
// Connections idle for this duration during drain are closed with CloseGoingAway.
// Set to 0 to disable the reaper entirely.
wshub.WithDrainTimeout(15 * time.Second),
)

// Drop-in HTTP handlers — respond with JSON and correct status codes
http.Handle("/healthz", hub.HealthHandler()) // 200 while Run() is alive, else 503
http.Handle("/readyz", hub.ReadyHandler()) // 200 while running, 503 when draining/stopped
// Programmatic access
hs := hub.Health() // HealthStatus{Alive, Ready, State, Uptime, Clients}
hub.Alive() // true only while Run() goroutine is executing
hub.Ready() // true when alive and in StateRunning
hub.Uptime() // time.Duration since Run() started (0 if not started or exited)

When a client's send buffer is full, configure how messages are handled:
hub := wshub.NewHub(
// DropNewest (default): discard the new message
// DropOldest: evict the oldest queued message to make room
wshub.WithDropPolicy(wshub.DropOldest),
wshub.WithHooks(wshub.Hooks{
OnSendDropped: func(client *wshub.Client, data []byte) {
log.Printf("dropped %d bytes for client %s", len(data), client.ID)
// Options: disconnect slow client, log, queue externally
// client.Close()
},
}),
)

| Policy | Behavior | Best For |
|---|---|---|
DropNewest |
Discards the new message | Default, safe |
DropOldest |
Evicts oldest queued message | Real-time data (dashboards, tickers, game state) |
When throughput is high and messages queue up, enable write coalescing to batch multiple text messages into a single WebSocket frame separated by newlines (\n). This reduces syscalls at the cost of receivers needing to split frames:
cfg := wshub.DefaultConfig().WithCoalesceWrites(true)
hub := wshub.NewHub(wshub.WithConfig(cfg))

- Only text messages are coalesced; binary messages are always sent as individual frames
- Receivers must split coalesced frames on `\n` to recover individual messages
- When disabled (default), every message is its own frame — no behavior change
err := hub.JoinRoom(client, room)
switch err {
case wshub.ErrClientNotFound:
// Client not registered
case wshub.ErrAlreadyInRoom:
// Client already in room
case wshub.ErrEmptyRoomName:
// Empty room name
case wshub.ErrRoomNotFound:
// Room doesn't exist
case wshub.ErrNotInRoom:
// Client not in room
case wshub.ErrConnectionClosed:
// Connection was closed
case wshub.ErrSendBufferFull:
// Send buffer full
case wshub.ErrHubNotStarted:
// Hub Run() has not been called yet
case wshub.ErrHubDraining:
// Hub is draining, not accepting new connections
case wshub.ErrHubStopped:
// Hub has been shut down
case wshub.ErrMaxConnectionsReached:
// Connection limit reached
case wshub.ErrMaxRoomsReached:
// Room limit per client reached
case wshub.ErrRoomFull:
// Room is full
case wshub.ErrRateLimitExceeded:
// Rate limit exceeded
case wshub.ErrAuthenticationFailed:
// Authentication failed
case wshub.ErrUnauthorized:
// Unauthorized action
}

See examples/chat/ for a complete chat application demonstrating:
- Room management
- Username tracking
- Message broadcasting
- Middleware (recovery + logging)
- Rate limiting
- Connection limits
Save as index.html and open in a browser while the server is running:
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket Test</h1>
<div>
<input type="text" id="message" placeholder="Type a message" />
<button onclick="send()">Send</button>
</div>
<div id="messages"></div>
<script>
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onopen = () => {
console.log("Connected");
addMessage("Connected to server");
};
ws.onmessage = (event) => {
addMessage("Received: " + event.data);
};
ws.onclose = () => {
addMessage("Disconnected");
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
addMessage("Error occurred");
};
function send() {
const input = document.getElementById("message");
ws.send(input.value);
addMessage("Sent: " + input.value);
input.value = "";
}
function addMessage(msg) {
const div = document.getElementById("messages");
div.innerHTML += "<p>" + msg + "</p>";
}
</script>
</body>
</html>

- Always use middleware for cross-cutting concerns (logging, metrics, auth)
- Use hooks for lifecycle events instead of wrapping the hub
- Implement proper logging and metrics for production observability
- Set appropriate limits to prevent resource exhaustion
- Use `Drain` then `Shutdown` for zero-downtime deploys
- Handle errors appropriately - don't ignore send failures
- Use rooms for targeted messaging instead of filtering in handlers
- Set user ID after authentication for multi-device support
- Use metadata for request-scoped data instead of global state
- Test with concurrent clients to ensure thread safety
- Increase `SendChannelSize` for high-throughput scenarios
- Enable compression for large messages
- Use `BroadcastWithContext` for timeout control
- Batch messages when possible
- Monitor send buffer sizes via metrics
- Use `WithParallelBroadcast(batchSize)` for 1000+ concurrent clients — dispatches batches to a persistent worker pool instead of spawning goroutines per broadcast
- Use `WithParallelBroadcastWorkers(n)` to tune the pool size (default: `runtime.NumCPU()`)
Measured on an Intel i5-11400H @ 2.70GHz (12 cores), Go 1.26, Linux. See performance highlights for a quick summary.
Run them yourself:
go test -bench=. -benchmem ./...

| Operation | Clients | Time | Allocs |
|---|---|---|---|
Broadcast |
100,000 | 22.0 ms | 0 |
Broadcast |
1,000,000 | 263 ms | 0 |
BroadcastToRoom |
100,000 | 23.2 ms | 0 |
BroadcastToRoom |
1,000,000 | 260 ms | 0 |
BroadcastExcept |
100,000 | 25.9 ms | 1 |
BroadcastExcept |
1,000,000 | 294 ms | 1 |
BroadcastToRoomExcept |
100,000 | 26.0 ms | 1 |
BroadcastToRoomExcept |
1,000,000 | 277 ms | 1 |
Uses a persistent worker pool instead of spawning goroutines per broadcast. The hub snapshot slice is pre-built on register/unregister, so parallel broadcasts allocate nothing beyond the pool task. Enable with WithParallelBroadcast(batchSize).
| Operation | Clients | Time | Allocs |
|---|---|---|---|
BroadcastParallel |
100 | 5.6 μs | 0 |
BroadcastParallel |
10,000 | 989 μs | 1 |
BroadcastParallel |
50,000 | 5.5 ms | 1 |
| Operation | Scale | Time | Allocs |
|---|---|---|---|
SendToClient |
100,000 clients | 129 ns | 0 |
SendToClient |
1,000,000 clients | 130 ns | 0 |
SendToUser |
100,000 users | 198 ns | 1 |
SendToUser |
1,000,000 users | 192 ns | 1 |
| Operation | Nodes | Time | Allocs |
|---|---|---|---|
GlobalClientCount |
5 | 63 ns | 0 |
GlobalClientCount |
50 | 397 ns | 0 |
GlobalClientCount |
100 | 715 ns | 0 |
GlobalClientCount |
500 | 4.2 μs | 0 |
GlobalRoomCount |
5 | 118 ns | 0 |
GlobalRoomCount |
50 | 823 ns | 0 |
GlobalRoomCount |
100 | 1.7 μs | 0 |
GlobalRoomCount |
500 | 9.7 μs | 0 |
| Payload | Time (100 clients) | Allocs |
|---|---|---|
| 64 B | 5.7 μs | 0 |
| 512 B | 5.5 μs | 0 |
| 4 KB | 5.4 μs | 0 |
| 64 KB | 5.7 μs | 0 |
| Operation | Time | Allocs |
|---|---|---|
GetClient (1,000 clients) |
17.7 ns | 0 |
ClientCount |
0.28 ns | 0 |
GetClientByUserID |
51.3 ns | 0 |
RoomExists |
23.6 ns | 0 |
RoomCount |
22.1 ns | 0 |
GetMetadata |
17.0 ns | 0 |
SetMetadata |
30.6 ns | 0 |
| Operation | Time | Allocs |
|---|---|---|
Send (text) |
82.9 ns | 1 |
SendJSON |
495 ns | 5 |
| Mode | Time | Allocs |
|---|---|---|
| Built (cached) | 14.3 ns | 0 |
| Unbuilt (on-the-fly) | 17.0 ns | 0 |
Always call `Build()` on your middleware chain for best performance.
| Operation | Time | Allocs |
|---|---|---|
GetClient |
31.0 ns | 0 |
ClientCount |
0.23 ns | 0 |
Metadata (set+get) |
76.5 ns | 0 |
Broadcast (100 clients) |
5.9 μs | 120 |
| Operation | Time | Allocs |
|---|---|---|
NewMessage |
30.5 ns | 0 |
NewTextMessage |
32.0 ns | 0 |
NewBinaryMessage |
30.2 ns | 0 |
NewJSONMessage |
820 ns | 9 |
NewRawJSONMessage |
30.9 ns | 0 |
All Hub and Client methods are thread-safe. The package uses:
- RWMutex for client/room maps
- Separate mutexes for callbacks
- Channels for cross-goroutine communication
- WaitGroups for graceful shutdown
Contributions welcome! Please read CONTRIBUTING.md for guidelines.