-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubscription.go
More file actions
90 lines (69 loc) · 2.08 KB
/
subscription.go
File metadata and controls
90 lines (69 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package pubsub
import (
"context"
"sync"
)
// Client is the dependency SubscriptionManager uses to change which channels
// it is subscribed to: AddMulti calls Subscribe and RemoveMulti calls
// Unsubscribe, and each returns the Client's error to its caller unchanged.
type Client interface {
// Subscribe is invoked with the channel names to start listening on.
Subscribe(ctx context.Context, channels ...string) error
// Unsubscribe is invoked with the channel names to stop listening on.
Unsubscribe(ctx context.Context, channels ...string) error
}
// Subscriber is a receiver of payloads of type T.
// SubscriptionManager.Dispatch calls Dispatch on every subscriber registered
// for the dispatched topic.
//
// Subscriber values are used as map keys (see SubscriberMap), so the concrete
// type behind the interface must be comparable; pointer receivers satisfy this.
type Subscriber[T any] interface {
// Dispatch is called with the payload handed to SubscriptionManager.Dispatch.
Dispatch(payload T)
}
// SubscriberMap is used as a set of subscribers: the manager stores true for
// each registered subscriber and removes entries with delete.
type SubscriberMap[T any] map[Subscriber[T]]bool
// SubscriptionMap maps a topic name to the set of subscribers registered for
// that topic.
type SubscriptionMap[T any] map[string]SubscriberMap[T]
// SubscriptionManager tracks which subscribers are registered for which topic
// and calls the Client when topics need to be subscribed or unsubscribed.
// Construct it with NewSubscriptionManager so that subscriptions is non-nil.
type SubscriptionManager[T any] struct {
// mu is held by AddMulti while it updates subscriptions and calls the Client.
mu sync.Mutex
// pubsub is the Client that receives Subscribe/Unsubscribe calls.
pubsub Client
// subscriptions holds the per-topic subscriber sets.
subscriptions SubscriptionMap[T]
}
// add records sub as a subscriber of topic, creating the topic's subscriber
// set on first use. It performs no locking and does not talk to the Client;
// callers are responsible for both.
func (m *SubscriptionManager[T]) add(topic string, sub Subscriber[T]) {
	subscribers := m.subscriptions[topic]
	if subscribers == nil {
		// First subscriber for this topic (or an explicit nil entry):
		// install a fresh set before writing to it.
		subscribers = SubscriberMap[T]{}
		m.subscriptions[topic] = subscribers
	}
	subscribers[sub] = true
}
// Add registers sub for a single topic. It is a convenience wrapper that
// forwards to AddMulti with a one-element topic list and returns its error.
func (m *SubscriptionManager[T]) Add(ctx context.Context, topic string, sub Subscriber[T]) error {
	single := []string{topic}
	return m.AddMulti(ctx, single, sub)
}
// AddMulti registers sub for every topic in topics and asks the Client to
// subscribe to the topics that had no subscribers before this call.
//
// A topic counts as new only if its subscriber set was empty before sub was
// added, so re-adding an existing sole subscriber or repeating a topic inside
// topics does not trigger a second Subscribe for it. When no topic is new the
// Client is not called and nil is returned.
//
// The manager's mutex is held for the whole call, including the Subscribe
// call. The error from Subscribe is returned unchanged; the local registration
// is kept even if Subscribe fails.
func (m *SubscriptionManager[T]) AddMulti(ctx context.Context, topics []string, sub Subscriber[T]) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	var newTopics []string
	for _, topic := range topics {
		// Decide newness before adding: checking the size afterwards cannot
		// tell "first subscriber" apart from "same subscriber added again".
		if len(m.subscriptions[topic]) == 0 {
			newTopics = append(newTopics, topic)
		}
		m.add(topic, sub)
	}

	if len(newTopics) == 0 {
		return nil
	}
	return m.pubsub.Subscribe(ctx, newTopics...)
}
// Remove unregisters sub from a single topic. It is a convenience wrapper
// that forwards to RemoveMulti with a one-element topic list and returns its
// error.
func (m *SubscriptionManager[T]) Remove(ctx context.Context, topic string, sub Subscriber[T]) error {
	single := []string{topic}
	return m.RemoveMulti(ctx, single, sub)
}
// RemoveMulti unregisters sub from every topic in topics and asks the Client
// to unsubscribe from the topics that are left without subscribers.
//
// Topics for which sub is not registered are ignored. A topic whose last
// subscriber is removed is deleted from the manager and passed to
// Unsubscribe. When no topic became empty the Client is not called and nil is
// returned.
//
// The manager's mutex is held for the whole call, including the Unsubscribe
// call, mirroring AddMulti; without it the map updates here would race with
// concurrent AddMulti calls. The error from Unsubscribe is returned unchanged.
func (m *SubscriptionManager[T]) RemoveMulti(ctx context.Context, topics []string, sub Subscriber[T]) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	var emptyTopics []string
	for _, topic := range topics {
		subscribers := m.subscriptions[topic]
		if !subscribers[sub] {
			// Unknown topic or sub not registered for it: nothing to do.
			continue
		}
		delete(subscribers, sub)
		if len(subscribers) == 0 {
			emptyTopics = append(emptyTopics, topic)
			delete(m.subscriptions, topic)
		}
	}

	if len(emptyTopics) == 0 {
		return nil
	}
	return m.pubsub.Unsubscribe(ctx, emptyTopics...)
}
// Dispatch delivers payload to every subscriber currently registered for
// topic by calling its Dispatch method. Unknown topics are a no-op.
//
// The subscriber set is copied while holding the manager's mutex so the read
// does not race with AddMulti, and the mutex is released before any
// subscriber is invoked so a subscriber may call back into the manager
// without deadlocking. A subscriber removed after the copy was taken may
// therefore still receive this payload.
func (m *SubscriptionManager[T]) Dispatch(topic string, payload T) {
	m.mu.Lock()
	registered := m.subscriptions[topic]
	targets := make([]Subscriber[T], 0, len(registered))
	for sub := range registered {
		targets = append(targets, sub)
	}
	m.mu.Unlock()

	for _, sub := range targets {
		sub.Dispatch(payload)
	}
}
// NewSubscriptionManager returns a manager that uses client for Subscribe and
// Unsubscribe calls and starts with an empty, non-nil subscription map.
func NewSubscriptionManager[T any](client Client) *SubscriptionManager[T] {
	m := new(SubscriptionManager[T])
	m.pubsub = client
	m.subscriptions = SubscriptionMap[T]{}
	return m
}