-
Notifications
You must be signed in to change notification settings - Fork 1
feat: Redis Pub/Sub support #135
Copy link
Copy link
Open
Labels
area/driverDatabase/cache driver infrastructureDatabase/cache driver infrastructuredriver/redisRedis driverRedis driver
Milestone
Description
Summary
Implement Redis Pub/Sub — subscribe to channels and receive push messages. Pub/Sub uses a dedicated connection in push mode.
Design
Package: driver/redis/pubsub.go
API
type PubSub struct {
conn *redisConn // dedicated connection in push mode
channels map[string]bool
patterns map[string]bool
msgCh chan *Message
}
type Message struct {
Channel string
Pattern string // set for pattern subscriptions
Payload string
}
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub
func (c *Client) PSubscribe(ctx context.Context, patterns ...string) *PubSub
func (ps *PubSub) Channel() <-chan *Message
func (ps *PubSub) Subscribe(ctx context.Context, channels ...string) error
func (ps *PubSub) Unsubscribe(ctx context.Context, channels ...string) error
func (ps *PubSub) PSubscribe(ctx context.Context, patterns ...string) error
func (ps *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error
func (ps *PubSub) Close() errorPush Message Handling
Pub/Sub connections switch to push mode — the server sends messages asynchronously:
> SUBSCRIBE mychannel
< *3\r\n$9\r\nsubscribe\r\n$9\r\nmychannel\r\n:1\r\n
(push message arrives)
< *3\r\n$7\r\nmessage\r\n$9\r\nmychannel\r\n$5\r\nhello\r\n
In event loop model:
ProcessRedisdetects push messages (3-element array starting with "message"/"pmessage")- Routes to
PubSub.msgChchannel - Handler goroutine reads from
Channel()chan
RESP3 Push Type
With RESP3, push messages use the > prefix type instead of regular arrays. The parser must handle both RESP2 (array-based) and RESP3 (push-typed) push messages.
Reconnection
- If the connection drops, PubSub auto-resubscribes to all channels/patterns
- Messages during reconnection are lost (Redis Pub/Sub is at-most-once)
- Reconnection attempt with exponential backoff
Acceptance Criteria
- SUBSCRIBE/UNSUBSCRIBE for channels
- PSUBSCRIBE/PUNSUBSCRIBE for patterns
-
Channel()returns messages as they arrive - Both RESP2 and RESP3 push message formats handled
- Dedicated connection (not shared with command traffic)
- Auto-reconnect with channel resubscription
- Close cleanly unsubscribes and closes connection
- Integration test: publish on one client, receive on subscriber
- Concurrent subscribe/unsubscribe safety
Dependencies
- Depends on 133 (Redis client + RedisState for push detection)
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
area/driverDatabase/cache driver infrastructureDatabase/cache driver infrastructuredriver/redisRedis driverRedis driver