Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Client[T any] struct {
type testHooks struct {
beforeSingleflightStart func(ctx context.Context, key string)
afterSingleflightStart func(ctx context.Context, key string)
afterMarkRecentWrite func(ctx context.Context, key string)
afterSingleflightEnd func(ctx context.Context, key string)
}

// NewClient creates a new client that manages the backend cache and fetches from upstream.
Expand Down Expand Up @@ -332,6 +332,9 @@ func (c *Client[T]) fetchFromUpstreamWithSFKey(ctx context.Context, key string,
case <-ctx.Done():
return zero, errors.Wrapf(ctx.Err(), "context cancelled during fetch for key: %s", key)
case res := <-resChan:
if c.testHooks != nil && c.testHooks.afterSingleflightEnd != nil {
c.testHooks.afterSingleflightEnd(ctx, key)
}
if res.Err != nil {
return zero, res.Err
}
Expand Down Expand Up @@ -394,10 +397,6 @@ func (c *Client[T]) markRecentWrite(ctx context.Context, key string) {
if err != nil {
c.logger.WarnContext(ctx, "failed to mark recent write", "key", key, "error", err)
}

if c.testHooks != nil && c.testHooks.afterMarkRecentWrite != nil {
c.testHooks.afterMarkRecentWrite(ctx, key)
}
}

// wasRecentlyWritten checks if a key was written (Set or Del) recently
Expand Down
6 changes: 3 additions & 3 deletions double_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ func TestDoubleCheck(t *testing.T) {
<-aCanContinue
}
},
// Hook 2: Confirm A completed marking recent write
afterMarkRecentWrite: func(ctx context.Context, key string) {
// Hook 2: Confirm A's singleflight completely finished
afterSingleflightEnd: func(ctx context.Context, key string) {
if key == "key1" && ctx.Value(ctxKeyRequestA) != nil {
close(aCompleted)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestDoubleCheck(t *testing.T) {
valueB, errB = client.Get(ctxB, "key1")
}()

// Wait for A to complete marking recent write
// Wait for A to complete singleflight (including cleanup)
<-aCompleted

// Advance time if needed (for beyond-window test)
Expand Down
6 changes: 6 additions & 0 deletions ristretto.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,20 @@ func (r *RistrettoCache[T]) Set(_ context.Context, key string, value T) error {
// 1. Cost exceeds MaxCost (unlikely with cost=1)
// 2. Rejected by admission policy (W-TinyLFU)
//
// When false is returned, the item is dropped and not added to the cache.
// When true is returned, the item enters the buffer and will be processed.
//
// We don't treat rejection as an error because:
// - Admission policy rejection is a feature, not a bug
// - It prevents cache pollution by low-value entries
// - The cache remains consistent and functional
// - Callers can still retrieve data from upstream on cache miss
success := r.cache.SetWithTTL(key, value, 1, r.ttl)
if success {
// Wait ensures all buffered writes are applied before returning
r.cache.Wait()
}
// Note: When success is false, there are no buffered writes to wait for
return nil
}

Expand Down
Loading