diff --git a/client.go b/client.go index cf9c91a..f2aec7d 100644 --- a/client.go +++ b/client.go @@ -51,7 +51,7 @@ type Client[T any] struct { type testHooks struct { beforeSingleflightStart func(ctx context.Context, key string) afterSingleflightStart func(ctx context.Context, key string) - afterMarkRecentWrite func(ctx context.Context, key string) + afterSingleflightEnd func(ctx context.Context, key string) } // NewClient creates a new client that manages the backend cache and fetches from upstream. @@ -332,6 +332,9 @@ func (c *Client[T]) fetchFromUpstreamWithSFKey(ctx context.Context, key string, case <-ctx.Done(): return zero, errors.Wrapf(ctx.Err(), "context cancelled during fetch for key: %s", key) case res := <-resChan: + if c.testHooks != nil && c.testHooks.afterSingleflightEnd != nil { + c.testHooks.afterSingleflightEnd(ctx, key) + } if res.Err != nil { return zero, res.Err } @@ -394,10 +397,6 @@ func (c *Client[T]) markRecentWrite(ctx context.Context, key string) { if err != nil { c.logger.WarnContext(ctx, "failed to mark recent write", "key", key, "error", err) } - - if c.testHooks != nil && c.testHooks.afterMarkRecentWrite != nil { - c.testHooks.afterMarkRecentWrite(ctx, key) - } } // wasRecentlyWritten checks if a key was written (Set or Del) recently diff --git a/double_check_test.go b/double_check_test.go index e3fcb00..bbd02ed 100644 --- a/double_check_test.go +++ b/double_check_test.go @@ -272,8 +272,8 @@ func TestDoubleCheck(t *testing.T) { <-aCanContinue } }, - // Hook 2: Confirm A completed marking recent write - afterMarkRecentWrite: func(ctx context.Context, key string) { + // Hook 2: Confirm A's singleflight completely finished + afterSingleflightEnd: func(ctx context.Context, key string) { if key == "key1" && ctx.Value(ctxKeyRequestA) != nil { close(aCompleted) } @@ -312,7 +312,7 @@ func TestDoubleCheck(t *testing.T) { valueB, errB = client.Get(ctxB, "key1") }() - // Wait for A to complete marking recent write + // Wait for A to complete singleflight (including cleanup) <-aCompleted // Advance time if needed (for beyond-window test) diff --git a/ristretto.go b/ristretto.go index b670cc4..43d56ff 100644 --- a/ristretto.go +++ b/ristretto.go @@ -57,14 +57,20 @@ func (r *RistrettoCache[T]) Set(_ context.Context, key string, value T) error { // 1. Cost exceeds MaxCost (unlikely with cost=1) // 2. Rejected by admission policy (W-TinyLFU) // + // When false is returned, the item is dropped and not added to the cache. + // When true is returned, the item enters the buffer and will be processed. + // // We don't treat rejection as an error because: // - Admission policy rejection is a feature, not a bug // - It prevents cache pollution by low-value entries // - The cache remains consistent and functional + // - Callers can still retrieve data from upstream on cache miss success := r.cache.SetWithTTL(key, value, 1, r.ttl) if success { + // Wait ensures all buffered writes are applied before returning r.cache.Wait() } + // Note: When success is false, there are no buffered writes to wait for return nil }