feat: Redis Pub/Sub support #135

@FumingPower3925

Summary

Implement Redis Pub/Sub — subscribe to channels and receive push messages. Pub/Sub uses a dedicated connection in push mode.

Design

Package: driver/redis/pubsub.go

API

type PubSub struct {
    conn     *redisConn      // dedicated connection in push mode
    channels map[string]bool
    patterns map[string]bool
    msgCh    chan *Message
}

type Message struct {
    Channel string
    Pattern string   // set for pattern subscriptions
    Payload string
}

func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub
func (c *Client) PSubscribe(ctx context.Context, patterns ...string) *PubSub

func (ps *PubSub) Channel() <-chan *Message
func (ps *PubSub) Subscribe(ctx context.Context, channels ...string) error
func (ps *PubSub) Unsubscribe(ctx context.Context, channels ...string) error
func (ps *PubSub) PSubscribe(ctx context.Context, patterns ...string) error
func (ps *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error
func (ps *PubSub) Close() error

Push Message Handling

Pub/Sub connections switch to push mode — the server sends messages asynchronously:

> SUBSCRIBE mychannel
< *3\r\n$9\r\nsubscribe\r\n$9\r\nmychannel\r\n:1\r\n

(push message arrives)
< *3\r\n$7\r\nmessage\r\n$9\r\nmychannel\r\n$5\r\nhello\r\n
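The request side of the exchange above can be sketched as follows. This is a minimal illustration of RESP framing for client commands, not the driver's actual encoder; `encodeCommand` is a hypothetical helper name.

```go
package main

import (
	"strconv"
	"strings"
)

// encodeCommand (hypothetical helper) serializes a command and its arguments
// as a RESP array of bulk strings, the framing Redis expects for requests.
func encodeCommand(args ...string) string {
	var b strings.Builder
	// Array header: '*' followed by the element count.
	b.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		// Each element is a bulk string: '$', byte length, CRLF, data, CRLF.
		b.WriteString("$" + strconv.Itoa(len(a)) + "\r\n" + a + "\r\n")
	}
	return b.String()
}
```

Under this sketch, `encodeCommand("SUBSCRIBE", "mychannel")` yields the bytes `*2\r\n$9\r\nSUBSCRIBE\r\n$9\r\nmychannel\r\n`, which the trace above abbreviates as `SUBSCRIBE mychannel`.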

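In the event loop model:

  • ProcessRedis detects push messages: a 3-element array starting with "message", or a 4-element array starting with "pmessage" (pattern, channel, payload)
  • It routes each one to the PubSub.msgCh channel
  • The handler goroutine reads from the channel returned by Channel()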
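The detection and routing steps could look roughly like the sketch below. It assumes the frame has already been decoded into a slice of strings; `toMessage` and `route` are hypothetical names, and dropping a message when the buffer is full is only one possible policy (chosen here so the event loop never blocks), not something the issue specifies. Note that Redis sends "message" frames with three elements and "pmessage" frames with four (kind, pattern, channel, payload).

```go
package main

// Message mirrors the struct proposed in the API section.
type Message struct {
	Channel string
	Pattern string // set for pattern subscriptions
	Payload string
}

// toMessage (hypothetical) converts a decoded push frame into a Message.
// It returns false for anything that is not a well-formed data message,
// for example subscribe/unsubscribe confirmations.
func toMessage(frame []string) (*Message, bool) {
	if len(frame) == 0 {
		return nil, false
	}
	switch frame[0] {
	case "message": // ["message", channel, payload]
		if len(frame) != 3 {
			return nil, false
		}
		return &Message{Channel: frame[1], Payload: frame[2]}, true
	case "pmessage": // ["pmessage", pattern, channel, payload]
		if len(frame) != 4 {
			return nil, false
		}
		return &Message{Pattern: frame[1], Channel: frame[2], Payload: frame[3]}, true
	}
	return nil, false
}

// route (hypothetical) delivers a push frame to msgCh without blocking.
// Assumption: when the buffer is full the message is dropped and false is
// returned; a real implementation might block or apply backpressure instead.
func route(msgCh chan<- *Message, frame []string) bool {
	m, ok := toMessage(frame)
	if !ok {
		return false
	}
	select {
	case msgCh <- m:
		return true
	default:
		return false
	}
}
```

A consumer would then range over the receive side of the same channel, which is what Channel() is proposed to expose.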

RESP3 Push Type

With RESP3, push messages use the > prefix type instead of regular arrays. The parser must handle both RESP2 (array-based) and RESP3 (push-typed) push messages.

Reconnection

  • If the connection drops, PubSub auto-resubscribes to all channels/patterns
  • Messages during reconnection are lost (Redis Pub/Sub is at-most-once)
  • Reconnection attempts use exponential backoff

Acceptance Criteria

  • SUBSCRIBE/UNSUBSCRIBE for channels
  • PSUBSCRIBE/PUNSUBSCRIBE for patterns
  • Channel() returns messages as they arrive
  • Both RESP2 and RESP3 push message formats handled
  • Dedicated connection (not shared with command traffic)
  • Auto-reconnect with channel resubscription
  • Close() unsubscribes cleanly and closes the connection
  • Integration test: publish on one client, receive on subscriber
  • Concurrent subscribe/unsubscribe safety

Dependencies

  • Depends on #133 (Redis client + RedisState for push detection)

Labels

area/driver (Database/cache driver infrastructure), driver/redis (Redis driver)
