-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.go
More file actions
223 lines (197 loc) · 5.34 KB
/
client.go
File metadata and controls
223 lines (197 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package main
import (
"crypto/rand"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
// writeWait is added to the current time to form the write deadline set
// before queued messages and pings are written in WritePump.
writeWait = 60 * time.Second
// pongWait is the read deadline window: ReadPump sets it once up front and
// the pong handler pushes it forward each time a pong arrives.
pongWait = 60 * time.Second
// pingPeriod is the interval of the ping ticker in WritePump. It is 90% of
// pongWait, so a ping goes out before the read deadline would lapse.
pingPeriod = (pongWait * 9) / 10
// sendBufferSize is the capacity of each client's buffered send channel.
sendBufferSize = 512
// voiceMagic0/1 are the magic bytes that identify voice packets (0x4B56 = "KV").
// Voice packets must be sent as individual WebSocket frames — never batched
// with newline separators — because encrypted binary data may contain 0x0A bytes.
voiceMagic0 = 0x4B
voiceMagic1 = 0x56
)
// newConnID returns 16 random bytes from crypto/rand rendered as a
// 32-character lowercase hex string. If the random source fails, the error is
// logged and the empty string is returned.
func newConnID() string {
	var raw [16]byte
	_, err := rand.Read(raw[:])
	if err == nil {
		return fmt.Sprintf("%x", raw[:])
	}
	log.Printf("rand.Read failed: %v", err)
	return ""
}
// extractFromField decodes data as a JSON object and returns the value of its
// "from" key. Any decoding error — malformed JSON, a non-string "from", or
// empty input — yields the empty string, as does a missing key.
func extractFromField(data []byte) string {
	envelope := struct {
		From string `json:"from"`
	}{}
	if err := json.Unmarshal(data, &envelope); err != nil {
		// Discard anything that was partially decoded before the error.
		return ""
	}
	return envelope.From
}
// isVoicePacket reports whether data is at least two bytes long and begins
// with the voice magic prefix (voiceMagic0 followed by voiceMagic1).
func isVoicePacket(data []byte) bool {
	if len(data) < 2 {
		return false
	}
	return data[0] == voiceMagic0 && data[1] == voiceMagic1
}
// Client holds the state for a single WebSocket connection: the hub and room
// it belongs to, its identifiers, and the queue of outbound messages.
type Client struct {
// hub receives this client's broadcasts and its unregister call.
hub *Hub
// conn is the underlying WebSocket connection read by ReadPump and written
// by WritePump.
conn *websocket.Conn
// roomID is attached to every message this client broadcasts.
roomID string
// peerID starts as the value given to NewClient; ReadPump may replace it
// with the "from" field of the first non-voice message received.
peerID string // from JWT (used in leave notifications)
// connID is generated by newConnID and is used as the SenderID of broadcasts.
connID string // unique per connection (used for room tracking)
// role and ip are stored as passed to NewClient.
role string
ip string
// send is the buffered queue of outbound messages drained by WritePump.
send chan []byte
// closeOnce ensures Close closes the send channel only once.
closeOnce sync.Once
}
// NewClient returns a Client populated with the given hub, connection, room,
// peer ID, role and IP. It also assigns a fresh connection ID from newConnID
// and allocates the outbound send channel with capacity sendBufferSize.
func NewClient(hub *Hub, conn *websocket.Conn, roomID, peerID, role, ip string) *Client {
	c := new(Client)
	c.hub = hub
	c.conn = conn
	c.roomID = roomID
	c.peerID = peerID
	c.connID = newConnID()
	c.role = role
	c.ip = ip
	c.send = make(chan []byte, sendBufferSize)
	return c
}
// ReadPump reads messages from the connection until a read fails and hands
// each one to the hub for broadcast to the client's room. On exit it
// unregisters the client from the hub and closes the connection.
//
// The read deadline is set to pongWait up front and pushed forward by the
// pong handler each time a pong arrives.
func (c *Client) ReadPump() {
	defer func() {
		c.hub.Unregister(c)
		c.conn.Close()
	}()

	extendDeadline := func() error {
		return c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}
	_ = extendDeadline()
	c.conn.SetPongHandler(func(string) error { return extendDeadline() })

	awaitingIdentity := true
	for {
		_, payload, err := c.conn.ReadMessage()
		if err != nil {
			// Normal and going-away closes are expected; log anything else.
			unexpected := websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure)
			if unexpected {
				log.Printf("read error peer=%s room=%s: %v", c.peerID, c.roomID, err)
			}
			return
		}

		// The first non-voice message decides the peer ID once and for all:
		// the client may have generated a fresh UUID that differs from the
		// JWT's peer_id (e.g. several guests sharing one invite link).
		if awaitingIdentity && !isVoicePacket(payload) {
			awaitingIdentity = false
			claimed := extractFromField(payload)
			if claimed != "" && claimed != c.peerID {
				log.Printf("peer %s identified as %s (room %s)", c.peerID, claimed, c.roomID)
				c.peerID = claimed
			}
		}

		outgoing := &BroadcastMsg{
			RoomID:   c.roomID,
			SenderID: c.connID,
			Data:     payload,
		}
		c.hub.Broadcast(outgoing)
	}
}
// WritePump moves messages from c.send onto the WebSocket connection and
// sends a ping every pingPeriod. It returns when the send channel is closed
// (after attempting to write a close frame) or when any write fails; on
// return the ticker is stopped and the connection is closed.
//
// Voice packets are always written as their own binary frame. Non-voice
// messages go through writeDataMessage, which may batch queued data messages
// into one frame.
func (c *Client) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The send channel was closed: tell the peer we are done.
				_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			// Data (non-voice) messages: batch with newline separator for throughput.
			if !isVoicePacket(message) {
				if err := c.writeDataMessage(message); err != nil {
					return
				}
				continue
			}

			// Voice packets MUST be sent as individual frames — never batched.
			// Encrypted binary voice data may contain 0x0A (newline) bytes,
			// which would corrupt the message if batched with '\n' separator.
			if err := c.conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
				return
			}

			// After sending voice, keep writing whatever is already queued
			// without returning to the outer select, for minimal latency.
			// The loop ends when the queue is empty or once a data message
			// has been written.
		drain:
			for {
				select {
				case next, open := <-c.send:
					// Refresh the deadline for every frame so a long burst
					// is not cut off by the deadline of its first packet.
					_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
					if !open {
						// Mirror the outer path: send a close frame before exiting.
						_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
						return
					}
					if !isVoicePacket(next) {
						// Non-voice message: send it normally, potentially batching.
						if err := c.writeDataMessage(next); err != nil {
							return
						}
						break drain
					}
					if err := c.conn.WriteMessage(websocket.BinaryMessage, next); err != nil {
						return
					}
				default:
					break drain
				}
			}

		case <-ticker.C:
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
// writeDataMessage writes message as one binary frame, appending every
// non-voice message that is already queued on c.send, each preceded by a
// '\n' separator. Voice packets met while draining the queue are held back
// and written as individual binary frames once the batched frame is closed.
//
// It returns the first error from opening the writer, closing it, or writing
// a held-back voice packet. Errors from the individual Write calls are
// ignored here; presumably the writer surfaces them on Close — confirm
// against the gorilla/websocket documentation.
func (c *Client) writeDataMessage(message []byte) error {
	frame, err := c.conn.NextWriter(websocket.BinaryMessage)
	if err != nil {
		return err
	}
	_, _ = frame.Write(message)

	// Only take what is queued right now; later arrivals wait for the next call.
	var deferredVoice [][]byte
	for queued := len(c.send); queued > 0; queued-- {
		item := <-c.send
		if isVoicePacket(item) {
			deferredVoice = append(deferredVoice, item)
			continue
		}
		_, _ = frame.Write([]byte{'\n'})
		_, _ = frame.Write(item)
	}

	if err := frame.Close(); err != nil {
		return err
	}

	// Voice packets that sat between data messages go out now, one frame each.
	for i := range deferredVoice {
		if err := c.conn.WriteMessage(websocket.BinaryMessage, deferredVoice[i]); err != nil {
			return err
		}
	}
	return nil
}
// Close closes the client's send channel. The close is guarded by closeOnce,
// so calling Close more than once is safe and only the first call has effect.
func (c *Client) Close() {
	closeSend := func() { close(c.send) }
	c.closeOnce.Do(closeSend)
}