From 342155e1ca0c376b3845711411317cf869f6d835 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Mon, 1 Sep 2025 11:52:45 -0400 Subject: [PATCH 01/15] RPS improvements v0 --- cmd/noop_logger.go | 191 ++++++ protocol/shannon/endpoint.go | 8 +- protocol/shannon/full_node.go | 5 +- protocol/shannon/fullnode_cache.go | 554 +++++++++++------- protocol/shannon/fullnode_lazy.go | 32 +- protocol/shannon/fullnode_session_rollover.go | 2 +- protocol/shannon/gateway_mode.go | 3 +- protocol/shannon/mode_centralized.go | 8 +- protocol/shannon/mode_delegated.go | 7 +- protocol/shannon/observation.go | 4 +- protocol/shannon/protocol.go | 13 +- protocol/shannon/session.go | 17 +- 12 files changed, 576 insertions(+), 268 deletions(-) create mode 100644 cmd/noop_logger.go diff --git a/cmd/noop_logger.go b/cmd/noop_logger.go new file mode 100644 index 000000000..7480c7321 --- /dev/null +++ b/cmd/noop_logger.go @@ -0,0 +1,191 @@ +package cmd + +import ( + "context" + "time" + + "github.com/pokt-network/poktroll/pkg/polylog" +) + +// noOpLogger is a Logger implementation that performs no operations. +// All methods return no-op implementations or the receiver itself. +type noOpLogger struct{} + +// NewNoOpLogger creates a new no-operation logger. +func NewNoOpLogger() polylog.Logger { + return &noOpLogger{} +} + +// Debug returns a no-op Event. +func (n *noOpLogger) Debug() polylog.Event { + return &noOpEvent{} +} + +// ProbabilisticDebugInfo returns a no-op Event, ignoring the probability. +func (n *noOpLogger) ProbabilisticDebugInfo(float64) polylog.Event { + return &noOpEvent{} +} + +// Info returns a no-op Event. +func (n *noOpLogger) Info() polylog.Event { + return &noOpEvent{} +} + +// Warn returns a no-op Event. +func (n *noOpLogger) Warn() polylog.Event { + return &noOpEvent{} +} + +// Error returns a no-op Event. +func (n *noOpLogger) Error() polylog.Event { + return &noOpEvent{} +} + +// With returns the same no-op logger, ignoring the key-value pairs. +func (n *noOpLogger) With(keyVals ...any) polylog.Logger { + return n +} + +// WithContext returns the context unchanged. +func (n *noOpLogger) WithContext(ctx context.Context) context.Context { + return ctx +} + +// WithLevel returns a no-op Event, ignoring the level. +func (n *noOpLogger) WithLevel(level polylog.Level) polylog.Event { + return &noOpEvent{} +} + +// Write implements io.Writer by doing nothing and returning the length of p. +func (n *noOpLogger) Write(p []byte) (n int, err error) { + return len(p), nil +} + +// noOpEvent is an Event implementation that performs no operations. +// All methods return the receiver itself or appropriate default values. +type noOpEvent struct{} + +// Str returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Str(key, value string) polylog.Event { + return e +} + +// Bool returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Bool(key string, value bool) polylog.Event { + return e +} + +// Int returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Int(key string, value int) polylog.Event { + return e +} + +// Int8 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Int8(key string, value int8) polylog.Event { + return e +} + +// Int16 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Int16(key string, value int16) polylog.Event { + return e +} + +// Int32 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Int32(key string, value int32) polylog.Event { + return e +} + +// Int64 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Int64(key string, value int64) polylog.Event { + return e +} + +// Uint returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Uint(key string, value uint) polylog.Event { + return e +} + +// Uint8 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Uint8(key string, value uint8) polylog.Event { + return e +} + +// Uint16 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Uint16(key string, value uint16) polylog.Event { + return e +} + +// Uint32 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Uint32(key string, value uint32) polylog.Event { + return e +} + +// Uint64 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Uint64(key string, value uint64) polylog.Event { + return e +} + +// Float32 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Float32(key string, value float32) polylog.Event { + return e +} + +// Float64 returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Float64(key string, value float64) polylog.Event { + return e +} + +// Err returns the receiver, ignoring the error. +func (e *noOpEvent) Err(err error) polylog.Event { + return e +} + +// Timestamp returns the receiver, ignoring the timestamp. +func (e *noOpEvent) Timestamp() polylog.Event { + return e +} + +// Time returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Time(key string, value time.Time) polylog.Event { + return e +} + +// Dur returns the receiver, ignoring the key-value pair. +func (e *noOpEvent) Dur(key string, value time.Duration) polylog.Event { + return e +} + +// Fields returns the receiver, ignoring the fields. +func (e *noOpEvent) Fields(fields any) polylog.Event { + return e +} + +// Func returns the receiver, ignoring the function. +func (e *noOpEvent) Func(func(polylog.Event)) polylog.Event { + return e +} + +// Enabled always returns false since this is a no-op implementation. +func (e *noOpEvent) Enabled() bool { + return false +} + +// Discard returns the receiver. +func (e *noOpEvent) Discard() polylog.Event { + return e +} + +// Msg does nothing. +func (e *noOpEvent) Msg(message string) { + // no-op +} + +// Msgf does nothing. +func (e *noOpEvent) Msgf(format string, keyVals ...interface{}) { + // no-op +} + +// Send does nothing. +func (e *noOpEvent) Send() { + // no-op +} \ No newline at end of file diff --git a/protocol/shannon/endpoint.go b/protocol/shannon/endpoint.go index 6bda8f74a..be5adca61 100644 --- a/protocol/shannon/endpoint.go +++ b/protocol/shannon/endpoint.go @@ -130,7 +130,7 @@ type protocolEndpoint struct { // the first app will be chosen. A randomization among the apps in this (unlikely) scenario // may be needed. // session is the active session corresponding to the app, of which the endpoint is a member. - session sessiontypes.Session + session *sessiontypes.Session } // IsFallback returns false for protocol endpoints. @@ -165,7 +165,7 @@ func (e protocolEndpoint) WebsocketURL() (string, error) { // Session returns a pointer to the session associated with the endpoint. func (e protocolEndpoint) Session() *sessiontypes.Session { - return &e.session + return e.session } // Supplier returns the supplier address of the endpoint. @@ -176,9 +176,9 @@ func (e protocolEndpoint) Supplier() string { // endpointsFromSession returns the list of all endpoints from a Shannon session. // It returns a map for efficient lookup, as the main/only consumer of this function uses // the return value for selecting an endpoint for sending a relay. -func endpointsFromSession(session sessiontypes.Session) (map[protocol.EndpointAddr]endpoint, error) { +func endpointsFromSession(session *sessiontypes.Session) (map[protocol.EndpointAddr]endpoint, error) { sf := sdk.SessionFilter{ - Session: &session, + Session: session, } // AllEndpoints will return a map of supplier address to a list of supplier endpoints. diff --git a/protocol/shannon/full_node.go b/protocol/shannon/full_node.go index e89f9ac50..913e2a6a2 100644 --- a/protocol/shannon/full_node.go +++ b/protocol/shannon/full_node.go @@ -5,7 +5,6 @@ import ( apptypes "github.com/pokt-network/poktroll/x/application/types" servicetypes "github.com/pokt-network/poktroll/x/service/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" sdk "github.com/pokt-network/shannon-sdk" @@ -27,7 +26,7 @@ type FullNode interface { // GetSession returns the latest session matching the supplied service+app combination. // Sessions are solely used for sending relays, and therefore only the latest session for any service+app combination is needed. // Note: Shannon returns the latest session for a service+app combination if no blockHeight is provided. - GetSession(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error) + GetSession(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (hydratedSession, error) // GetSessionWithExtendedValidity implements session retrieval with support for // Pocket Network's native "session grace period" business logic. @@ -52,7 +51,7 @@ type FullNode interface { // - https://dev.poktroll.com/protocol/governance/gov_params // - https://dev.poktroll.com/protocol/primitives/claim_and_proof_lifecycle // If within grace period of a session rollover, it may return the previous session. - GetSessionWithExtendedValidity(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error) + GetSessionWithExtendedValidity(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (hydratedSession, error) // GetSharedParams returns the shared module parameters from the blockchain. GetSharedParams(ctx context.Context) (*sharedtypes.Params, error) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index 0a8262930..c8232a280 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -3,6 +3,8 @@ package shannon import ( "context" "fmt" + "strings" + "sync" "time" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" @@ -19,54 +21,18 @@ import ( // ---------------- Cache Configuration ---------------- const ( - // Retry base delay for exponential backoff on failed refreshes - retryBaseDelay = 100 * time.Millisecond - - // cacheCapacity: - // - Max entries across all shards (not per-shard) - // - Exceeding capacity triggers LRU eviction per shard - // - 100k supports most large deployments - // - TODO_TECHDEBT(@commoddity): Revisit based on real-world usage; consider making configurable - cacheCapacity = 100_000 - - // numShards: - // - Number of independent cache shards for concurrency - // - Reduces lock contention, improves parallelism - // - 10 is a good balance for most workloads - numShards = 10 - - // evictionPercentage: - // - % of LRU entries evicted per shard when full - // - 10% = incremental cleanup, avoids memory spikes - // - SturdyC also evicts expired entries in background - evictionPercentage = 10 - - // TODO_TECHDEBT(@commoddity): See Issue #291 for improvements to refresh logic - // minEarlyRefreshPercentage: - // - Earliest point (as % of TTL) to start background refresh - // - 0.75 = 75% of TTL (e.g. 22.5s for 30s TTL) - minEarlyRefreshPercentage = 0.75 - - // maxEarlyRefreshPercentage: - // - Latest point (as % of TTL) to start background refresh - // - 0.9 = 90% of TTL (e.g. 27s for 30s TTL) - // - Ensures refresh always completes before expiry - maxEarlyRefreshPercentage = 0.9 - - // Cache key prefixes to avoid collisions between different data types. - sessionCacheKeyPrefix = "session" - // TODO_IMPROVE: Make this configurable sharedParamsCacheKey = "shared_params" // TODO_IMPROVE: Make this configurable - sharedParamsCacheTTL = 2 * time.Minute // Shared params change infrequently - sharedParamsCacheCapacity = 3 // Only need to cache the last couple of shared params at any point in time + sharedParamsCacheTTL = 2 * time.Minute // Shared params change infrequently // TODO_IMPROVE: Make this configurable blockHeightCacheKey = "block_height" // TODO_IMPROVE: Make this configurable - blockHeightCacheTTL = 15 * time.Second // Block height changes frequently - blockHeightCacheCapacity = 5 // Only need to cache the last few blocks at any point in time + blockHeightCacheTTL = 15 * time.Second // Block height changes frequently + + // Cache key prefixes to avoid collisions between different data types. + sessionCacheKeyPrefix = "session" // TODO_IMPROVE: Make this configurable // - Grace period scale down factor forces the gateway to respect a smaller @@ -76,57 +42,74 @@ const ( gracePeriodScaleDownFactor = 0.8 ) -// getCacheDelays returns the min/max delays for SturdyC's Early Refresh strategy. -// - Proactively refreshes cache before expiry (prevents misses/latency spikes) -// - Refresh window: 75-90% of TTL (e.g. 22.5-27s for 30s TTL) -// - Spreads requests to avoid thundering herd -// See: https://github.com/viccon/sturdyc?tab=readme-ov-file#early-refreshes -func getCacheDelays(ttl time.Duration) (min, max time.Duration) { - minFloat := float64(ttl) * minEarlyRefreshPercentage - maxFloat := float64(ttl) * maxEarlyRefreshPercentage - - // Round to the nearest second - min = time.Duration(minFloat/float64(time.Second)+0.5) * time.Second - max = time.Duration(maxFloat/float64(time.Second)+0.5) * time.Second - return +// hydratedSession contains a session along with pre-computed endpoints +// to avoid repeatedly calling endpointsFromSession for the same session +type hydratedSession struct { + session *sessiontypes.Session + endpoints map[protocol.EndpointAddr]endpoint +} + +// blockHeightCache represents a simple cache for block heights +type blockHeightCache struct { + mu sync.RWMutex + height int64 + lastUpdated time.Time +} + +// sessionCache represents a simple cache for hydrated sessions +type sessionCache struct { + mu sync.RWMutex + sessions map[string]sessionCacheEntry +} + +type sessionCacheEntry struct { + hydratedSession hydratedSession + lastUpdated time.Time +} + +// sharedParamsCache represents a simple cache for shared parameters +type sharedParamsCache struct { + mu sync.RWMutex + params *sharedtypes.Params + lastUpdated time.Time } var _ FullNode = &cachingFullNode{} -// cachingFullNode wraps a LazyFullNode with SturdyC-based caching. -// - Early refresh: background updates before expiry (prevents thundering herd/latency spikes) -// - Example: 30s TTL, refresh at 22.5–27s (75–90%) -// - Benefits: zero-latency reads, graceful degradation, auto load balancing -// Docs: https://github.com/viccon/sturdyc +// cachingFullNode wraps a LazyFullNode with simple map-based caching. +// Background goroutines periodically refresh cached data to ensure freshness. type cachingFullNode struct { logger polylog.Logger // Underlying node for protocol data fetches lazyFullNode *LazyFullNode - // Session cache - // TODO_MAINNET_MIGRATION(@Olshansk): Revisit after mainnet - sessionCache *sturdyc.Client[sessiontypes.Session] + // Simple caches with RWMutex for thread safety + blockCache *blockHeightCache + sessionsCache *sessionCache + sharedCache *sharedParamsCache + cacheConfig CacheConfig - // Shared params cache - sharedParamsCache *sturdyc.Client[*sharedtypes.Params] + // Account client wrapped with cache (keeping original implementation) + cachingAccountClient *sdk.AccountClient - // Block height cache - blockHeightCache *sturdyc.Client[int64] + // Context and cancel function for background goroutines + ctx context.Context + cancel context.CancelFunc - // Account client wrapped with SturdyC cache - cachingAccountClient *sdk.AccountClient + // ownedApps is the list of apps owned by the gateway operator + // This is used to prefetch and cache all sessions. + // Necessary to avoid individual requests getting stuck waiting for a session fetch. + ownedApps map[protocol.ServiceID][]string } -// NewCachingFullNode wraps a LazyFullNode with: -// - Session cache: caches sessions, refreshes early -// - Account cache: indefinite cache for account data -// -// Both use early refresh to avoid thundering herd/latency spikes. +// NewCachingFullNode wraps a LazyFullNode with simple map-based caches +// and starts background goroutines for periodic cache updates. func NewCachingFullNode( logger polylog.Logger, lazyFullNode *LazyFullNode, cacheConfig CacheConfig, + gatewayConfig GatewayConfig, ) (*cachingFullNode, error) { // Set default session TTL if not set cacheConfig.hydrateDefaults() @@ -136,87 +119,241 @@ func NewCachingFullNode( Str("cache_config_session_ttl", cacheConfig.SessionTTL.String()). Msgf("cachingFullNode - Cache Configuration") - // Create the session cache with early refreshes - sessionMinRefreshDelay, sessionMaxRefreshDelay := getCacheDelays(cacheConfig.SessionTTL) - sessionCache := sturdyc.New[sessiontypes.Session]( - cacheCapacity, - numShards, - cacheConfig.SessionTTL, - evictionPercentage, - // See: https://github.com/viccon/sturdyc?tab=readme-ov-file#early-refreshes - sturdyc.WithEarlyRefreshes( - sessionMinRefreshDelay, - sessionMaxRefreshDelay, - cacheConfig.SessionTTL, - retryBaseDelay, - ), - ) + // TODO_TECHDEBT(@adshmh): refactor to remove duplicate owned apps processing at startup. + // + // Retrieve the list of apps owned by the gateway. + ownedApps, err := getOwnedApps(logger, gatewayConfig.OwnedAppsPrivateKeysHex, lazyFullNode) + if err != nil { + return nil, fmt.Errorf("failed to get app addresses from config: %w", err) + } - // Account cache: infinite for app lifetime; no early refresh needed. + // Account cache: keeping original sturdyc implementation for account data accountCache := sturdyc.New[*accounttypes.QueryAccountResponse]( accountCacheCapacity, - numShards, + 10, // numShards accountCacheTTL, - evictionPercentage, + 10, // evictionPercentage ) - // Shared params cache - sharedParamsMinRefreshDelay, sharedParamsMaxRefreshDelay := getCacheDelays(sharedParamsCacheTTL) - sharedParamsCache := sturdyc.New[*sharedtypes.Params]( - sharedParamsCacheCapacity, - 1, - sharedParamsCacheTTL, - evictionPercentage, - sturdyc.WithEarlyRefreshes( - sharedParamsMinRefreshDelay, - sharedParamsMaxRefreshDelay, - sharedParamsCacheTTL, - retryBaseDelay, - ), - ) - - // Block height cache - blockHeightMinRefreshDelay, blockHeightMaxRefreshDelay := getCacheDelays(blockHeightCacheTTL) - blockHeightCache := sturdyc.New[int64]( - blockHeightCacheCapacity, - 1, - blockHeightCacheTTL, - evictionPercentage, - sturdyc.WithEarlyRefreshes( - blockHeightMinRefreshDelay, - blockHeightMaxRefreshDelay, - blockHeightCacheTTL, - retryBaseDelay, - ), - ) - - // Initialize the caching full node with the modified lazy full node - return &cachingFullNode{ - logger: logger, - lazyFullNode: lazyFullNode, - sessionCache: sessionCache, - sharedParamsCache: sharedParamsCache, - blockHeightCache: blockHeightCache, + // Create context for background goroutines + ctx, cancel := context.WithCancel(context.Background()) + + // Initialize the caching full node + cfn := &cachingFullNode{ + logger: logger, + lazyFullNode: lazyFullNode, + cacheConfig: cacheConfig, + ctx: ctx, + cancel: cancel, + blockCache: &blockHeightCache{ + height: 0, + }, + sessionsCache: &sessionCache{ + sessions: make(map[string]sessionCacheEntry), + }, + sharedCache: &sharedParamsCache{ + params: nil, + }, // Wrap the underlying account fetcher with a SturdyC caching layer. cachingAccountClient: getCachingAccountClient( logger, accountCache, lazyFullNode.accountClient, ), + + ownedApps: ownedApps, + } + + // Start background cache update goroutines + cfn.startCacheUpdateRoutines() + + return cfn, nil +} + +// createHydratedSession creates a hydratedSession from a session by computing its endpoints +func createHydratedSession(session *sessiontypes.Session) (hydratedSession, error) { + endpoints, err := endpointsFromSession(session) + if err != nil { + return hydratedSession{}, fmt.Errorf("failed to create endpoints from session: %w", err) + } + + return hydratedSession{ + session: session, + endpoints: endpoints, }, nil } +// startCacheUpdateRoutines starts background goroutines to periodically update caches +func (cfn *cachingFullNode) startCacheUpdateRoutines() { + // Start block height cache update routine + go cfn.updateBlockHeightCache() + + // Start shared params cache update routine + go cfn.updateSharedParamsCache() + + // Start session cache update routine + go cfn.updateSessionCache() +} + +// updateBlockHeightCache periodically updates the block height cache +func (cfn *cachingFullNode) updateBlockHeightCache() { + ticker := time.NewTicker(blockHeightCacheTTL) + defer ticker.Stop() + + var updatedOnce bool + for { + if !updatedOnce { + if err := cfn.fetchAndUpdateBlockHeightCache(); err == nil { + updatedOnce = true + } + time.Sleep(1 * time.Second) + continue + } + + select { + case <-cfn.ctx.Done(): + return + case <-ticker.C: + if err := cfn.fetchAndUpdateBlockHeightCache(); err != nil { + cfn.logger.Error().Err(err).Msg("Failed to update block height cache") + } + } + } +} + +func (cfn *cachingFullNode) fetchAndUpdateBlockHeightCache() error { + height, err := cfn.lazyFullNode.GetCurrentBlockHeight(cfn.ctx) + if err != nil { + return err + } + + cfn.blockCache.mu.Lock() + cfn.blockCache.height = height + cfn.blockCache.lastUpdated = time.Now() + cfn.blockCache.mu.Unlock() + + cfn.logger.Debug().Int64("height", height).Msg("Updated block height cache") + return nil +} + +// updateSharedParamsCache periodically updates the shared params cache +func (cfn *cachingFullNode) updateSharedParamsCache() { + ticker := time.NewTicker(sharedParamsCacheTTL) + defer ticker.Stop() + + for { + select { + case <-cfn.ctx.Done(): + return + case <-ticker.C: + params, err := cfn.lazyFullNode.GetSharedParams(cfn.ctx) + if err != nil { + cfn.logger.Error().Err(err).Msg("Failed to update shared params cache") + continue + } + + cfn.sharedCache.mu.Lock() + cfn.sharedCache.params = params + cfn.sharedCache.lastUpdated = time.Now() + cfn.sharedCache.mu.Unlock() + + cfn.logger.Debug().Msg("Updated shared params cache") + } + } +} + +// updateSessionCache periodically updates the session cache for active sessions +func (cfn *cachingFullNode) updateSessionCache() { + ticker := time.NewTicker(cfn.cacheConfig.SessionTTL) + defer ticker.Stop() + + var updatedOnce bool + for { + if !updatedOnce { + if allSessions, err := cfn.fetchAllSessions(); err == nil { + cfn.sessionsCache.mu.Lock() + cfn.sessionsCache.sessions = allSessions + cfn.sessionsCache.mu.Unlock() + updatedOnce = true + } + + time.Sleep(1 * time.Second) + continue + } + + select { + case <-cfn.ctx.Done(): + return + case <-ticker.C: + + updatedSessions, err := cfn.fetchAllSessions() + if err != nil { + cfn.logger.Error().Err(err).Msg("Failed to get updated sessions. Skipping session cache update") + continue + } + + // Update existing sessions in cache + cfn.sessionsCache.mu.Lock() + cfn.sessionsCache.sessions = updatedSessions + cfn.sessionsCache.mu.Unlock() + } + } +} + +func (cfn *cachingFullNode) fetchAllSessions() (map[string]sessionCacheEntry, error) { + // Get current block height for session updates + height, err := cfn.GetCurrentBlockHeight(cfn.ctx) + if err != nil { + return nil, err + } + + // Initialize updated sessions + updatedSessions := make(map[string]sessionCacheEntry) + + // TODO_UPNEXT(@adshmh): Fetch sessions concurrently. + // + // Iterate over owned apps + for serviceID, appsAddrs := range cfn.ownedApps { + for _, appAddr := range appsAddrs { + // Fetch updated session + // TODO_TECHDEBT(@adshmh): Set a deadline for fetching a session. + session, err := cfn.lazyFullNode.GetSession(context.TODO(), serviceID, appAddr) + if err != nil { + cfn.logger.Error(). + Str("service_id", string(serviceID)). + Str("app_addr", appAddr). + Err(err). + Msg("Failed to fetch session") + continue + } + + // Update the session with new cache key based on current height + newKey := getSessionCacheKey(serviceID, appAddr, height) + updatedSessions[newKey] = sessionCacheEntry{ + hydratedSession: session, + lastUpdated: time.Now(), + } + } + } + + if len(updatedSessions) == 0 { + return nil, fmt.Errorf("Failed to get any sessions") + } + + return updatedSessions, nil +} + // GetApp is only used at startup; relaying fetches sessions for app/session sync. func (cfn *cachingFullNode) GetApp(ctx context.Context, appAddr string) (*apptypes.Application, error) { return cfn.lazyFullNode.GetApp(ctx, appAddr) } -// GetSession returns (and auto-refreshes) the session for a service/app from cache. +// GetSession returns the session for a service/app from cache, fetching if not present. func (cfn *cachingFullNode) GetSession( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger := cfn.logger.With( "service_id", string(serviceID), "app_addr", appAddr, @@ -228,45 +365,33 @@ func (cfn *cachingFullNode) GetSession( logger.Error().Err(err).Msgf( "[cachingFullNode.GetSession] Failed to get current block height", ) - return sessiontypes.Session{}, err + return hydratedSession{}, err } + sessionKey := getSessionCacheKey(serviceID, appAddr, height) - // See: https://github.com/viccon/sturdyc?tab=readme-ov-file#get-or-fetch - session, err := cfn.sessionCache.GetOrFetch( - ctx, - sessionKey, - func(fetchCtx context.Context) (sessiontypes.Session, error) { - logger.Debug().Str("session_key", sessionKey).Msgf("Fetching session from full node") - return cfn.lazyFullNode.GetSession(ctx, serviceID, appAddr) - }, - ) + // Try to get from cache first + cfn.sessionsCache.mu.RLock() + entry, exists := cfn.sessionsCache.sessions[sessionKey] + cfn.sessionsCache.mu.RUnlock() + + if exists { + return entry.hydratedSession, nil + } - return session, err + return hydratedSession{}, fmt.Errorf("session not found") } +// TODO_UPNEXT(@adshmh): Refactor to handle height-based session retrieval from the cache. +// ================================================= +// // GetSessionWithExtendedValidity implements session retrieval with support for // Pocket Network's "session grace period" business logic. -// -// It is used to account for the case when: -// - RelayMiner.FullNode.Height > Gateway.FullNode.Height -// AND -// - RelayMiner.FullNode.Session > Gateway.FullNode.Session -// -// In the context of PATH, it is used to account for the case when: -// - Gateway.FullNode.Height > RelayMiner.FullNode.Height -// AND -// - Gateway.FullNode.Session > RelayMiner.FullNode.Session -// -// Protocol References: -// - https://github.com/pokt-network/poktroll/blob/main/proto/pocket/shared/params.proto -// - https://dev.poktroll.com/protocol/governance/gov_params -// - https://dev.poktroll.com/protocol/primitives/claim_and_proof_lifecycle func (cfn *cachingFullNode) GetSessionWithExtendedValidity( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger := cfn.logger.With( "service_id", string(serviceID), "app_addr", appAddr, @@ -274,24 +399,27 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( ) // Get the current session from cache - currentSession, err := cfn.GetSession(ctx, serviceID, appAddr) + cachedSession, err := cfn.GetSession(ctx, serviceID, appAddr) if err != nil { logger.Error().Err(err).Msg("Failed to get current session") - return sessiontypes.Session{}, fmt.Errorf("error getting current session: %w", err) + return hydratedSession{}, fmt.Errorf("error getting current session: %w", err) } + // Extract the underlying session. + currentSession := cachedSession.session + // Get shared parameters to determine grace period sharedParams, err := cfn.GetSharedParams(ctx) if err != nil { logger.Warn().Err(err).Msg("Failed to get shared params, falling back to current session") - return currentSession, nil + return cachedSession, nil } // Get current block height currentHeight, err := cfn.GetCurrentBlockHeight(ctx) if err != nil { logger.Warn().Err(err).Msg("Failed to get current block height, falling back to current session") - return currentSession, nil + return cachedSession, nil } // Calculate when the previous session's grace period would end @@ -309,7 +437,7 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( // If we're not within the grace period of the previous session, return the current session if currentHeight > prevSessionEndHeightWithExtendedValidity { logger.Debug().Msg("IS NOT WITHIN GRACE PERIOD: Returning current session") - return currentSession, nil + return cachedSession, nil } // Scale down the grace period to aggressively start using the new session @@ -318,27 +446,31 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( logger.Debug(). Int64("prev_session_end_height_with_extended_validity_scaled", prevSessionEndHeightWithExtendedValidityScaled). Msg("IS WITHIN GRACE PERIOD BUT: Returning current session to aggressively start using the new session") - return currentSession, nil + return cachedSession, nil } logger.Debug().Msg("IS WITHIN GRACE PERIOD: Going to fetch previous session") - // Use cache for previous session lookup with a specific key + // Try to get previous session from cache prevSessionKey := getSessionCacheKey(serviceID, appAddr, prevSessionEndHeight) - prevSession, err := cfn.sessionCache.GetOrFetch( - ctx, - prevSessionKey, - func(fetchCtx context.Context) (sessiontypes.Session, error) { - cfn.logger.Debug().Msg("Fetching previous session from full node") - session, fetchErr := cfn.lazyFullNode.GetSessionWithExtendedValidity(fetchCtx, serviceID, appAddr) - if fetchErr != nil { - cfn.logger.Error().Err(fetchErr).Msg("Failed to fetch previous session from full node") - } - return session, fetchErr - }, - ) - return prevSession, err + cfn.sessionsCache.mu.RLock() + entry, exists := cfn.sessionsCache.sessions[prevSessionKey] + cfn.sessionsCache.mu.RUnlock() + + if exists { + logger.Debug().Str("prev_session_key", prevSessionKey).Msg("Previous session found in cache") + return entry.hydratedSession, nil + } + + // Not in cache, fetch from underlying node + logger.Debug().Str("prev_session_key", prevSessionKey).Msg("Previous session not in cache, fetching from full node") + prevSession, err := cfn.lazyFullNode.GetSessionWithExtendedValidity(ctx, serviceID, appAddr) + if err != nil { + logger.Error().Err(err).Msg("Failed to fetch previous session from full node") + return hydratedSession{}, err + } + return prevSession, nil } // getSessionCacheKey builds a unique cache key for session: ::: @@ -346,10 +478,7 @@ func getSessionCacheKey(serviceID protocol.ServiceID, appAddr string, height int return fmt.Sprintf("%s:%s:%s:%d", sessionCacheKeyPrefix, serviceID, appAddr, height) } -// ValidateRelayResponse: -// - Validates the raw response bytes received from an endpoint. -// - Uses the SDK and the caching full node's account client for validation. -// - Will use the caching account client to fetch the account pub key. +// ValidateRelayResponse uses the SDK and the caching full node's account client for validation. func (cfn *cachingFullNode) ValidateRelayResponse( supplierAddr sdk.SupplierAddress, responseBz []byte, @@ -362,58 +491,53 @@ func (cfn *cachingFullNode) ValidateRelayResponse( ) } -// GetAccountClient: passthrough to underlying node (returns caching client). +// GetAccountClient returns the caching account client. func (cfn *cachingFullNode) GetAccountClient() *sdk.AccountClient { return cfn.cachingAccountClient } // IsHealthy: passthrough to underlying node. -// TODO_IMPROVE(@commoddity): -// - Add smarter health checks (e.g. verify cached apps/sessions) -// - Currently always true (cache fills as needed) func (cfn *cachingFullNode) IsHealthy() bool { return cfn.lazyFullNode.IsHealthy() } -// GetSharedParams: cached shared params with early refresh for governance changes. +// GetSharedParams returns cached shared params. func (cfn *cachingFullNode) GetSharedParams(ctx context.Context) (*sharedtypes.Params, error) { - params, err := cfn.sharedParamsCache.GetOrFetch( - ctx, - sharedParamsCacheKey, - func(fetchCtx context.Context) (*sharedtypes.Params, error) { - cfn.logger.Debug().Msg("Fetching shared params from full node") - params, fetchErr := cfn.lazyFullNode.GetSharedParams(fetchCtx) - if fetchErr != nil { - cfn.logger.Error().Err(fetchErr).Msg("Failed to fetch shared params from full node") - } - return params, fetchErr - }, - ) + cfn.sharedCache.mu.RLock() + params := cfn.sharedCache.params + cfn.sharedCache.mu.RUnlock() + + if params == nil { + // Cache not initialized yet, fetch directly + cfn.logger.Debug().Msg("Shared params cache not initialized, fetching from full node") + return nil, fmt.Errorf("Shared params not cached yet.") + } - return params, err + return params, nil } -// GetCurrentBlockHeight: cached block height with a sho TTL and early refresh. +// TODO_TECHDEBT(@adshmh): Add timeout on fetching current block height. +// GetCurrentBlockHeight returns cached block height. func (cfn *cachingFullNode) GetCurrentBlockHeight(ctx context.Context) (int64, error) { - height, err := cfn.blockHeightCache.GetOrFetch( - ctx, - blockHeightCacheKey, - func(fetchCtx context.Context) (int64, error) { - cfn.logger.Debug().Msg("Fetching current block height from full node") - height, fetchErr := cfn.lazyFullNode.GetCurrentBlockHeight(fetchCtx) - if fetchErr != nil { - cfn.logger.Error().Err(fetchErr).Msg("Failed to fetch current block height from full node") - } - return height, fetchErr - }, - ) + cfn.blockCache.mu.RLock() + defer cfn.blockCache.mu.RUnlock() + + height := cfn.blockCache.height + if height == 0 { + return 0, fmt.Errorf("Height not fetched yet") + } - return height, err + return height, nil } // IsInSessionRollover: passthrough to underlying lazy full node. -// The lazy full node manages session rollover monitoring in the background, -// so we simply delegate to its rollover state. func (cfn *cachingFullNode) IsInSessionRollover() bool { return cfn.lazyFullNode.IsInSessionRollover() } + +// Stop gracefully shuts down the caching full node and stops background goroutines. +func (cfn *cachingFullNode) Stop() { + if cfn.cancel != nil { + cfn.cancel() + } +} diff --git a/protocol/shannon/fullnode_lazy.go b/protocol/shannon/fullnode_lazy.go index 2b45a2fa4..cfc9592c1 100644 --- a/protocol/shannon/fullnode_lazy.go +++ b/protocol/shannon/fullnode_lazy.go @@ -11,7 +11,6 @@ import ( "github.com/pokt-network/poktroll/pkg/polylog" apptypes "github.com/pokt-network/poktroll/x/application/types" servicetypes "github.com/pokt-network/poktroll/x/service/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" sdk "github.com/pokt-network/shannon-sdk" sdktypes "github.com/pokt-network/shannon-sdk/types" @@ -103,7 +102,7 @@ func (lfn *LazyFullNode) GetSession( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { session, err := lfn.sessionClient.GetSession( ctx, appAddr, @@ -112,23 +111,30 @@ func (lfn *LazyFullNode) GetSession( ) if err != nil { - return sessiontypes.Session{}, + return hydratedSession{}, fmt.Errorf("GetSession: error getting the session for service %s app %s: %w", serviceID, appAddr, err, ) } if session == nil { - return sessiontypes.Session{}, + return hydratedSession{}, fmt.Errorf("GetSession: got nil session for service %s app %s: %w", serviceID, appAddr, err, ) } // Update session rollover boundaries for rollover monitoring - lfn.rolloverState.updateSessionRolloverBoundaries(*session) + lfn.rolloverState.updateSessionRolloverBoundaries(session) - return *session, nil + // TODO_UPNEXT(@adshmh): Log and handle potential errors. + // ============================================================ + endpoints, _ := endpointsFromSession(session) + + return hydratedSession{ + session: session, + endpoints: endpoints, + }, nil } // ValidateRelayResponse: @@ -187,7 +193,7 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger := lfn.logger.With( "service_id", serviceID, "app_addr", appAddr, @@ -198,7 +204,7 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( currentSession, err := lfn.GetSession(ctx, serviceID, appAddr) if err != nil { logger.Error().Err(err).Msg("failed to get current session") - return sessiontypes.Session{}, fmt.Errorf("error getting current session: %w", err) + return hydratedSession{}, fmt.Errorf("error getting current session: %w", err) } // Get shared parameters to determine grace period @@ -216,15 +222,15 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( } // Calculate when the previous session's grace period would end - prevSessionEndHeight := currentSession.Header.SessionStartBlockHeight - 1 + prevSessionEndHeight := currentSession.session.Header.SessionStartBlockHeight - 1 prevSessionEndHeightWithExtendedValidity := prevSessionEndHeight + int64(sharedParams.GracePeriodEndOffsetBlocks) logger = logger.With( "prev_session_end_height", prevSessionEndHeight, "prev_session_end_height_with_extended_validity", prevSessionEndHeightWithExtendedValidity, "current_height", currentHeight, - "current_session_start_height", currentSession.Header.SessionStartBlockHeight, - "current_session_end_height", currentSession.Header.SessionEndBlockHeight, + "current_session_start_height", currentSession.session.Header.SessionStartBlockHeight, + "current_session_end_height", currentSession.session.Header.SessionEndBlockHeight, ) // If we're not within the grace period of the previous session, return the current session @@ -241,10 +247,10 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( } // Update session rollover boundaries for rollover monitoring - lfn.rolloverState.updateSessionRolloverBoundaries(currentSession) + lfn.rolloverState.updateSessionRolloverBoundaries(currentSession.session) // Return the previous session - return *prevSession, nil + return createHydratedSession(prevSession) } // IsInSessionRollover returns true if we're currently in a session rollover period. diff --git a/protocol/shannon/fullnode_session_rollover.go b/protocol/shannon/fullnode_session_rollover.go index 25988b9cf..33f022863 100644 --- a/protocol/shannon/fullnode_session_rollover.go +++ b/protocol/shannon/fullnode_session_rollover.go @@ -129,7 +129,7 @@ func (srs *sessionRolloverState) updateBlockHeight() { // updateSessionRolloverBoundaries updates rollover boundaries when we fetch a new session. // Called from GetSession() to keep rollover monitoring current. // Only updates rollover boundaries if the current rollover period has ended. -func (srs *sessionRolloverState) updateSessionRolloverBoundaries(session sessiontypes.Session) { +func (srs *sessionRolloverState) updateSessionRolloverBoundaries(session *sessiontypes.Session) { if session.Header == nil { srs.logger.Warn().Msg("Session header is nil, cannot update session values") return diff --git a/protocol/shannon/gateway_mode.go b/protocol/shannon/gateway_mode.go index bec1743e0..05b6d9869 100644 --- a/protocol/shannon/gateway_mode.go +++ b/protocol/shannon/gateway_mode.go @@ -7,7 +7,6 @@ import ( "slices" apptypes "github.com/pokt-network/poktroll/x/application/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/protocol" ) @@ -36,7 +35,7 @@ func (p *Protocol) getActiveGatewaySessions( ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request, -) ([]sessiontypes.Session, error) { +) ([]hydratedSession, error) { p.logger.With( "service_id", serviceID, "gateway_mode", p.gatewayMode, diff --git a/protocol/shannon/mode_centralized.go b/protocol/shannon/mode_centralized.go index 20739a8f7..e322afd36 100644 --- a/protocol/shannon/mode_centralized.go +++ b/protocol/shannon/mode_centralized.go @@ -13,8 +13,6 @@ import ( "context" "fmt" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" - "github.com/buildwithgrove/path/protocol" ) @@ -22,7 +20,7 @@ import ( func (p *Protocol) getCentralizedGatewayModeActiveSessions( ctx context.Context, serviceID protocol.ServiceID, -) ([]sessiontypes.Session, error) { +) ([]hydratedSession, error) { logger := p.logger.With( "method", "getCentralizedGatewayModeActiveSessions", "service_id", string(serviceID), @@ -39,9 +37,9 @@ func (p *Protocol) getCentralizedGatewayModeActiveSessions( } // Loop over the address of apps owned by the gateway in Centralized gateway mode. - var ownedAppSessions []sessiontypes.Session + var ownedAppSessions []hydratedSession for _, ownedAppAddr := range ownedAppsForService { - session, err := p.getSession(ctx, logger, ownedAppAddr, serviceID) + session, err := p.getSession(ctx, p.logger, ownedAppAddr, serviceID) if err != nil { return nil, err } diff --git a/protocol/shannon/mode_delegated.go b/protocol/shannon/mode_delegated.go index 3910472eb..9c456f364 100644 --- a/protocol/shannon/mode_delegated.go +++ b/protocol/shannon/mode_delegated.go @@ -11,7 +11,6 @@ import ( "net/http" apptypes "github.com/pokt-network/poktroll/x/application/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/protocol" "github.com/buildwithgrove/path/request" @@ -28,7 +27,7 @@ func (p *Protocol) getDelegatedGatewayModeActiveSession( ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request, -) ([]sessiontypes.Session, error) { +) ([]hydratedSession, error) { logger := p.logger.With("method", "getDelegatedGatewayModeActiveSession") extractedAppAddr, err := getAppAddrFromHTTPReq(httpReq) @@ -45,7 +44,7 @@ func (p *Protocol) getDelegatedGatewayModeActiveSession( } // Skip the session's app if it is not staked for the requested service. - selectedApp := session.Application + selectedApp := session.session.Application if !appIsStakedForService(serviceID, selectedApp) { err = fmt.Errorf("%w: Trying to use app %s that is not staked for the service %s", errProtocolContextSetupAppNotStaked, selectedApp.Address, serviceID) logger.Error().Err(err).Msgf("SHOULD NEVER HAPPEN: %s", err.Error()) @@ -54,7 +53,7 @@ func (p *Protocol) getDelegatedGatewayModeActiveSession( logger.Debug().Msgf("successfully verified the gateway (%s) has delegation for the selected app (%s) for service (%s).", p.gatewayAddr, selectedApp.Address, serviceID) - return []sessiontypes.Session{session}, nil + return []hydratedSession{session}, nil } // appIsStakedForService returns true if the supplied application is staked for the supplied service ID. diff --git a/protocol/shannon/observation.go b/protocol/shannon/observation.go index 314fcb497..a5c51a717 100644 --- a/protocol/shannon/observation.go +++ b/protocol/shannon/observation.go @@ -233,8 +233,8 @@ func buildEndpointFromObservation( // Used to identify an endpoint for applying sanctions. func buildSessionFromObservation( observation *protocolobservations.ShannonEndpointObservation, -) sessiontypes.Session { - return sessiontypes.Session{ +) *sessiontypes.Session { + return &sessiontypes.Session{ // Only Session Header is required for processing observations. Header: &sessiontypes.SessionHeader{ ApplicationAddress: observation.GetEndpointAppAddress(), diff --git a/protocol/shannon/protocol.go b/protocol/shannon/protocol.go index e2fbc2798..61183e432 100644 --- a/protocol/shannon/protocol.go +++ b/protocol/shannon/protocol.go @@ -7,7 +7,6 @@ import ( "net/http" "github.com/pokt-network/poktroll/pkg/polylog" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/gateway" "github.com/buildwithgrove/path/health" @@ -162,6 +161,7 @@ func (p *Protocol) AvailableEndpoints( // endpoints will be used to populate the list of endpoints. // // The final boolean parameter sets whether to filter out sanctioned endpoints. + endpoints, err := p.getUniqueEndpoints(ctx, serviceID, activeSessions, true) if err != nil { logger.Error().Err(err).Msg(err.Error()) @@ -327,7 +327,7 @@ func (p *Protocol) IsAlive() bool { func (p *Protocol) getUniqueEndpoints( ctx context.Context, serviceID protocol.ServiceID, - activeSessions []sessiontypes.Session, + activeSessions []hydratedSession, filterSanctioned bool, ) (map[protocol.EndpointAddr]endpoint, error) { logger := p.logger.With( @@ -380,7 +380,7 @@ func (p *Protocol) getUniqueEndpoints( func (p *Protocol) getSessionsUniqueEndpoints( _ context.Context, serviceID protocol.ServiceID, - activeSessions []sessiontypes.Session, + activeSessions []hydratedSession, filterSanctioned bool, // will be true for calls made by service request handling. ) (map[protocol.EndpointAddr]endpoint, error) { logger := p.logger.With( @@ -407,13 +407,8 @@ func (p *Protocol) getSessionsUniqueEndpoints( logger.ProbabilisticDebugInfo(polylog.ProbabilisticDebugInfoProb).Msgf("Finding unique endpoints for session %s for app %s for service %s.", session.SessionId, app.Address, serviceID) // Retrieve all endpoints for the session. - sessionEndpoints, err := endpointsFromSession(session) - if err != nil { - logger.Error().Err(err).Msgf("Internal error: error getting all endpoints for service %s app %s and session: skipping the app.", serviceID, app.Address) - continue - } + qualifiedEndpoints := session.endpoints - qualifiedEndpoints := sessionEndpoints // Filter out sanctioned endpoints if requested. if filterSanctioned { logger.Debug().Msgf( diff --git a/protocol/shannon/session.go b/protocol/shannon/session.go index ab6e8e7b8..60ddfaccb 100644 --- a/protocol/shannon/session.go +++ b/protocol/shannon/session.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/pokt-network/poktroll/pkg/polylog" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/protocol" ) @@ -28,18 +27,16 @@ func (p *Protocol) getSession( logger polylog.Logger, appAddr string, serviceID protocol.ServiceID, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger.Info().Msgf("About to get a session for app %s for service %s", appAddr, serviceID) var err error - var session sessiontypes.Session + var session hydratedSession - // Retrieve the session for the owned app, without grace period logic. - if extendedSessionEnabled { - session, err = p.GetSessionWithExtendedValidity(ctx, serviceID, appAddr) - } else { - session, err = p.GetSession(ctx, serviceID, appAddr) - } + // TODO_TECHDEBT(@adshmh): Support sessions with grace period. + // Use GetSessionWithExtendedValidity method. + // + session, err = p.GetSession(ctx, serviceID, appAddr) if err != nil { err = fmt.Errorf("%w: Error getting the current session from the full node for app: %s, error: %w", errProtocolContextSetupFetchSession, appAddr, err) logger.Error().Err(err).Msgf("SHOULD NEVER HAPPEN: %s", err.Error()) @@ -47,7 +44,7 @@ func (p *Protocol) getSession( } // Select the first session in the list. - selectedApp := session.Application + selectedApp := session.session.Application logger.Debug().Msgf("fetched the app with the selected address %s.", selectedApp.Address) if appAddr != selectedApp.Address { From e4dd912fac78c9c7bbca3c64caba06069534fd7b Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 10:59:55 -0400 Subject: [PATCH 02/15] Initialize the caching full node --- cmd/shannon.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/shannon.go b/cmd/shannon.go index 3b31a3766..8928a506d 100644 --- a/cmd/shannon.go +++ b/cmd/shannon.go @@ -27,7 +27,8 @@ func getShannonFullNode(logger polylog.Logger, config *shannonconfig.ShannonGate return lazyFullNode, nil } - fullNode, err := shannon.NewCachingFullNode(logger, lazyFullNode, fullNodeConfig.CacheConfig) + // TODO_TECHDEBT(@adshmh): Refactor to clarify the fullnode's config requirements (including owned apps). + fullNode, err := shannon.NewCachingFullNode(logger, lazyFullNode, fullNodeConfig.CacheConfig, config.GatewayConfig) if err != nil { return nil, fmt.Errorf("failed to create a Shannon caching full node instance: %w", err) } From 187a6192cd36338b96697c707d8b01e52d677c87 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 11:08:40 -0400 Subject: [PATCH 03/15] Fix linter warnings --- cmd/noop_logger.go | 6 +++--- protocol/shannon/fullnode_cache.go | 1 - protocol/shannon/protocol.go | 18 +++++++++--------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/cmd/noop_logger.go b/cmd/noop_logger.go index 7480c7321..94061cac5 100644 --- a/cmd/noop_logger.go +++ b/cmd/noop_logger.go @@ -1,4 +1,4 @@ -package cmd +package main import ( "context" @@ -57,7 +57,7 @@ func (n *noOpLogger) WithLevel(level polylog.Level) polylog.Event { } // Write implements io.Writer by doing nothing and returning the length of p. -func (n *noOpLogger) Write(p []byte) (n int, err error) { +func (n *noOpLogger) Write(p []byte) (int, error) { return len(p), nil } @@ -188,4 +188,4 @@ func (e *noOpEvent) Msgf(format string, keyVals ...interface{}) { // Send does nothing. func (e *noOpEvent) Send() { // no-op -} \ No newline at end of file +} diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index c8232a280..f8f7b71e9 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -3,7 +3,6 @@ package shannon import ( "context" "fmt" - "strings" "sync" "time" diff --git a/protocol/shannon/protocol.go b/protocol/shannon/protocol.go index 61183e432..75a06fab9 100644 --- a/protocol/shannon/protocol.go +++ b/protocol/shannon/protocol.go @@ -396,24 +396,24 @@ func (p *Protocol) getSessionsUniqueEndpoints( endpoints := make(map[protocol.EndpointAddr]endpoint) // Iterate over all active sessions for the service ID. - for _, session := range activeSessions { - app := session.Application + for _, hydratedSession := range activeSessions { + app := hydratedSession.session.Application // Using a single iteration scope for this logger. // Avoids adding all apps in the loop to the logger's fields. // Hydrate the logger with session details. logger := logger.With("valid_app_address", app.Address).With("method", "getSessionsUniqueEndpoints") - logger = hydrateLoggerWithSession(logger, &session) - logger.ProbabilisticDebugInfo(polylog.ProbabilisticDebugInfoProb).Msgf("Finding unique endpoints for session %s for app %s for service %s.", session.SessionId, app.Address, serviceID) + logger = hydrateLoggerWithSession(logger, hydratedSession.session) + logger.ProbabilisticDebugInfo(polylog.ProbabilisticDebugInfoProb).Msgf("Finding unique endpoints for session %s for app %s for service %s.", hydratedSession.session.SessionId, app.Address, serviceID) // Retrieve all endpoints for the session. - qualifiedEndpoints := session.endpoints + qualifiedEndpoints := hydratedSession.endpoints // Filter out sanctioned endpoints if requested. if filterSanctioned { logger.Debug().Msgf( "app %s has %d endpoints before filtering sanctioned endpoints.", - app.Address, len(sessionEndpoints), + app.Address, len(hydratedSession.endpoints), ) // Filter out any sanctioned endpoints @@ -422,7 +422,7 @@ func (p *Protocol) getSessionsUniqueEndpoints( if len(filteredEndpoints) == 0 { logger.Error().Msgf( "All %d session endpoints are sanctioned for service %s, app %s. Skipping the app.", - len(sessionEndpoints), serviceID, app.Address, + len(hydratedSession.endpoints), serviceID, app.Address, ) continue } @@ -432,13 +432,13 @@ func (p *Protocol) getSessionsUniqueEndpoints( } // Log the number of endpoints before and after filtering - logger.Info().Msgf("Filtered session endpoints for app %s from %d to %d.", app.Address, len(sessionEndpoints), len(qualifiedEndpoints)) + logger.Info().Msgf("Filtered session endpoints for app %s from %d to %d.", app.Address, len(hydratedSession.endpoints), len(qualifiedEndpoints)) maps.Copy(endpoints, qualifiedEndpoints) logger.Info().Msgf( "Successfully fetched %d endpoints for session %s for application %s for service %s.", - len(qualifiedEndpoints), session.SessionId, app.Address, serviceID, + len(qualifiedEndpoints), hydratedSession.session.SessionId, app.Address, serviceID, ) } From 29f0a40e1faa133f643984b655ce722c118cf2a7 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 11:20:57 -0400 Subject: [PATCH 04/15] Fix typo in tests --- protocol/shannon/fullnode_session_rollover_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/shannon/fullnode_session_rollover_test.go b/protocol/shannon/fullnode_session_rollover_test.go index 6813fd57b..95bfa8b6d 100644 --- a/protocol/shannon/fullnode_session_rollover_test.go +++ b/protocol/shannon/fullnode_session_rollover_test.go @@ -141,7 +141,7 @@ func Test_updateSessionRolloverBoundaries(t *testing.T) { srs.sessionRolloverEnd = tt.initialRolloverEnd srs.currentBlockHeight = tt.initialBlockHeight - srs.updateSessionRolloverBoundaries(tt.session) + srs.updateSessionRolloverBoundaries(&tt.session) if tt.shouldReturn && (srs.sessionRolloverStart != tt.expectedRolloverStart || srs.sessionRolloverEnd != tt.expectedRolloverEnd) { t.Errorf("updateSessionRolloverBoundaries() should have returned early, but state changed") From cd61e4b1ca969859c009eb9c084df6f026693141 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 11:32:36 -0400 Subject: [PATCH 05/15] Fix linter warnings --- protocol/shannon/fullnode_cache.go | 10 +++------- protocol/shannon/session.go | 17 +++++++++-------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index f8f7b71e9..1d2ebea6c 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -20,13 +20,9 @@ import ( // ---------------- Cache Configuration ---------------- const ( - // TODO_IMPROVE: Make this configurable - sharedParamsCacheKey = "shared_params" // TODO_IMPROVE: Make this configurable sharedParamsCacheTTL = 2 * time.Minute // Shared params change infrequently - // TODO_IMPROVE: Make this configurable - blockHeightCacheKey = "block_height" // TODO_IMPROVE: Make this configurable blockHeightCacheTTL = 15 * time.Second // Block height changes frequently @@ -336,7 +332,7 @@ func (cfn *cachingFullNode) fetchAllSessions() (map[string]sessionCacheEntry, er } if len(updatedSessions) == 0 { - return nil, fmt.Errorf("Failed to get any sessions") + return nil, fmt.Errorf("failed to get any sessions") } return updatedSessions, nil @@ -509,7 +505,7 @@ func (cfn *cachingFullNode) GetSharedParams(ctx context.Context) (*sharedtypes.P if params == nil { // Cache not initialized yet, fetch directly cfn.logger.Debug().Msg("Shared params cache not initialized, fetching from full node") - return nil, fmt.Errorf("Shared params not cached yet.") + return nil, fmt.Errorf("shared params not cached yet.") } return params, nil @@ -523,7 +519,7 @@ func (cfn *cachingFullNode) GetCurrentBlockHeight(ctx context.Context) (int64, e height := cfn.blockCache.height if height == 0 { - return 0, fmt.Errorf("Height not fetched yet") + return 0, fmt.Errorf("height not fetched yet") } return height, nil diff --git a/protocol/shannon/session.go b/protocol/shannon/session.go index 60ddfaccb..dc2cd34f2 100644 --- a/protocol/shannon/session.go +++ b/protocol/shannon/session.go @@ -10,14 +10,15 @@ import ( ) var ( - // TODO_UPNEXT(@olshansk): Experiment the difference between active and extended sessions. - // - Make this configurable at the gateway yaml level - // - Add metrics to track how active vs extended sessions are used - // - Evaluate the impact of active vs extended sessions on performance - // - Enable making two parallel requests: one with active session and one with extended session - // DEV_NOTE: As of PR #339, we are hard-coding this to prevent any business logic changes to enable - // faster iteration on main and prevent the outstanding PR from getting stale. - extendedSessionEnabled = false +// TODO_UPNEXT(@olshansk): Experiment the difference between active and extended sessions. +// - Make this configurable at the gateway yaml level +// - Add metrics to track how active vs extended sessions are used +// - Evaluate the impact of active vs extended sessions on performance +// - Enable making two parallel requests: one with active session and one with extended session +// DEV_NOTE: As of PR #339, we are hard-coding this to prevent any business logic changes to enable +// faster iteration on main and prevent the outstanding PR from getting stale. + +// extendedSessionEnabled = false ) // getSession returns the session for the app address provided. From cdc937b43849dbae639972672063bc88890ab240 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 11:38:00 -0400 Subject: [PATCH 06/15] Fix linter warning --- protocol/shannon/fullnode_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index 1d2ebea6c..508dcf4b0 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -505,7 +505,7 @@ func (cfn *cachingFullNode) GetSharedParams(ctx context.Context) (*sharedtypes.P if params == nil { // Cache not initialized yet, fetch directly cfn.logger.Debug().Msg("Shared params cache not initialized, fetching from full node") - return nil, fmt.Errorf("shared params not cached yet.") + return nil, fmt.Errorf("shared params not cached yet") } return params, nil From 0ce03fd0040bfcfeb21c47dbc3f67c3a0cf6814c Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 15:05:13 -0400 Subject: [PATCH 07/15] Enable NoOp logger --- cmd/main.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 613a22812..f0306c88f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,9 +12,6 @@ import ( "syscall" "time" - "github.com/pokt-network/poktroll/pkg/polylog" - "github.com/pokt-network/poktroll/pkg/polylog/polyzero" - configpkg "github.com/buildwithgrove/path/config" "github.com/buildwithgrove/path/gateway" "github.com/buildwithgrove/path/health" @@ -56,10 +53,9 @@ func main() { // Initialize the logger log.Printf(`{"level":"info","message":"Initializing PATH logger with level: %s"}`, config.Logger.Level) - loggerOpts := []polylog.LoggerOption{ - polyzero.WithLevel(polyzero.ParseLevel(config.Logger.Level)), - } - logger := polyzero.NewLogger(loggerOpts...) + + // TODO_HACK(@adshmh): Revert to using polylog. + logger := NewNoOpLogger() // Log the config path logger.Info().Msgf("Starting PATH using config file: %s", configPath) From 76332d09735443eb4a6fe40a0b9a34bb159cdc65 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Wed, 3 Sep 2025 15:47:14 -0400 Subject: [PATCH 08/15] Use latest session in cache --- protocol/shannon/fullnode_cache.go | 35 ++++++++---------------------- 1 file changed, 9 insertions(+), 26 deletions(-) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index 508dcf4b0..980b670f5 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -295,13 +295,8 @@ func (cfn *cachingFullNode) updateSessionCache() { } } +// TODO_UPNEXT(@adshmh): Support height-based session retrieval. func (cfn *cachingFullNode) fetchAllSessions() (map[string]sessionCacheEntry, error) { - // Get current block height for session updates - height, err := cfn.GetCurrentBlockHeight(cfn.ctx) - if err != nil { - return nil, err - } - // Initialize updated sessions updatedSessions := make(map[string]sessionCacheEntry) @@ -323,7 +318,7 @@ func (cfn *cachingFullNode) fetchAllSessions() (map[string]sessionCacheEntry, er } // Update the session with new cache key based on current height - newKey := getSessionCacheKey(serviceID, appAddr, height) + newKey := getSessionCacheKey(serviceID, appAddr) updatedSessions[newKey] = sessionCacheEntry{ hydratedSession: session, lastUpdated: time.Now(), @@ -349,21 +344,7 @@ func (cfn *cachingFullNode) GetSession( serviceID protocol.ServiceID, appAddr string, ) (hydratedSession, error) { - logger := cfn.logger.With( - "service_id", string(serviceID), - "app_addr", appAddr, - "method", "GetSession", - ) - - height, err := cfn.GetCurrentBlockHeight(ctx) - if err != nil { - logger.Error().Err(err).Msgf( - "[cachingFullNode.GetSession] Failed to get current block height", - ) - return hydratedSession{}, err - } - - sessionKey := getSessionCacheKey(serviceID, appAddr, height) + sessionKey := getSessionCacheKey(serviceID, appAddr) // Try to get from cache first cfn.sessionsCache.mu.RLock() @@ -446,8 +427,10 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( logger.Debug().Msg("IS WITHIN GRACE PERIOD: Going to fetch previous session") + // TODO_UPNEXT(@adshmh): Support height-based session retrieval. + // // Try to get previous session from cache - prevSessionKey := getSessionCacheKey(serviceID, appAddr, prevSessionEndHeight) + prevSessionKey := getSessionCacheKey(serviceID, appAddr) cfn.sessionsCache.mu.RLock() entry, exists := cfn.sessionsCache.sessions[prevSessionKey] @@ -468,9 +451,9 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( return prevSession, nil } -// getSessionCacheKey builds a unique cache key for session: ::: -func getSessionCacheKey(serviceID protocol.ServiceID, appAddr string, height int64) string { - return fmt.Sprintf("%s:%s:%s:%d", sessionCacheKeyPrefix, serviceID, appAddr, height) +// getSessionCacheKey builds a unique cache key for session: :: +func getSessionCacheKey(serviceID protocol.ServiceID, appAddr string) string { + return fmt.Sprintf("%s:%s:%s", sessionCacheKeyPrefix, serviceID, appAddr) } // ValidateRelayResponse uses the SDK and the caching full node's account client for validation. From 8d7c35bb65686fba68671870f603f22893a2461c Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 07:45:00 -0400 Subject: [PATCH 09/15] Revert "Enable NoOp logger" This reverts commit 0ce03fd0040bfcfeb21c47dbc3f67c3a0cf6814c. --- cmd/main.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index f0306c88f..613a22812 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,6 +12,9 @@ import ( "syscall" "time" + "github.com/pokt-network/poktroll/pkg/polylog" + "github.com/pokt-network/poktroll/pkg/polylog/polyzero" + configpkg "github.com/buildwithgrove/path/config" "github.com/buildwithgrove/path/gateway" "github.com/buildwithgrove/path/health" @@ -53,9 +56,10 @@ func main() { // Initialize the logger log.Printf(`{"level":"info","message":"Initializing PATH logger with level: %s"}`, config.Logger.Level) - - // TODO_HACK(@adshmh): Revert to using polylog. - logger := NewNoOpLogger() + loggerOpts := []polylog.LoggerOption{ + polyzero.WithLevel(polyzero.ParseLevel(config.Logger.Level)), + } + logger := polyzero.NewLogger(loggerOpts...) // Log the config path logger.Info().Msgf("Starting PATH using config file: %s", configPath) From 00d8b796936f87ac26f9ae9b740fcfd9c187f4e3 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 07:45:44 -0400 Subject: [PATCH 10/15] Remove NoOp logger --- cmd/noop_logger.go | 191 --------------------------------------------- 1 file changed, 191 deletions(-) delete mode 100644 cmd/noop_logger.go diff --git a/cmd/noop_logger.go b/cmd/noop_logger.go deleted file mode 100644 index 94061cac5..000000000 --- a/cmd/noop_logger.go +++ /dev/null @@ -1,191 +0,0 @@ -package main - -import ( - "context" - "time" - - "github.com/pokt-network/poktroll/pkg/polylog" -) - -// noOpLogger is a Logger implementation that performs no operations. -// All methods return no-op implementations or the receiver itself. -type noOpLogger struct{} - -// NewNoOpLogger creates a new no-operation logger. -func NewNoOpLogger() polylog.Logger { - return &noOpLogger{} -} - -// Debug returns a no-op Event. -func (n *noOpLogger) Debug() polylog.Event { - return &noOpEvent{} -} - -// ProbabilisticDebugInfo returns a no-op Event, ignoring the probability. -func (n *noOpLogger) ProbabilisticDebugInfo(float64) polylog.Event { - return &noOpEvent{} -} - -// Info returns a no-op Event. -func (n *noOpLogger) Info() polylog.Event { - return &noOpEvent{} -} - -// Warn returns a no-op Event. -func (n *noOpLogger) Warn() polylog.Event { - return &noOpEvent{} -} - -// Error returns a no-op Event. -func (n *noOpLogger) Error() polylog.Event { - return &noOpEvent{} -} - -// With returns the same no-op logger, ignoring the key-value pairs. -func (n *noOpLogger) With(keyVals ...any) polylog.Logger { - return n -} - -// WithContext returns the context unchanged. -func (n *noOpLogger) WithContext(ctx context.Context) context.Context { - return ctx -} - -// WithLevel returns a no-op Event, ignoring the level. -func (n *noOpLogger) WithLevel(level polylog.Level) polylog.Event { - return &noOpEvent{} -} - -// Write implements io.Writer by doing nothing and returning the length of p. -func (n *noOpLogger) Write(p []byte) (int, error) { - return len(p), nil -} - -// noOpEvent is an Event implementation that performs no operations. -// All methods return the receiver itself or appropriate default values. -type noOpEvent struct{} - -// Str returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Str(key, value string) polylog.Event { - return e -} - -// Bool returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Bool(key string, value bool) polylog.Event { - return e -} - -// Int returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Int(key string, value int) polylog.Event { - return e -} - -// Int8 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Int8(key string, value int8) polylog.Event { - return e -} - -// Int16 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Int16(key string, value int16) polylog.Event { - return e -} - -// Int32 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Int32(key string, value int32) polylog.Event { - return e -} - -// Int64 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Int64(key string, value int64) polylog.Event { - return e -} - -// Uint returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Uint(key string, value uint) polylog.Event { - return e -} - -// Uint8 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Uint8(key string, value uint8) polylog.Event { - return e -} - -// Uint16 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Uint16(key string, value uint16) polylog.Event { - return e -} - -// Uint32 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Uint32(key string, value uint32) polylog.Event { - return e -} - -// Uint64 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Uint64(key string, value uint64) polylog.Event { - return e -} - -// Float32 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Float32(key string, value float32) polylog.Event { - return e -} - -// Float64 returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Float64(key string, value float64) polylog.Event { - return e -} - -// Err returns the receiver, ignoring the error. -func (e *noOpEvent) Err(err error) polylog.Event { - return e -} - -// Timestamp returns the receiver, ignoring the timestamp. -func (e *noOpEvent) Timestamp() polylog.Event { - return e -} - -// Time returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Time(key string, value time.Time) polylog.Event { - return e -} - -// Dur returns the receiver, ignoring the key-value pair. -func (e *noOpEvent) Dur(key string, value time.Duration) polylog.Event { - return e -} - -// Fields returns the receiver, ignoring the fields. -func (e *noOpEvent) Fields(fields any) polylog.Event { - return e -} - -// Func returns the receiver, ignoring the function. -func (e *noOpEvent) Func(func(polylog.Event)) polylog.Event { - return e -} - -// Enabled always returns false since this is a no-op implementation. -func (e *noOpEvent) Enabled() bool { - return false -} - -// Discard returns the receiver. -func (e *noOpEvent) Discard() polylog.Event { - return e -} - -// Msg does nothing. -func (e *noOpEvent) Msg(message string) { - // no-op -} - -// Msgf does nothing. -func (e *noOpEvent) Msgf(format string, keyVals ...interface{}) { - // no-op -} - -// Send does nothing. -func (e *noOpEvent) Send() { - // no-op -} From 1baf3a8ab11cf4991892826002027164193232c7 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 07:51:31 -0400 Subject: [PATCH 11/15] Use session pointer in websocket observation --- protocol/shannon/observation_websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/shannon/observation_websocket.go b/protocol/shannon/observation_websocket.go index 379e6b9a2..34522a0e9 100644 --- a/protocol/shannon/observation_websocket.go +++ b/protocol/shannon/observation_websocket.go @@ -305,7 +305,7 @@ func buildEndpointFromWebSocketConnectionObservation( ) endpoint { session := buildSessionFromWebSocketConnectionObservation(observation) return &protocolEndpoint{ - session: session, + session: &session, supplier: observation.GetSupplier(), url: observation.GetEndpointUrl(), } From 99cce9eee94c3eb5f4d6c079844188683e9ca120 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 10:46:28 -0400 Subject: [PATCH 12/15] Fix comment formatting --- protocol/shannon/fullnode_cache.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index 980b670f5..bba76cee7 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -359,7 +359,6 @@ func (cfn *cachingFullNode) GetSession( } // TODO_UPNEXT(@adshmh): Refactor to handle height-based session retrieval from the cache. -// ================================================= // // GetSessionWithExtendedValidity implements session retrieval with support for // Pocket Network's "session grace period" business logic. From 351bde9b3c487b3eae5670a8e570d8ac2a33fe9e Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 11:17:37 -0400 Subject: [PATCH 13/15] Support load testing via config --- protocol/shannon/config.go | 17 ++++++++++++++++ protocol/shannon/context.go | 39 +++++++++++++++++++++++++++++++++++- protocol/shannon/protocol.go | 10 +++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/protocol/shannon/config.go b/protocol/shannon/config.go index 3729481b6..e4fba9d0c 100644 --- a/protocol/shannon/config.go +++ b/protocol/shannon/config.go @@ -73,6 +73,11 @@ type ( GatewayPrivateKeyHex string `yaml:"gateway_private_key_hex"` OwnedAppsPrivateKeysHex []string `yaml:"owned_apps_private_keys_hex"` ServiceFallback []ServiceFallback `yaml:"service_fallback"` + // Optional. + // Puts the Gateway in LoadTesting mode if specified. + // All relays will be sent to a fixed URL. + // Allows measuring performance of PATH and full node(s) in isolation. + LoadTestingConfig *LoadTestingConfig `yaml:"load_testing_config"` } // TODO_TECHDEBT(@adshmh): Make configuration and implementation explicit: @@ -88,6 +93,18 @@ type ( // regardless of the health of the protocol endpoints. SendAllTraffic bool `yaml:"send_all_traffic"` } + + // Load testing configuration. + // Used to track Gateway's performance when using "perfect" endpoints. + // If specified: + // - Directs all relays to the specified backend service URL + // - No protocol or fallback endpoint used. + // - Assumes high throughput backend service (e.g. nginx with a fixed response) + LoadTestingConfig struct { + // The URL to use for sending relays. + BackendServiceURL string `yaml:"backend_service_url"` + // TODO_UPNEXT(@adshmh): Support using a fixed URL for a Shannon endpoint/RelayMiner during load testing. + } ) func (gc GatewayConfig) Validate() error { diff --git a/protocol/shannon/context.go b/protocol/shannon/context.go index 5b0774aa0..8cb614d91 100644 --- a/protocol/shannon/context.go +++ b/protocol/shannon/context.go @@ -100,6 +100,14 @@ type requestContext struct { // fallbackEndpoints is used to retrieve a fallback endpoint by an endpoint address. fallbackEndpoints map[protocol.EndpointAddr]endpoint + + // Optional. + // Puts the Gateway in LoadTesting mode if specified. + // All relays will be sent to a fixed URL. + // Allows measuring performance of PATH and full node(s) in isolation. + // Applies to Single Relay ONLY + // No parallel requests for a single relay in load testing mode. + loadTestingConfig *LoadTestingConfig } // HandleServiceRequest: @@ -338,6 +346,13 @@ func (rc *requestContext) executeRelayRequestStrategy(payload protocol.Payload) rc.hydrateLogger("executeRelayRequestStrategy") switch { + + // ** Priority 0: Load testing mode ** + // Use the configured load testing backend server. + case rc.loadTestingConfig != nil: + rc.logger.Debug().Msg("LoadTesting Mode: Sending relay to the load test backend server") + return rc.sendProtocolRelay(payload) + // ** Priority 1: Check Endpoint type ** // Direct fallback endpoint // - Bypasses protocol validation and Shannon network @@ -524,6 +539,17 @@ func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol. return defaultResponse, fmt.Errorf("SHOULD NEVER HAPPEN: failed to marshal relay request: %w", err) } + // TODO_UPNEXT(@adshmh): parse the LoadTesting server's URL in-advance. + var targetServerURL string + switch { + // LoadTesting mode: use the fixed URL. + case rc.loadTestingConfig != nil: + targetServerURL = rc.loadTestingConfig.BackendServiceURL + // Default: use the selected endoint's URL + default: + targetServerURL = selectedEndpoint.PublicURL() + } + // TODO_TECHDEBT(@adshmh): Add a new struct to track details about the HTTP call. // It should contain at-least: // - endpoint payload @@ -531,7 +557,7 @@ func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol. // Use the new struct to pass data around for logging/metrics/etc. // // Send the HTTP request to the protocol endpoint. - httpRelayResponseBz, httpStatusCode, err := rc.sendHTTPRequest(payload, selectedEndpoint.PublicURL(), relayRequestBz) + httpRelayResponseBz, httpStatusCode, err := rc.sendHTTPRequest(payload, targetServerURL, relayRequestBz) if err != nil { return defaultResponse, err } @@ -541,6 +567,17 @@ func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol. return defaultResponse, fmt.Errorf("%w %w: %d", errSendHTTPRelay, errEndpointNon2XXHTTPStatusCode, httpStatusCode) } + // LoadTesting mode: return the backend server's response as-is. + if rc.loadTestingConfig != nil { + return protocol.Response{ + Bytes: httpRelayResponseBz, + HTTPStatusCode: httpStatusCode, + // Intentionally leaving the endpoint address empty. + // Ensuring to sanctions/invalidation rules apply to LoadTesting backend server + EndpointAddr: "", + }, nil + } + // Validate and process the response response, err := rc.validateAndProcessResponse(httpRelayResponseBz) if err != nil { diff --git a/protocol/shannon/protocol.go b/protocol/shannon/protocol.go index 28ad25303..c1d6ef2b9 100644 --- a/protocol/shannon/protocol.go +++ b/protocol/shannon/protocol.go @@ -73,6 +73,12 @@ type Protocol struct { // Each service can have a SendAllTraffic flag to send all traffic to // fallback endpoints, regardless of the health of the protocol endpoints. serviceFallbackMap map[protocol.ServiceID]serviceFallback + + // Optional. + // Puts the Gateway in LoadTesting mode if specified. + // All relays will be sent to a fixed URL. + // Allows measuring performance of PATH and full node(s) in isolation. + loadTestingConfig *LoadTestingConfig } // serviceFallback holds the fallback information for a service, @@ -122,6 +128,9 @@ func NewProtocol( // serviceFallbacks contains the fallback information for each service. serviceFallbackMap: config.getServiceFallbackMap(), + + // load testing config, if specified. + loadTestingConfig: config.LoadTestingConfig, } return protocolInstance, nil @@ -333,6 +342,7 @@ func (p *Protocol) BuildHTTPRequestContextForEndpoint( relayRequestSigner: permittedSigner, httpClient: p.httpClient, fallbackEndpoints: fallbackEndpoints, + loadTestingConfig: p.loadTestingConfig, }, protocolobservations.Observations{}, nil } From 2a8b574eef4cfb3d09f3847c60f26d9b46d4b1f6 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 12:13:00 -0400 Subject: [PATCH 14/15] Delay healthy status on caching full node until sessions are fetched --- protocol/shannon/fullnode_cache.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index bba76cee7..1bc0ac486 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -96,6 +96,11 @@ type cachingFullNode struct { // This is used to prefetch and cache all sessions. // Necessary to avoid individual requests getting stuck waiting for a session fetch. ownedApps map[protocol.ServiceID][]string + + // Tracks whether the endpoint is healthy. + // Set to true once at least 1 iteration of fetching sessions succeeds. + isHealthy bool + isHealthyMu sync.RWMutex } // NewCachingFullNode wraps a LazyFullNode with simple map-based caches @@ -270,6 +275,11 @@ func (cfn *cachingFullNode) updateSessionCache() { cfn.sessionsCache.sessions = allSessions cfn.sessionsCache.mu.Unlock() updatedOnce = true + + // Mark the caching full node as healthy + cfn.isHealthyMu.Lock() + cfn.isHealthy = true + cfn.isHealthyMu.Unlock() } time.Sleep(1 * time.Second) @@ -475,6 +485,15 @@ func (cfn *cachingFullNode) GetAccountClient() *sdk.AccountClient { // IsHealthy: passthrough to underlying node. func (cfn *cachingFullNode) IsHealthy() bool { + // Check if the caching full node has been marked healthy. + // i.e. if at least one iteration of fetching sessions has succeeded. + cfn.isHealthyMu.RLock() + if !cfn.isHealthy { + return false + } + cfn.isHealthyMu.RUnlock() + + // Delegate to the lazy full node's health status. return cfn.lazyFullNode.IsHealthy() } From f7dc9b72a2e022fcb3d95c9a92c5833e31a1dba8 Mon Sep 17 00:00:00 2001 From: Arash Deshmeh Date: Tue, 23 Sep 2025 20:31:56 -0400 Subject: [PATCH 15/15] Set health status on caching full node --- protocol/shannon/fullnode_cache.go | 47 +++++++++++------------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index 1bc0ac486..1dfc87ce4 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -264,44 +264,30 @@ func (cfn *cachingFullNode) updateSharedParamsCache() { // updateSessionCache periodically updates the session cache for active sessions func (cfn *cachingFullNode) updateSessionCache() { - ticker := time.NewTicker(cfn.cacheConfig.SessionTTL) - defer ticker.Stop() - var updatedOnce bool for { - if !updatedOnce { - if allSessions, err := cfn.fetchAllSessions(); err == nil { - cfn.sessionsCache.mu.Lock() - cfn.sessionsCache.sessions = allSessions - cfn.sessionsCache.mu.Unlock() - updatedOnce = true - - // Mark the caching full node as healthy - cfn.isHealthyMu.Lock() - cfn.isHealthy = true - cfn.isHealthyMu.Unlock() - } + // Fetch all sessions for caching. + updatedSessions, err := cfn.fetchAllSessions() + if err != nil { + cfn.logger.Error().Err(err).Msg("Failed to get updated sessions. Skipping session cache update") + // Add a short delay before retrying. time.Sleep(1 * time.Second) continue } - select { - case <-cfn.ctx.Done(): - return - case <-ticker.C: + // Update existing sessions in cache + cfn.sessionsCache.mu.Lock() + cfn.sessionsCache.sessions = updatedSessions + cfn.sessionsCache.mu.Unlock() - updatedSessions, err := cfn.fetchAllSessions() - if err != nil { - cfn.logger.Error().Err(err).Msg("Failed to get updated sessions. Skipping session cache update") - continue - } + // Mark the caching full node as healthy + cfn.isHealthyMu.Lock() + cfn.isHealthy = true + cfn.isHealthyMu.Unlock() - // Update existing sessions in cache - cfn.sessionsCache.mu.Lock() - cfn.sessionsCache.sessions = updatedSessions - cfn.sessionsCache.mu.Unlock() - } + // Sleep until the cache expiry. + time.Sleep(cfn.cacheConfig.SessionTTL) } } @@ -488,10 +474,11 @@ func (cfn *cachingFullNode) IsHealthy() bool { // Check if the caching full node has been marked healthy. // i.e. if at least one iteration of fetching sessions has succeeded. cfn.isHealthyMu.RLock() + defer cfn.isHealthyMu.RUnlock() + if !cfn.isHealthy { return false } - cfn.isHealthyMu.RUnlock() // Delegate to the lazy full node's health status. return cfn.lazyFullNode.IsHealthy()