-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_test.go
More file actions
318 lines (263 loc) · 8.81 KB
/
stream_test.go
File metadata and controls
318 lines (263 loc) · 8.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package tok_test
import (
"strings"
"sync"
"testing"
"time"
"github.com/GrayCodeAI/tok"
)
// TestStreamCompressor_BasicAppendAndSnapshot checks that a compressor built
// with the default threshold yields a non-empty snapshot and that Raw retains
// every appended chunk verbatim.
func TestStreamCompressor_BasicAppendAndSnapshot(t *testing.T) {
	comp := tok.NewStreamCompressor(0) // default threshold
	defer comp.Close()

	comp.Append("Hello world")
	comp.Append("This is a test")

	if snap, _ := comp.Snapshot(); snap == "" {
		t.Fatal("expected non-empty snapshot")
	}

	rawText := comp.Raw()
	checks := []struct{ want, msg string }{
		{"Hello world", "raw should contain appended content"},
		{"This is a test", "raw should contain all appended content"},
	}
	for _, c := range checks {
		if !strings.Contains(rawText, c.want) {
			t.Error(c.msg)
		}
	}
}
// TestStreamCompressor_EmptyAppend ensures that appending an empty string
// leaves the raw buffer empty.
func TestStreamCompressor_EmptyAppend(t *testing.T) {
	comp := tok.NewStreamCompressor(100)
	defer comp.Close()

	comp.Append("")

	if got := comp.Raw(); got != "" {
		t.Errorf("expected empty raw after appending empty string, got %q", got)
	}
}
// TestStreamCompressor_ThresholdTriggersCompression verifies that content
// exceeding a small token threshold gets compressed in the background and
// that the resulting stats report a non-zero original token count.
//
// Instead of sleeping a fixed amount (slow on fast machines, flaky on slow
// CI), it polls Snapshot until stats appear or a generous deadline passes.
func TestStreamCompressor_ThresholdTriggersCompression(t *testing.T) {
	// Use a very low threshold so compression triggers quickly.
	sc := tok.NewStreamCompressor(10, tok.Aggressive)
	defer sc.Close()

	// Append enough content to exceed the threshold.
	bigContent := strings.Repeat("The quick brown fox jumped over the lazy dog. ", 50)
	sc.Append(bigContent)

	// Wait for background compression: poll until the stats show that a
	// compression pass has run, or stop at the deadline and let the
	// assertions below report the failure.
	const (
		pollTimeout  = 5 * time.Second
		pollInterval = 10 * time.Millisecond
	)
	deadline := time.Now().Add(pollTimeout)
	snapshot, stats := sc.Snapshot()
	for stats.OriginalTokens == 0 && time.Now().Before(deadline) {
		time.Sleep(pollInterval)
		snapshot, stats = sc.Snapshot()
	}

	if snapshot == "" {
		t.Fatal("expected non-empty compressed snapshot after threshold exceeded")
	}
	if stats.OriginalTokens == 0 {
		t.Error("expected non-zero original tokens in stats after compression")
	}
}
// TestStreamCompressor_SnapshotBeforeCompression checks that, while the
// threshold has not been reached, Snapshot falls back to the raw content and
// reports zero-valued stats.
func TestStreamCompressor_SnapshotBeforeCompression(t *testing.T) {
	// A threshold this high is never reached, so no compression runs.
	comp := tok.NewStreamCompressor(999999)
	defer comp.Close()
	comp.Append("some content")

	snap, st := comp.Snapshot()
	switch {
	case snap == "":
		t.Fatal("expected non-empty snapshot (should return raw)")
	case st.OriginalTokens != 0:
		t.Error("expected zero stats when compression hasn't run")
	}
}
// TestStreamCompressor_TokenCount verifies that TokenCount reports a non-zero
// value once some text has been appended.
func TestStreamCompressor_TokenCount(t *testing.T) {
	comp := tok.NewStreamCompressor(999999)
	defer comp.Close()

	comp.Append("hello world this is a token counting test")
	if comp.TokenCount() == 0 {
		t.Error("expected non-zero token count")
	}
}
// TestStreamCompressor_ConcurrentAccess calls Append, Snapshot, Raw and
// TokenCount from many goroutines at once. Results of the concurrent phase
// are not asserted; that phase is mainly useful under `go test -race`. Once
// every goroutine has finished, the test also checks that the compressor
// still yields a non-empty snapshot.
func TestStreamCompressor_ConcurrentAccess(t *testing.T) {
	sc := tok.NewStreamCompressor(50)
	defer sc.Close()

	var wg sync.WaitGroup
	// spawn runs fn in n goroutines, all tracked by wg.
	spawn := func(n int, fn func()) {
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				fn()
			}()
		}
	}

	// Writers.
	spawn(20, func() { sc.Append("concurrent content that is long enough to matter for tokens. ") })
	// Snapshot readers.
	spawn(20, func() { _, _ = sc.Snapshot() })
	// Raw readers.
	spawn(10, func() { _ = sc.Raw() })
	// Token-count readers.
	spawn(10, func() { _ = sc.TokenCount() })
	wg.Wait()

	// Every Append has returned by now. Like the other tests in this file,
	// expect Snapshot to return either compressed output or the raw content,
	// never an empty string.
	if snapshot, _ := sc.Snapshot(); snapshot == "" {
		t.Fatal("expected non-empty snapshot after concurrent appends")
	}
}
// TestStreamCompressor_NoDoubleCompression fires many rapid appends at a
// compressor with a tiny threshold. The intent is that only one background
// compression runs at a time. This test cannot observe that directly; it
// only checks that the compressor survives the burst and still yields a
// non-empty snapshot.
func TestStreamCompressor_NoDoubleCompression(t *testing.T) {
	// A low threshold ensures compression would trigger.
	comp := tok.NewStreamCompressor(5, tok.Minimal)
	defer comp.Close()

	const chunk = "word word word word word word word word word word "
	for remaining := 50; remaining > 0; remaining-- {
		comp.Append(chunk)
	}

	// Give the background compression time to run.
	time.Sleep(500 * time.Millisecond)

	if snap, _ := comp.Snapshot(); snap == "" {
		t.Fatal("expected non-empty snapshot after many appends")
	}
}
// TestStreamCompressor_CloseWaitsForCompression appends enough content to
// trigger compression, closes the compressor right away, and then checks
// that Snapshot still returns data after Close.
func TestStreamCompressor_CloseWaitsForCompression(t *testing.T) {
	comp := tok.NewStreamCompressor(5, tok.Aggressive)
	comp.Append(strings.Repeat("token token token token token ", 100))

	// Close should wait for any in-progress compression and must not panic.
	comp.Close()

	if snap, _ := comp.Snapshot(); snap == "" {
		t.Fatal("expected non-empty snapshot after close")
	}
}
// TestStreamCompressor_DefaultThreshold checks that a zero threshold selects
// the default (expected to be 2000), so a tiny input is never compressed.
func TestStreamCompressor_DefaultThreshold(t *testing.T) {
	comp := tok.NewStreamCompressor(0)
	defer comp.Close()

	comp.Append("small text")
	// Give any (unexpected) background compression a chance to run.
	time.Sleep(100 * time.Millisecond)

	if _, st := comp.Snapshot(); st.OriginalTokens != 0 {
		t.Error("expected no compression for content well below default threshold")
	}
}
// TestStreamCompressor_DeltaAccumulatesCompressedOutput appends two bursts,
// each large enough to trigger compression. It checks that both the
// compressed snapshot and the OriginalTokens stat grow after the second burst.
func TestStreamCompressor_DeltaAccumulatesCompressedOutput(t *testing.T) {
	// A low threshold makes each burst trigger compression.
	comp := tok.NewStreamCompressor(10, tok.Minimal)
	defer comp.Close()

	const settle = 500 * time.Millisecond

	// Burst one kicks off the initial compression.
	comp.Append(strings.Repeat("alpha bravo charlie delta echo foxtrot ", 20))
	time.Sleep(settle)
	firstSnap, firstStats := comp.Snapshot()
	if firstSnap == "" {
		t.Fatal("expected non-empty snapshot after first burst")
	}
	if firstStats.OriginalTokens == 0 {
		t.Fatal("expected non-zero original tokens after first compression")
	}

	// Burst two should be compressed as a delta and accumulated.
	comp.Append(strings.Repeat("golf hotel india juliet kilo lima ", 20))
	time.Sleep(settle)
	secondSnap, secondStats := comp.Snapshot()
	if secondSnap == "" {
		t.Fatal("expected non-empty snapshot after second burst")
	}

	// Compressed output and stats should both have grown.
	if n1, n2 := len(firstSnap), len(secondSnap); n2 <= n1 {
		t.Errorf("expected accumulated compressed output to grow: snap1=%d, snap2=%d", n1, n2)
	}
	if before, after := firstStats.OriginalTokens, secondStats.OriginalTokens; after <= before {
		t.Errorf("expected accumulated original tokens to grow: first=%d, accumulated=%d", before, after)
	}
}
// TestStreamCompressor_DeltaDoesNotRecompressFullContent checks that a
// compression pass after the first one only accounts for the newly appended
// delta, not the whole accumulated stream. It does this by comparing the
// growth in OriginalTokens against the token estimate of the full content.
func TestStreamCompressor_DeltaDoesNotRecompressFullContent(t *testing.T) {
	// Use a low threshold so compression triggers early.
	sc := tok.NewStreamCompressor(10, tok.Minimal)
	defer sc.Close()

	// First burst: large initial content.
	burst1 := strings.Repeat("the quick brown fox jumps over the lazy dog again and again. ", 30)
	sc.Append(burst1)
	time.Sleep(500 * time.Millisecond)
	_, stats1 := sc.Snapshot()
	tokensAfterFirst := stats1.OriginalTokens
	// The measurement below is only meaningful if the first pass has
	// completed. Otherwise the subtraction would measure the whole stream,
	// and a slow pass would be misreported as a full re-compression.
	if tokensAfterFirst == 0 {
		t.Fatal("expected non-zero original tokens after first compression")
	}

	// Second burst: small delta -- should NOT re-compress the full content.
	// The original tokens in the second compression should reflect only the
	// delta, not the entire accumulated raw content.
	// NOTE(review): if this small delta alone does not reach the threshold, no
	// second pass may run; incrementalTokens is then 0 and the check below
	// passes trivially -- confirm against the compressor's trigger rule.
	burst2 := "small delta"
	sc.Append(burst2)
	time.Sleep(500 * time.Millisecond)
	_, stats2 := sc.Snapshot()

	// The incremental original tokens should be roughly the delta size,
	// not the full accumulated content.
	incrementalTokens := stats2.OriginalTokens - tokensAfterFirst
	fullContentTokens := tok.EstimateTokens(burst1 + "\n" + burst2)
	// If the full content was re-compressed, incrementalTokens would be
	// ~fullContentTokens. With delta-only, it should be much smaller.
	if incrementalTokens > fullContentTokens/2 {
		t.Errorf("delta compression appears to have re-compressed full content: incremental=%d, fullContent=%d",
			incrementalTokens, fullContentTokens)
	}
}
// TestStreamCompressor_Reset checks three things: after a compression pass,
// Reset clears the snapshot, stats and raw content; and the compressor
// accepts fresh content afterwards.
func TestStreamCompressor_Reset(t *testing.T) {
	sc := tok.NewStreamCompressor(5, tok.Minimal)
	// Deferred (rather than called at the end) so the compressor is also
	// closed when a t.Fatal below aborts the test early.
	defer sc.Close()

	// Append enough to trigger compression.
	sc.Append(strings.Repeat("hello world testing reset functionality. ", 20))
	time.Sleep(500 * time.Millisecond)
	snap, stats := sc.Snapshot()
	if snap == "" {
		t.Fatal("expected non-empty snapshot before reset")
	}
	if stats.OriginalTokens == 0 {
		t.Fatal("expected non-zero stats before reset")
	}

	// Reset should clear everything.
	sc.Reset()
	snapAfter, statsAfter := sc.Snapshot()
	if snapAfter != "" {
		t.Errorf("expected empty snapshot after reset, got %q", snapAfter)
	}
	if statsAfter.OriginalTokens != 0 {
		t.Error("expected zero stats after reset")
	}
	if sc.Raw() != "" {
		t.Error("expected empty raw after reset")
	}

	// After reset, appending new content should work fresh.
	sc.Append("fresh content after reset")
	time.Sleep(100 * time.Millisecond)
	raw := sc.Raw()
	if !strings.Contains(raw, "fresh content after reset") {
		t.Error("expected raw to contain new content after reset")
	}
}
// TestStreamCompressor_DeltaStatsAccumulate runs three append/compress rounds
// and checks that the cumulative stats report positive savings.
func TestStreamCompressor_DeltaStatsAccumulate(t *testing.T) {
	comp := tok.NewStreamCompressor(5, tok.Minimal)
	defer comp.Close()

	// Each burst is large enough to trigger a compression pass on its own.
	burst := strings.Repeat("token stats accumulate test content here. ", 10)
	for round := 1; round <= 3; round++ {
		comp.Append(burst)
		time.Sleep(500 * time.Millisecond) // let this round's compression finish
	}

	_, st := comp.Snapshot()
	if st.TokensSaved <= 0 {
		t.Error("expected positive tokens saved after multiple compressions")
	}
	if st.ReductionPercent <= 0 {
		t.Error("expected positive reduction percent after multiple compressions")
	}
}