From 87a13844ba866ac1b96a89e98397dde85b38d3d7 Mon Sep 17 00:00:00 2001 From: "F." Date: Sat, 4 Apr 2026 00:30:36 +0200 Subject: [PATCH] chore(deps,refactor): bump tooling/Go deps and migrate distMetrics to typed atomics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tooling & dependency updates: - golangci-lint: v2.11.3 → v2.11.4 (pre-commit hook, project-settings.env, Makefile) - buf: v1.66.1 → v1.67.0 - go.opentelemetry.io/otel (core, metric, trace): v1.42.0 → v1.43.0 - andybalholm/brotli: v1.2.0 → v1.2.1 - klauspost/compress: v1.18.4 → v1.18.5 Refactor (pkg/backend/dist_memory.go): - Migrate all distMetrics fields from raw int64/uint64 to sync/atomic typed atomics (atomic.Int64, atomic.Uint64) - Replace all atomic.LoadInt64/StoreInt64/AddInt64 call sites with the idiomatic value-method equivalents (.Load(), .Store(), .Add()) - Eliminates unsafe pointer passing and makes atomic access self-documenting --- .pre-commit/golangci-lint-hook | 2 +- .project-settings.env | 4 +- Makefile | 4 +- go.mod | 10 +- go.sum | 20 +-- pkg/backend/dist_memory.go | 276 ++++++++++++++++----------------- 6 files changed, 158 insertions(+), 158 deletions(-) diff --git a/.pre-commit/golangci-lint-hook b/.pre-commit/golangci-lint-hook index afad091..86718b2 100755 --- a/.pre-commit/golangci-lint-hook +++ b/.pre-commit/golangci-lint-hook @@ -23,7 +23,7 @@ if [[ -f "${ROOT_DIR}/.project-settings.env" ]]; then # shellcheck disable=SC1090 source "${ROOT_DIR}/.project-settings.env" fi -GOLANGCI_LINT_VERSION="${GOLANGCI_LINT_VERSION:-v2.11.3}" +GOLANGCI_LINT_VERSION="${GOLANGCI_LINT_VERSION:-v2.11.4}" # ####################################### # Install dependencies to run the pre-commit hook diff --git a/.project-settings.env b/.project-settings.env index c6e8a86..7a77fa6 100644 --- a/.project-settings.env +++ b/.project-settings.env @@ -1,5 +1,5 @@ -GOLANGCI_LINT_VERSION=v2.11.3 -BUF_VERSION=v1.66.1 +GOLANGCI_LINT_VERSION=v2.11.4 +BUF_VERSION=v1.67.0 
GO_VERSION=1.26.1 GCI_PREFIX=github.com/hyp3rd/hypercache PROTO_ENABLED=false diff --git a/Makefile b/Makefile index 42400d7..b8c9d8f 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ include .project-settings.env -GOLANGCI_LINT_VERSION ?= v2.11.3 -BUF_VERSION ?= v1.66.1 +GOLANGCI_LINT_VERSION ?= v2.11.4 +BUF_VERSION ?= v1.67.0 GO_VERSION ?= 1.26.1 GCI_PREFIX ?= github.com/hyp3rd/hypercache PROTO_ENABLED ?= true diff --git a/go.mod b/go.mod index 2d208a1..5e216ad 100644 --- a/go.mod +++ b/go.mod @@ -12,19 +12,19 @@ require ( github.com/redis/go-redis/v9 v9.18.0 github.com/shamaton/msgpack/v3 v3.1.0 github.com/ugorji/go/codec v1.3.1 - go.opentelemetry.io/otel v1.42.0 - go.opentelemetry.io/otel/metric v1.42.0 - go.opentelemetry.io/otel/trace v1.42.0 + go.opentelemetry.io/otel v1.43.0 + go.opentelemetry.io/otel/metric v1.43.0 + go.opentelemetry.io/otel/trace v1.43.0 ) require ( - github.com/andybalholm/brotli v1.2.0 // indirect + github.com/andybalholm/brotli v1.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/gofiber/schema v1.7.0 // indirect github.com/gofiber/utils/v2 v2.0.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.18.4 // indirect + github.com/klauspost/compress v1.18.5 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index 1e6f565..cd9f2cb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= -github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro= +github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/bsm/ginkgo/v2 v2.12.0 
h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -33,8 +33,8 @@ github.com/hyp3rd/ewrap v1.3.8 h1:36IYDgSWI5wG85G+CIwE7WvU5xi+FJvT8KWR8YVT+cA= github.com/hyp3rd/ewrap v1.3.8/go.mod h1:ly3lreW7OWbBaX9I4zTKqctJlf9uxNQiUD5zXl2vz4g= github.com/hyp3rd/sectools v1.2.3 h1:XElGIhLOWPJxVLyLPzfKASYjs+3yEkDN48JeSw/Wvjo= github.com/hyp3rd/sectools v1.2.3/go.mod h1:iwl65boK1VNhwvRNSQDItdD5xon8W1l+ox4JFTe5WbI= -github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= -github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -82,12 +82,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= -go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= -go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= -go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= -go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= 
-go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= +go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index e215209..dbf1af9 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -68,7 +68,7 @@ type DistMemory struct { // consistency / versioning (initial) readConsistency ConsistencyLevel writeConsistency ConsistencyLevel - versionCounter uint64 // global monotonic for this node (lamport-like) + versionCounter atomic.Uint64 // global monotonic for this node (lamport-like) // hinted handoff hintTTL time.Duration @@ -394,18 +394,18 @@ func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { //nol } fetchDur := time.Since(startFetch) - atomic.StoreInt64(&dm.metrics.merkleFetchNanos, fetchDur.Nanoseconds()) + dm.metrics.merkleFetchNanos.Store(fetchDur.Nanoseconds()) startBuild := time.Now() localTree := dm.BuildMerkleTree() buildDur := time.Since(startBuild) - atomic.StoreInt64(&dm.metrics.merkleBuildNanos, buildDur.Nanoseconds()) + dm.metrics.merkleBuildNanos.Store(buildDur.Nanoseconds()) entries := dm.sortedMerkleEntries() startDiff := time.Now() diffs := localTree.DiffLeafRanges(remoteTree) diffDur := 
time.Since(startDiff) - atomic.StoreInt64(&dm.metrics.merkleDiffNanos, diffDur.Nanoseconds()) + dm.metrics.merkleDiffNanos.Store(diffDur.Nanoseconds()) missing := dm.resolveMissingKeys(ctx, nodeID, entries) @@ -419,7 +419,7 @@ func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { //nol return nil } - atomic.AddInt64(&dm.metrics.merkleSyncs, 1) + dm.metrics.merkleSyncs.Add(1) return nil } @@ -739,7 +739,7 @@ func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { //nolin } }() - atomic.AddInt64(&dm.metrics.writeAttempts, 1) + dm.metrics.writeAttempts.Add(1) owners := dm.lookupOwners(item.Key) if len(owners) == 0 { @@ -758,17 +758,17 @@ func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { //nolin } // primary path: assign version & timestamp - item.Version = atomic.AddUint64(&dm.versionCounter, 1) + item.Version = dm.versionCounter.Add(1) item.Origin = string(dm.localNode.ID) item.LastUpdated = time.Now() dm.applySet(ctx, item, false) acks := 1 + dm.replicateTo(ctx, item, owners[1:]) - atomic.AddInt64(&dm.metrics.writeAcks, int64(acks)) + dm.metrics.writeAcks.Add(int64(acks)) needed := dm.requiredAcks(len(owners), dm.writeConsistency) if acks < needed { - atomic.AddInt64(&dm.metrics.writeQuorumFailures, 1) + dm.metrics.writeQuorumFailures.Add(1) return sentinel.ErrQuorumFailed } @@ -778,7 +778,7 @@ func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { //nolin } // --- Consistency helper methods. --- -// List aggregates items (no ordering, then filters applied per interface contract not yet integrated; kept simple). +// List aggregates items (no ordering, then filters applied per interface contract not yet integrated; kept simple). 
func (dm *DistMemory) List(_ context.Context, _ ...IFilter) ([]*cache.Item, error) { items := make([]*cache.Item, 0, listPrealloc) for _, s := range dm.shards { @@ -817,7 +817,7 @@ func (dm *DistMemory) Remove(ctx context.Context, keys ...string) error { //noli continue } - atomic.AddInt64(&dm.metrics.forwardRemove, 1) + dm.metrics.forwardRemove.Add(1) _ = dm.transport.ForwardRemove(ctx, string(owners[0]), key, true) //nolint:errcheck // best-effort } @@ -905,45 +905,45 @@ func (dm *DistMemory) DebugOwners(key string) []cluster.NodeID { // distMetrics holds internal counters (best-effort, not atomic snapshot consistent). type distMetrics struct { - forwardGet int64 - forwardSet int64 - forwardRemove int64 - replicaFanoutSet int64 - replicaFanoutRemove int64 - readRepair int64 - replicaGetMiss int64 - heartbeatSuccess int64 - heartbeatFailure int64 - nodesSuspect int64 // number of times a node transitioned to suspect - nodesDead int64 // number of times a node transitioned to dead/pruned - nodesRemoved int64 - versionConflicts int64 // times a newer version (or tie-broken origin) replaced previous candidate - versionTieBreaks int64 // subset of conflicts decided by origin tie-break - readPrimaryPromote int64 // times read path skipped unreachable primary and promoted next owner - hintedQueued int64 // hints queued - hintedReplayed int64 // hints successfully replayed - hintedExpired int64 // hints expired before delivery - hintedDropped int64 // hints dropped due to non-not-found transport errors - hintedGlobalDropped int64 // hints dropped due to global caps (count/bytes) - hintedBytes int64 // approximate total bytes currently queued (best-effort) - merkleSyncs int64 // merkle sync operations completed - merkleKeysPulled int64 // keys applied during sync - merkleBuildNanos int64 // last build duration (ns) - merkleDiffNanos int64 // last diff duration (ns) - merkleFetchNanos int64 // last remote fetch duration (ns) - autoSyncLoops int64 // number of auto-sync 
ticks executed - tombstonesActive int64 // approximate active tombstones - tombstonesPurged int64 // cumulative purged tombstones - writeQuorumFailures int64 // number of write operations that failed quorum - writeAcks int64 // cumulative replica write acks (includes primary) - writeAttempts int64 // total write operations attempted (Set) - rebalancedKeys int64 // keys migrated during rebalancing - rebalanceBatches int64 // number of batches processed - rebalanceThrottle int64 // times rebalance was throttled due to concurrency limits - rebalanceLastNanos int64 // duration of last full rebalance scan (ns) - rebalanceReplicaDiff int64 // number of keys whose value was pushed to newly added replicas (replica-only diff) - rebalanceReplicaDiffThrottle int64 // number of times replica diff scan exited early due to per-tick limit - rebalancedPrimary int64 // number of keys whose primary ownership changed (migrations to new primary) + forwardGet atomic.Int64 + forwardSet atomic.Int64 + forwardRemove atomic.Int64 + replicaFanoutSet atomic.Int64 + replicaFanoutRemove atomic.Int64 + readRepair atomic.Int64 + replicaGetMiss atomic.Int64 + heartbeatSuccess atomic.Int64 + heartbeatFailure atomic.Int64 + nodesSuspect atomic.Int64 // number of times a node transitioned to suspect + nodesDead atomic.Int64 // number of times a node transitioned to dead/pruned + nodesRemoved atomic.Int64 + versionConflicts atomic.Int64 // times a newer version (or tie-broken origin) replaced previous candidate + versionTieBreaks atomic.Int64 // subset of conflicts decided by origin tie-break + readPrimaryPromote atomic.Int64 // times read path skipped unreachable primary and promoted next owner + hintedQueued atomic.Int64 // hints queued + hintedReplayed atomic.Int64 // hints successfully replayed + hintedExpired atomic.Int64 // hints expired before delivery + hintedDropped atomic.Int64 // hints dropped due to non-not-found transport errors + hintedGlobalDropped atomic.Int64 // hints dropped due to 
global caps (count/bytes) + hintedBytes atomic.Int64 // approximate total bytes currently queued (best-effort) + merkleSyncs atomic.Int64 // merkle sync operations completed + merkleKeysPulled atomic.Int64 // keys applied during sync + merkleBuildNanos atomic.Int64 // last build duration (ns) + merkleDiffNanos atomic.Int64 // last diff duration (ns) + merkleFetchNanos atomic.Int64 // last remote fetch duration (ns) + autoSyncLoops atomic.Int64 // number of auto-sync ticks executed + tombstonesActive atomic.Int64 // approximate active tombstones + tombstonesPurged atomic.Int64 // cumulative purged tombstones + writeQuorumFailures atomic.Int64 // number of write operations that failed quorum + writeAcks atomic.Int64 // cumulative replica write acks (includes primary) + writeAttempts atomic.Int64 // total write operations attempted (Set) + rebalancedKeys atomic.Int64 // keys migrated during rebalancing + rebalanceBatches atomic.Int64 // number of batches processed + rebalanceThrottle atomic.Int64 // times rebalance was throttled due to concurrency limits + rebalanceLastNanos atomic.Int64 // duration of last full rebalance scan (ns) + rebalanceReplicaDiff atomic.Int64 // number of keys whose value was pushed to newly added replicas (replica-only diff) + rebalanceReplicaDiffThrottle atomic.Int64 // number of times replica diff scan exited early due to per-tick limit + rebalancedPrimary atomic.Int64 // number of keys whose primary ownership changed (migrations to new primary) } // DistMetrics snapshot. 
@@ -1024,47 +1024,47 @@ func (dm *DistMemory) Metrics() DistMetrics { } return DistMetrics{ - ForwardGet: atomic.LoadInt64(&dm.metrics.forwardGet), - ForwardSet: atomic.LoadInt64(&dm.metrics.forwardSet), - ForwardRemove: atomic.LoadInt64(&dm.metrics.forwardRemove), - ReplicaFanoutSet: atomic.LoadInt64(&dm.metrics.replicaFanoutSet), - ReplicaFanoutRemove: atomic.LoadInt64(&dm.metrics.replicaFanoutRemove), - ReadRepair: atomic.LoadInt64(&dm.metrics.readRepair), - ReplicaGetMiss: atomic.LoadInt64(&dm.metrics.replicaGetMiss), - HeartbeatSuccess: atomic.LoadInt64(&dm.metrics.heartbeatSuccess), - HeartbeatFailure: atomic.LoadInt64(&dm.metrics.heartbeatFailure), - NodesSuspect: atomic.LoadInt64(&dm.metrics.nodesSuspect), - NodesDead: atomic.LoadInt64(&dm.metrics.nodesDead), - NodesRemoved: atomic.LoadInt64(&dm.metrics.nodesRemoved), - VersionConflicts: atomic.LoadInt64(&dm.metrics.versionConflicts), - VersionTieBreaks: atomic.LoadInt64(&dm.metrics.versionTieBreaks), - ReadPrimaryPromote: atomic.LoadInt64(&dm.metrics.readPrimaryPromote), - HintedQueued: atomic.LoadInt64(&dm.metrics.hintedQueued), - HintedReplayed: atomic.LoadInt64(&dm.metrics.hintedReplayed), - HintedExpired: atomic.LoadInt64(&dm.metrics.hintedExpired), - HintedDropped: atomic.LoadInt64(&dm.metrics.hintedDropped), - HintedGlobalDropped: atomic.LoadInt64(&dm.metrics.hintedGlobalDropped), - HintedBytes: atomic.LoadInt64(&dm.metrics.hintedBytes), - MerkleSyncs: atomic.LoadInt64(&dm.metrics.merkleSyncs), - MerkleKeysPulled: atomic.LoadInt64(&dm.metrics.merkleKeysPulled), - MerkleBuildNanos: atomic.LoadInt64(&dm.metrics.merkleBuildNanos), - MerkleDiffNanos: atomic.LoadInt64(&dm.metrics.merkleDiffNanos), - MerkleFetchNanos: atomic.LoadInt64(&dm.metrics.merkleFetchNanos), - AutoSyncLoops: atomic.LoadInt64(&dm.metrics.autoSyncLoops), + ForwardGet: dm.metrics.forwardGet.Load(), + ForwardSet: dm.metrics.forwardSet.Load(), + ForwardRemove: dm.metrics.forwardRemove.Load(), + ReplicaFanoutSet: 
dm.metrics.replicaFanoutSet.Load(), + ReplicaFanoutRemove: dm.metrics.replicaFanoutRemove.Load(), + ReadRepair: dm.metrics.readRepair.Load(), + ReplicaGetMiss: dm.metrics.replicaGetMiss.Load(), + HeartbeatSuccess: dm.metrics.heartbeatSuccess.Load(), + HeartbeatFailure: dm.metrics.heartbeatFailure.Load(), + NodesSuspect: dm.metrics.nodesSuspect.Load(), + NodesDead: dm.metrics.nodesDead.Load(), + NodesRemoved: dm.metrics.nodesRemoved.Load(), + VersionConflicts: dm.metrics.versionConflicts.Load(), + VersionTieBreaks: dm.metrics.versionTieBreaks.Load(), + ReadPrimaryPromote: dm.metrics.readPrimaryPromote.Load(), + HintedQueued: dm.metrics.hintedQueued.Load(), + HintedReplayed: dm.metrics.hintedReplayed.Load(), + HintedExpired: dm.metrics.hintedExpired.Load(), + HintedDropped: dm.metrics.hintedDropped.Load(), + HintedGlobalDropped: dm.metrics.hintedGlobalDropped.Load(), + HintedBytes: dm.metrics.hintedBytes.Load(), + MerkleSyncs: dm.metrics.merkleSyncs.Load(), + MerkleKeysPulled: dm.metrics.merkleKeysPulled.Load(), + MerkleBuildNanos: dm.metrics.merkleBuildNanos.Load(), + MerkleDiffNanos: dm.metrics.merkleDiffNanos.Load(), + MerkleFetchNanos: dm.metrics.merkleFetchNanos.Load(), + AutoSyncLoops: dm.metrics.autoSyncLoops.Load(), LastAutoSyncNanos: dm.lastAutoSyncDuration.Load(), LastAutoSyncError: lastErr, - TombstonesActive: atomic.LoadInt64(&dm.metrics.tombstonesActive), - TombstonesPurged: atomic.LoadInt64(&dm.metrics.tombstonesPurged), - WriteQuorumFailures: atomic.LoadInt64(&dm.metrics.writeQuorumFailures), - WriteAcks: atomic.LoadInt64(&dm.metrics.writeAcks), - WriteAttempts: atomic.LoadInt64(&dm.metrics.writeAttempts), - RebalancedKeys: atomic.LoadInt64(&dm.metrics.rebalancedKeys), - RebalanceBatches: atomic.LoadInt64(&dm.metrics.rebalanceBatches), - RebalanceThrottle: atomic.LoadInt64(&dm.metrics.rebalanceThrottle), - RebalanceLastNanos: atomic.LoadInt64(&dm.metrics.rebalanceLastNanos), - RebalancedReplicaDiff: atomic.LoadInt64(&dm.metrics.rebalanceReplicaDiff), - 
RebalanceReplicaDiffThrottle: atomic.LoadInt64(&dm.metrics.rebalanceReplicaDiffThrottle), - RebalancedPrimary: atomic.LoadInt64(&dm.metrics.rebalancedPrimary), + TombstonesActive: dm.metrics.tombstonesActive.Load(), + TombstonesPurged: dm.metrics.tombstonesPurged.Load(), + WriteQuorumFailures: dm.metrics.writeQuorumFailures.Load(), + WriteAcks: dm.metrics.writeAcks.Load(), + WriteAttempts: dm.metrics.writeAttempts.Load(), + RebalancedKeys: dm.metrics.rebalancedKeys.Load(), + RebalanceBatches: dm.metrics.rebalanceBatches.Load(), + RebalanceThrottle: dm.metrics.rebalanceThrottle.Load(), + RebalanceLastNanos: dm.metrics.rebalanceLastNanos.Load(), + RebalancedReplicaDiff: dm.metrics.rebalanceReplicaDiff.Load(), + RebalanceReplicaDiffThrottle: dm.metrics.rebalanceReplicaDiffThrottle.Load(), + RebalancedPrimary: dm.metrics.rebalancedPrimary.Load(), MembershipVersion: mv, MembersAlive: alive, MembersSuspect: suspect, @@ -1224,7 +1224,7 @@ func (dm *DistMemory) resolveMissingKeys(ctx context.Context, nodeID string, ent // track number of remote-only keys discovered via fallback if len(mset) > 0 { - atomic.AddInt64(&dm.metrics.merkleKeysPulled, int64(len(mset))) + dm.metrics.merkleKeysPulled.Add(int64(len(mset))) } return mset @@ -1304,7 +1304,7 @@ func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { nextVer := cur.Version + 1 sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} - atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + dm.metrics.tombstonesActive.Store(dm.countTombstones()) } return @@ -1318,12 +1318,12 @@ func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { // remote has newer version; clear tombstone (key resurrected intentionally) delete(sh.tombs, key) - atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + dm.metrics.tombstonesActive.Store(dm.countTombstones()) } if cur, okLocal := sh.items.Get(key); !okLocal || it.Version > 
cur.Version { dm.applySet(ctx, it, false) - atomic.AddInt64(&dm.metrics.merkleKeysPulled, 1) + dm.metrics.merkleKeysPulled.Add(1) } } @@ -1367,10 +1367,10 @@ func (dm *DistMemory) startTombstoneSweeper() { //nolint:ireturn case <-ticker.C: purged := dm.compactTombstones() if purged > 0 { - atomic.AddInt64(&dm.metrics.tombstonesPurged, purged) + dm.metrics.tombstonesPurged.Add(purged) } - atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + dm.metrics.tombstonesActive.Store(dm.countTombstones()) case <-stopCh: return @@ -1478,7 +1478,7 @@ func (dm *DistMemory) runRebalanceTick(ctx context.Context) { //nolint:ireturn // Perform shedding cleanup after replica diff (delete local copies after grace once we are no longer owner). dm.shedRemovedKeys() - atomic.StoreInt64(&dm.metrics.rebalanceLastNanos, time.Since(start).Nanoseconds()) + dm.metrics.rebalanceLastNanos.Store(time.Since(start).Nanoseconds()) dm.lastRebalanceVersion.Store(mv) } @@ -1658,13 +1658,13 @@ func (dm *DistMemory) sendReplicaDiff( } _ = dm.transport.ForwardSet(ctx, string(rid), it, false) //nolint:errcheck - atomic.AddInt64(&dm.metrics.replicaFanoutSet, 1) - atomic.AddInt64(&dm.metrics.rebalancedKeys, 1) - atomic.AddInt64(&dm.metrics.rebalanceReplicaDiff, 1) + dm.metrics.replicaFanoutSet.Add(1) + dm.metrics.rebalancedKeys.Add(1) + dm.metrics.rebalanceReplicaDiff.Add(1) processed++ if limit > 0 && processed >= limit { - atomic.AddInt64(&dm.metrics.rebalanceReplicaDiffThrottle, 1) + dm.metrics.rebalanceReplicaDiffThrottle.Add(1) return processed } @@ -1724,7 +1724,7 @@ func (dm *DistMemory) migrateItems(ctx context.Context, items []cache.Item) { case sem <- struct{}{}: default: // saturated; record throttle and then block - atomic.AddInt64(&dm.metrics.rebalanceThrottle, 1) + dm.metrics.rebalanceThrottle.Add(1) sem <- struct{}{} } @@ -1733,7 +1733,7 @@ func (dm *DistMemory) migrateItems(ctx context.Context, items []cache.Item) { wg.Go(func() { defer func() { <-sem }() - 
atomic.AddInt64(&dm.metrics.rebalanceBatches, 1) + dm.metrics.rebalanceBatches.Add(1) for i := range batchItems { itm := batchItems[i] // value copy @@ -1757,8 +1757,8 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { / } // increment metrics once per attempt (ownership changed). Success is best-effort. - atomic.AddInt64(&dm.metrics.rebalancedKeys, 1) - atomic.AddInt64(&dm.metrics.rebalancedPrimary, 1) + dm.metrics.rebalancedKeys.Add(1) + dm.metrics.rebalancedPrimary.Add(1) _ = dm.transport.ForwardSet(ctx, string(owners[0]), item, true) //nolint:errcheck // best-effort @@ -1991,7 +1991,7 @@ func (dm *DistMemory) tryLocalGet(key string, idx int, oid cluster.NodeID) (*cac if it, ok := dm.shardFor(key).items.GetCopy(key); ok { if idx > 0 { // promotion - atomic.AddInt64(&dm.metrics.readPrimaryPromote, 1) + dm.metrics.readPrimaryPromote.Add(1) } return it, true @@ -2006,12 +2006,12 @@ func (dm *DistMemory) tryRemoteGet(ctx context.Context, key string, idx int, oid return nil, false } - atomic.AddInt64(&dm.metrics.forwardGet, 1) + dm.metrics.forwardGet.Add(1) it, ok, err := dm.transport.ForwardGet(ctx, string(oid), key) if errors.Is(err, sentinel.ErrBackendNotFound) { // owner unreachable -> promotion scenario if idx == 0 { // primary missing - atomic.AddInt64(&dm.metrics.readPrimaryPromote, 1) + dm.metrics.readPrimaryPromote.Add(1) } return nil, false @@ -2022,7 +2022,7 @@ func (dm *DistMemory) tryRemoteGet(ctx context.Context, key string, idx int, oid } if idx > 0 { // promotion occurred - atomic.AddInt64(&dm.metrics.readPrimaryPromote, 1) + dm.metrics.readPrimaryPromote.Add(1) } // read repair: if we're an owner but local missing, replicate @@ -2030,7 +2030,7 @@ func (dm *DistMemory) tryRemoteGet(ctx context.Context, key string, idx int, oid if _, ok2 := dm.shardFor(key).items.Get(key); !ok2 { cloned := *it dm.applySet(ctx, &cloned, false) - atomic.AddInt64(&dm.metrics.readRepair, 1) + dm.metrics.readRepair.Add(1) } } @@ -2112,7 
+2112,7 @@ func (dm *DistMemory) repairStaleOwners( if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { _ = dm.transport.ForwardSet(ctx, string(oid), chosen, false) //nolint:errcheck - atomic.AddInt64(&dm.metrics.readRepair, 1) + dm.metrics.readRepair.Add(1) } } } @@ -2130,7 +2130,7 @@ func (dm *DistMemory) fetchOwner(ctx context.Context, key string, idx int, oid c it, ok, err := dm.transport.ForwardGet(ctx, string(oid), key) if errors.Is(err, sentinel.ErrBackendNotFound) { // promotion if idx == 0 { - atomic.AddInt64(&dm.metrics.readPrimaryPromote, 1) + dm.metrics.readPrimaryPromote.Add(1) } return nil, false @@ -2141,7 +2141,7 @@ func (dm *DistMemory) fetchOwner(ctx context.Context, key string, idx int, oid c } if idx > 0 { // earlier owner skipped - atomic.AddInt64(&dm.metrics.readPrimaryPromote, 1) + dm.metrics.readPrimaryPromote.Add(1) } return it, true @@ -2266,7 +2266,7 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co if (dm.hintMaxTotal > 0 && dm.hintTotal >= dm.hintMaxTotal) || (dm.hintMaxBytes > 0 && dm.hintBytes+size > dm.hintMaxBytes) { dm.hintsMu.Unlock() - atomic.AddInt64(&dm.metrics.hintedGlobalDropped, 1) + dm.metrics.hintedGlobalDropped.Add(1) return } @@ -2278,8 +2278,8 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co dm.adjustHintAccounting(1, size) dm.hintsMu.Unlock() - atomic.AddInt64(&dm.metrics.hintedQueued, 1) - atomic.StoreInt64(&dm.metrics.hintedBytes, dm.hintBytes) + dm.metrics.hintedQueued.Add(1) + dm.metrics.hintedBytes.Store(dm.hintBytes) } // approxHintSize estimates the size of a hinted item for global caps. @@ -2382,21 +2382,21 @@ func (dm *DistMemory) replayHints(ctx context.Context) { // reduced cognitive co } } - atomic.StoreInt64(&dm.metrics.hintedBytes, dm.hintBytes) + dm.metrics.hintedBytes.Store(dm.hintBytes) dm.hintsMu.Unlock() } // processHint returns 0=keep,1=remove. 
func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hintedEntry, now time.Time) int { if now.After(entry.expire) { - atomic.AddInt64(&dm.metrics.hintedExpired, 1) + dm.metrics.hintedExpired.Add(1) return 1 } err := dm.transport.ForwardSet(ctx, nodeID, entry.item, false) if err == nil { - atomic.AddInt64(&dm.metrics.hintedReplayed, 1) + dm.metrics.hintedReplayed.Add(1) return 1 } @@ -2405,7 +2405,7 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint return 0 } - atomic.AddInt64(&dm.metrics.hintedDropped, 1) + dm.metrics.hintedDropped.Add(1) return 1 } @@ -2489,7 +2489,7 @@ func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { //nolint:ireturn dm.lastAutoSyncError.Store("") } - atomic.AddInt64(&dm.metrics.autoSyncLoops, 1) + dm.metrics.autoSyncLoops.Add(1) } func (dm *DistMemory) gossipLoop(stopCh <-chan struct{}) { //nolint:ireturn @@ -2603,28 +2603,28 @@ func (dm *DistMemory) chooseNewer(itemA, itemB *cache.Item) *cache.Item { //noli } if itemB.Version > itemA.Version { // itemB newer - atomic.AddInt64(&dm.metrics.versionConflicts, 1) + dm.metrics.versionConflicts.Add(1) return itemB } if itemA.Version > itemB.Version { // itemA newer - atomic.AddInt64(&dm.metrics.versionConflicts, 1) + dm.metrics.versionConflicts.Add(1) return itemA } // versions equal: tie-break on origin if itemB.Origin < itemA.Origin { // itemB wins by tie-break - atomic.AddInt64(&dm.metrics.versionConflicts, 1) - atomic.AddInt64(&dm.metrics.versionTieBreaks, 1) + dm.metrics.versionConflicts.Add(1) + dm.metrics.versionTieBreaks.Add(1) return itemB } if itemA.Origin < itemB.Origin { // itemA wins by tie-break (still counts) - atomic.AddInt64(&dm.metrics.versionConflicts, 1) - atomic.AddInt64(&dm.metrics.versionTieBreaks, 1) + dm.metrics.versionConflicts.Add(1) + dm.metrics.versionTieBreaks.Add(1) } return itemA @@ -2653,7 +2653,7 @@ func (dm *DistMemory) repairLocalReplica(ctx context.Context, key string, chosen if !ok || 
localIt.Version < chosen.Version || (localIt.Version == chosen.Version && localIt.Origin > chosen.Origin) { cloned := *chosen dm.applySet(ctx, &cloned, false) - atomic.AddInt64(&dm.metrics.readRepair, 1) + dm.metrics.readRepair.Add(1) } } @@ -2671,7 +2671,7 @@ func (dm *DistMemory) repairRemoteReplica( it, ok, _ := dm.transport.ForwardGet(ctx, string(oid), key) //nolint:errcheck if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { // stale _ = dm.transport.ForwardSet(ctx, string(oid), chosen, false) //nolint:errcheck - atomic.AddInt64(&dm.metrics.readRepair, 1) + dm.metrics.readRepair.Add(1) } } @@ -2681,7 +2681,7 @@ func (dm *DistMemory) handleForwardPrimary(ctx context.Context, owners []cluster return false, sentinel.ErrNotOwner } - atomic.AddInt64(&dm.metrics.forwardSet, 1) + dm.metrics.forwardSet.Add(1) errFwd := dm.transport.ForwardSet(ctx, string(owners[0]), item, true) switch { @@ -2802,7 +2802,7 @@ func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate return } - atomic.AddInt64(&dm.metrics.replicaFanoutSet, int64(len(owners)-1)) + dm.metrics.replicaFanoutSet.Add(int64(len(owners) - 1)) for _, oid := range owners[1:] { // skip primary if oid == dm.localNode.ID { @@ -2874,7 +2874,7 @@ func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate boo sh.items.Remove(key) sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} - atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + dm.metrics.tombstonesActive.Store(dm.countTombstones()) if !replicate || dm.ring == nil || dm.transport == nil { return @@ -2885,7 +2885,7 @@ func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate boo return } - atomic.AddInt64(&dm.metrics.replicaFanoutRemove, int64(len(owners)-1)) + dm.metrics.replicaFanoutRemove.Add(int64(len(owners) - 1)) for _, oid := range owners[1:] { if oid == dm.localNode.ID { @@ -2941,8 
+2941,8 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node if dm.hbDeadAfter > 0 && elapsed > dm.hbDeadAfter { // prune dead if dm.membership.Remove(node.ID) { - atomic.AddInt64(&dm.metrics.nodesRemoved, 1) - atomic.AddInt64(&dm.metrics.nodesDead, 1) + dm.metrics.nodesRemoved.Add(1) + dm.metrics.nodesDead.Add(1) } return @@ -2950,7 +2950,7 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node if dm.hbSuspectAfter > 0 && elapsed > dm.hbSuspectAfter && node.State == cluster.NodeAlive { // suspect dm.membership.Mark(node.ID, cluster.NodeSuspect) - atomic.AddInt64(&dm.metrics.nodesSuspect, 1) + dm.metrics.nodesSuspect.Add(1) } ctxHealth, cancel := context.WithTimeout(ctx, dm.hbInterval/2) @@ -2959,17 +2959,17 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node cancel() if err != nil { - atomic.AddInt64(&dm.metrics.heartbeatFailure, 1) + dm.metrics.heartbeatFailure.Add(1) if node.State == cluster.NodeAlive { // escalate dm.membership.Mark(node.ID, cluster.NodeSuspect) - atomic.AddInt64(&dm.metrics.nodesSuspect, 1) + dm.metrics.nodesSuspect.Add(1) } return } - atomic.AddInt64(&dm.metrics.heartbeatSuccess, 1) + dm.metrics.heartbeatSuccess.Add(1) // Mark alive (refresh LastSeen, clear suspicion) dm.membership.Mark(node.ID, cluster.NodeAlive) }