diff --git a/cmd/shannon.go b/cmd/shannon.go index 3b31a3766..bfc553845 100644 --- a/cmd/shannon.go +++ b/cmd/shannon.go @@ -15,9 +15,18 @@ import ( func getShannonFullNode(logger polylog.Logger, config *shannonconfig.ShannonGatewayConfig) (shannon.FullNode, error) { fullNodeConfig := config.FullNodeConfig + // TODO_TECHDEBT(@adshmh): Refactor to find a better fit for load testing config handling. + // + // Load Testing against a RelayMiner: + // Restrict the allowed supplier + var allowedSupplierAddr string + if config.GatewayConfig.LoadTestingConfig != nil { + allowedSupplierAddr = config.GatewayConfig.LoadTestingConfig.GetAllowedSupplierAddr() + } + // TODO_MVP(@adshmh): rename the variables here once a more accurate name is selected for `LazyFullNode` // LazyFullNode skips all caching and queries the onchain data for serving each relay request. - lazyFullNode, err := shannon.NewLazyFullNode(logger, fullNodeConfig) + lazyFullNode, err := shannon.NewLazyFullNode(logger, fullNodeConfig, allowedSupplierAddr) if err != nil { return nil, fmt.Errorf("failed to create Shannon lazy full node: %w", err) } @@ -27,7 +36,8 @@ func getShannonFullNode(logger polylog.Logger, config *shannonconfig.ShannonGate return lazyFullNode, nil } - fullNode, err := shannon.NewCachingFullNode(logger, lazyFullNode, fullNodeConfig.CacheConfig) + // TODO_TECHDEBT(@adshmh): Refactor to clarify the fullnode's config requirements (including owned apps). 
+ fullNode, err := shannon.NewCachingFullNode(logger, lazyFullNode, fullNodeConfig.CacheConfig, config.GatewayConfig) if err != nil { return nil, fmt.Errorf("failed to create a Shannon caching full node instance: %w", err) } diff --git a/protocol/shannon/config.go b/protocol/shannon/config.go index 138ee7906..4e2fd419d 100644 --- a/protocol/shannon/config.go +++ b/protocol/shannon/config.go @@ -339,6 +339,18 @@ func (fnc *FullNodeConfig) HydrateDefaults() { } } +func (ltc *LoadTestingConfig) GetAllowedSupplierAddr() string { + relayMinerConfig := ltc.RelayMinerConfig + + // RelayMiner config not specified: + // No restrictions on supplier address. + if relayMinerConfig == nil { + return "" + } + + return relayMinerConfig.SupplierAddr +} + func (ltc *LoadTestingConfig) Validate() error { // Error: neither backend server nor RelayMiner config are specified. if ltc.BackendServiceURL == nil && ltc.RelayMinerConfig == nil { diff --git a/protocol/shannon/endpoint.go b/protocol/shannon/endpoint.go index 3f1a92784..6a3f087da 100644 --- a/protocol/shannon/endpoint.go +++ b/protocol/shannon/endpoint.go @@ -130,7 +130,7 @@ type protocolEndpoint struct { // the first app will be chosen. A randomization among the apps in this (unlikely) scenario // may be needed. // session is the active session corresponding to the app, of which the endpoint is a member. - session sessiontypes.Session + session *sessiontypes.Session } // IsFallback returns false for protocol endpoints. @@ -165,7 +165,7 @@ func (e protocolEndpoint) WebsocketURL() (string, error) { // Session returns a pointer to the session associated with the endpoint. func (e protocolEndpoint) Session() *sessiontypes.Session { - return &e.session + return e.session } // Supplier returns the supplier address of the endpoint. 
@@ -177,7 +177,7 @@ func (e protocolEndpoint) Supplier() string { // It returns a map for efficient lookup, as the main/only consumer of this function uses // the return value for selecting an endpoint for sending a relay. func endpointsFromSession( - session sessiontypes.Session, + session *sessiontypes.Session, // TODO_TECHDEBT(@adshmh): Refactor load testing logic to make it more visible. // // The only supplier allowed from the session. @@ -185,7 +185,7 @@ func endpointsFromSession( allowedSupplierAddr string, ) (map[protocol.EndpointAddr]endpoint, error) { sf := sdk.SessionFilter{ - Session: &session, + Session: session, } // AllEndpoints will return a map of supplier address to a list of supplier endpoints. diff --git a/protocol/shannon/full_node.go b/protocol/shannon/full_node.go index e89f9ac50..913e2a6a2 100644 --- a/protocol/shannon/full_node.go +++ b/protocol/shannon/full_node.go @@ -5,7 +5,6 @@ import ( apptypes "github.com/pokt-network/poktroll/x/application/types" servicetypes "github.com/pokt-network/poktroll/x/service/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" sdk "github.com/pokt-network/shannon-sdk" @@ -27,7 +26,7 @@ type FullNode interface { // GetSession returns the latest session matching the supplied service+app combination. // Sessions are solely used for sending relays, and therefore only the latest session for any service+app combination is needed. // Note: Shannon returns the latest session for a service+app combination if no blockHeight is provided. - GetSession(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error) + GetSession(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (hydratedSession, error) // GetSessionWithExtendedValidity implements session retrieval with support for // Pocket Network's native "session grace period" business logic. 
@@ -52,7 +51,7 @@ type FullNode interface { // - https://dev.poktroll.com/protocol/governance/gov_params // - https://dev.poktroll.com/protocol/primitives/claim_and_proof_lifecycle // If within grace period of a session rollover, it may return the previous session. - GetSessionWithExtendedValidity(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error) + GetSessionWithExtendedValidity(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (hydratedSession, error) // GetSharedParams returns the shared module parameters from the blockchain. GetSharedParams(ctx context.Context) (*sharedtypes.Params, error) diff --git a/protocol/shannon/fullnode_cache.go b/protocol/shannon/fullnode_cache.go index 0a8262930..3ca1d10ba 100644 --- a/protocol/shannon/fullnode_cache.go +++ b/protocol/shannon/fullnode_cache.go @@ -3,6 +3,7 @@ package shannon import ( "context" "fmt" + "sync" "time" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" @@ -19,54 +20,14 @@ import ( // ---------------- Cache Configuration ---------------- const ( - // Retry base delay for exponential backoff on failed refreshes - retryBaseDelay = 100 * time.Millisecond - - // cacheCapacity: - // - Max entries across all shards (not per-shard) - // - Exceeding capacity triggers LRU eviction per shard - // - 100k supports most large deployments - // - TODO_TECHDEBT(@commoddity): Revisit based on real-world usage; consider making configurable - cacheCapacity = 100_000 - - // numShards: - // - Number of independent cache shards for concurrency - // - Reduces lock contention, improves parallelism - // - 10 is a good balance for most workloads - numShards = 10 - - // evictionPercentage: - // - % of LRU entries evicted per shard when full - // - 10% = incremental cleanup, avoids memory spikes - // - SturdyC also evicts expired entries in background - evictionPercentage = 10 - - // TODO_TECHDEBT(@commoddity): See Issue #291 for improvements to refresh logic - // 
minEarlyRefreshPercentage: - // - Earliest point (as % of TTL) to start background refresh - // - 0.75 = 75% of TTL (e.g. 22.5s for 30s TTL) - minEarlyRefreshPercentage = 0.75 - - // maxEarlyRefreshPercentage: - // - Latest point (as % of TTL) to start background refresh - // - 0.9 = 90% of TTL (e.g. 27s for 30s TTL) - // - Ensures refresh always completes before expiry - maxEarlyRefreshPercentage = 0.9 - - // Cache key prefixes to avoid collisions between different data types. - sessionCacheKeyPrefix = "session" - // TODO_IMPROVE: Make this configurable - sharedParamsCacheKey = "shared_params" - // TODO_IMPROVE: Make this configurable - sharedParamsCacheTTL = 2 * time.Minute // Shared params change infrequently - sharedParamsCacheCapacity = 3 // Only need to cache the last couple of shared params at any point in time + sharedParamsCacheTTL = 2 * time.Minute // Shared params change infrequently // TODO_IMPROVE: Make this configurable - blockHeightCacheKey = "block_height" - // TODO_IMPROVE: Make this configurable - blockHeightCacheTTL = 15 * time.Second // Block height changes frequently - blockHeightCacheCapacity = 5 // Only need to cache the last few blocks at any point in time + blockHeightCacheTTL = 15 * time.Second // Block height changes frequently + + // Cache key prefixes to avoid collisions between different data types. + sessionCacheKeyPrefix = "session" // TODO_IMPROVE: Make this configurable // - Grace period scale down factor forces the gateway to respect a smaller @@ -76,57 +37,79 @@ const ( gracePeriodScaleDownFactor = 0.8 ) -// getCacheDelays returns the min/max delays for SturdyC's Early Refresh strategy. -// - Proactively refreshes cache before expiry (prevents misses/latency spikes) -// - Refresh window: 75-90% of TTL (e.g. 
22.5-27s for 30s TTL) -// - Spreads requests to avoid thundering herd -// See: https://github.com/viccon/sturdyc?tab=readme-ov-file#early-refreshes -func getCacheDelays(ttl time.Duration) (min, max time.Duration) { - minFloat := float64(ttl) * minEarlyRefreshPercentage - maxFloat := float64(ttl) * maxEarlyRefreshPercentage - - // Round to the nearest second - min = time.Duration(minFloat/float64(time.Second)+0.5) * time.Second - max = time.Duration(maxFloat/float64(time.Second)+0.5) * time.Second - return +// hydratedSession contains a session along with pre-computed endpoints +// to avoid repeatedly calling endpointsFromSession for the same session +type hydratedSession struct { + session *sessiontypes.Session + endpoints map[protocol.EndpointAddr]endpoint +} + +// blockHeightCache represents a simple cache for block heights +type blockHeightCache struct { + mu sync.RWMutex + height int64 + lastUpdated time.Time +} + +// sessionCache represents a simple cache for hydrated sessions +type sessionCache struct { + mu sync.RWMutex + sessions map[string]sessionCacheEntry +} + +type sessionCacheEntry struct { + hydratedSession hydratedSession + lastUpdated time.Time +} + +// sharedParamsCache represents a simple cache for shared parameters +type sharedParamsCache struct { + mu sync.RWMutex + params *sharedtypes.Params + lastUpdated time.Time } var _ FullNode = &cachingFullNode{} -// cachingFullNode wraps a LazyFullNode with SturdyC-based caching. -// - Early refresh: background updates before expiry (prevents thundering herd/latency spikes) -// - Example: 30s TTL, refresh at 22.5–27s (75–90%) -// - Benefits: zero-latency reads, graceful degradation, auto load balancing -// Docs: https://github.com/viccon/sturdyc +// cachingFullNode wraps a LazyFullNode with simple map-based caching. +// Background goroutines periodically refresh cached data to ensure freshness. 
type cachingFullNode struct { logger polylog.Logger // Underlying node for protocol data fetches lazyFullNode *LazyFullNode - // Session cache - // TODO_MAINNET_MIGRATION(@Olshansk): Revisit after mainnet - sessionCache *sturdyc.Client[sessiontypes.Session] + // Simple caches with RWMutex for thread safety + blockCache *blockHeightCache + sessionsCache *sessionCache + sharedCache *sharedParamsCache + cacheConfig CacheConfig - // Shared params cache - sharedParamsCache *sturdyc.Client[*sharedtypes.Params] + // Account client wrapped with cache (keeping original implementation) + cachingAccountClient *sdk.AccountClient - // Block height cache - blockHeightCache *sturdyc.Client[int64] + // Context and cancel function for background goroutines + ctx context.Context + cancel context.CancelFunc - // Account client wrapped with SturdyC cache - cachingAccountClient *sdk.AccountClient + // ownedApps is the list of apps owned by the gateway operator + // This is used to prefetch and cache all sessions. + // Necessary to avoid individual requests getting stuck waiting for a session fetch. + ownedApps map[protocol.ServiceID][]string + + // Tracks whether the endpoint is healthy. + // Set to true once at least 1 iteration of fetching sessions succeeds. + isHealthy bool + isHealthyMu sync.RWMutex } -// NewCachingFullNode wraps a LazyFullNode with: -// - Session cache: caches sessions, refreshes early -// - Account cache: indefinite cache for account data -// -// Both use early refresh to avoid thundering herd/latency spikes. +// NewCachingFullNode wraps a LazyFullNode with simple map-based caches +// and starts background goroutines for periodic cache updates. 
func NewCachingFullNode( logger polylog.Logger, lazyFullNode *LazyFullNode, cacheConfig CacheConfig, + gatewayConfig GatewayConfig, ) (*cachingFullNode, error) { // Set default session TTL if not set cacheConfig.hydrateDefaults() @@ -136,74 +119,201 @@ func NewCachingFullNode( Str("cache_config_session_ttl", cacheConfig.SessionTTL.String()). Msgf("cachingFullNode - Cache Configuration") - // Create the session cache with early refreshes - sessionMinRefreshDelay, sessionMaxRefreshDelay := getCacheDelays(cacheConfig.SessionTTL) - sessionCache := sturdyc.New[sessiontypes.Session]( - cacheCapacity, - numShards, - cacheConfig.SessionTTL, - evictionPercentage, - // See: https://github.com/viccon/sturdyc?tab=readme-ov-file#early-refreshes - sturdyc.WithEarlyRefreshes( - sessionMinRefreshDelay, - sessionMaxRefreshDelay, - cacheConfig.SessionTTL, - retryBaseDelay, - ), - ) + // TODO_TECHDEBT(@adshmh): refactor to remove duplicate owned apps processing at startup. + // + // Retrieve the list of apps owned by the gateway. + ownedApps, err := getOwnedApps(logger, gatewayConfig.OwnedAppsPrivateKeysHex, lazyFullNode) + if err != nil { + return nil, fmt.Errorf("failed to get app addresses from config: %w", err) + } - // Account cache: infinite for app lifetime; no early refresh needed. 
+ // Account cache: keeping original sturdyc implementation for account data accountCache := sturdyc.New[*accounttypes.QueryAccountResponse]( accountCacheCapacity, - numShards, + 10, // numShards accountCacheTTL, - evictionPercentage, + 10, // evictionPercentage ) - // Shared params cache - sharedParamsMinRefreshDelay, sharedParamsMaxRefreshDelay := getCacheDelays(sharedParamsCacheTTL) - sharedParamsCache := sturdyc.New[*sharedtypes.Params]( - sharedParamsCacheCapacity, - 1, - sharedParamsCacheTTL, - evictionPercentage, - sturdyc.WithEarlyRefreshes( - sharedParamsMinRefreshDelay, - sharedParamsMaxRefreshDelay, - sharedParamsCacheTTL, - retryBaseDelay, - ), - ) - - // Block height cache - blockHeightMinRefreshDelay, blockHeightMaxRefreshDelay := getCacheDelays(blockHeightCacheTTL) - blockHeightCache := sturdyc.New[int64]( - blockHeightCacheCapacity, - 1, - blockHeightCacheTTL, - evictionPercentage, - sturdyc.WithEarlyRefreshes( - blockHeightMinRefreshDelay, - blockHeightMaxRefreshDelay, - blockHeightCacheTTL, - retryBaseDelay, - ), - ) - - // Initialize the caching full node with the modified lazy full node - return &cachingFullNode{ - logger: logger, - lazyFullNode: lazyFullNode, - sessionCache: sessionCache, - sharedParamsCache: sharedParamsCache, - blockHeightCache: blockHeightCache, + // Create context for background goroutines + ctx, cancel := context.WithCancel(context.Background()) + + // Initialize the caching full node + cfn := &cachingFullNode{ + logger: logger, + lazyFullNode: lazyFullNode, + cacheConfig: cacheConfig, + ctx: ctx, + cancel: cancel, + blockCache: &blockHeightCache{ + height: 0, + }, + sessionsCache: &sessionCache{ + sessions: make(map[string]sessionCacheEntry), + }, + sharedCache: &sharedParamsCache{ + params: nil, + }, // Wrap the underlying account fetcher with a SturdyC caching layer. 
cachingAccountClient: getCachingAccountClient( logger, accountCache, lazyFullNode.accountClient, ), - }, nil + + ownedApps: ownedApps, + } + + // Start background cache update goroutines + cfn.startCacheUpdateRoutines() + + return cfn, nil +} + +// startCacheUpdateRoutines starts background goroutines to periodically update caches +func (cfn *cachingFullNode) startCacheUpdateRoutines() { + // Start block height cache update routine + go cfn.updateBlockHeightCache() + + // Start shared params cache update routine + go cfn.updateSharedParamsCache() + + // Start session cache update routine + go cfn.updateSessionCache() +} + +// updateBlockHeightCache periodically updates the block height cache +func (cfn *cachingFullNode) updateBlockHeightCache() { + ticker := time.NewTicker(blockHeightCacheTTL) + defer ticker.Stop() + + var updatedOnce bool + for { + if !updatedOnce { + if err := cfn.fetchAndUpdateBlockHeightCache(); err == nil { + updatedOnce = true + } + time.Sleep(1 * time.Second) + continue + } + + select { + case <-cfn.ctx.Done(): + return + case <-ticker.C: + if err := cfn.fetchAndUpdateBlockHeightCache(); err != nil { + cfn.logger.Error().Err(err).Msg("Failed to update block height cache") + } + } + } +} + +func (cfn *cachingFullNode) fetchAndUpdateBlockHeightCache() error { + height, err := cfn.lazyFullNode.GetCurrentBlockHeight(cfn.ctx) + if err != nil { + return err + } + + cfn.blockCache.mu.Lock() + cfn.blockCache.height = height + cfn.blockCache.lastUpdated = time.Now() + cfn.blockCache.mu.Unlock() + + cfn.logger.Debug().Int64("height", height).Msg("Updated block height cache") + return nil +} + +// updateSharedParamsCache periodically updates the shared params cache +func (cfn *cachingFullNode) updateSharedParamsCache() { + ticker := time.NewTicker(sharedParamsCacheTTL) + defer ticker.Stop() + + for { + select { + case <-cfn.ctx.Done(): + return + case <-ticker.C: + params, err := cfn.lazyFullNode.GetSharedParams(cfn.ctx) + if err != nil { + 
cfn.logger.Error().Err(err).Msg("Failed to update shared params cache") + continue + } + + cfn.sharedCache.mu.Lock() + cfn.sharedCache.params = params + cfn.sharedCache.lastUpdated = time.Now() + cfn.sharedCache.mu.Unlock() + + cfn.logger.Debug().Msg("Updated shared params cache") + } + } +} + +// updateSessionCache periodically updates the session cache for active sessions +func (cfn *cachingFullNode) updateSessionCache() { + + for { + // Fetch all sessions for caching. + updatedSessions, err := cfn.fetchAllSessions() + if err != nil { + cfn.logger.Error().Err(err).Msg("Failed to get updated sessions. Skipping session cache update") + + // Add a short delay before retrying. + time.Sleep(1 * time.Second) + continue + } + + // Update existing sessions in cache + cfn.sessionsCache.mu.Lock() + cfn.sessionsCache.sessions = updatedSessions + cfn.sessionsCache.mu.Unlock() + + // Mark the caching full node as healthy + cfn.isHealthyMu.Lock() + cfn.isHealthy = true + cfn.isHealthyMu.Unlock() + + // Sleep until the cache expiry. + time.Sleep(cfn.cacheConfig.SessionTTL) + } +} + +// TODO_UPNEXT(@adshmh): Support height-based session retrieval. +func (cfn *cachingFullNode) fetchAllSessions() (map[string]sessionCacheEntry, error) { + // Initialize updated sessions + updatedSessions := make(map[string]sessionCacheEntry) + + // TODO_UPNEXT(@adshmh): Fetch sessions concurrently. + // + // Iterate over owned apps + for serviceID, appsAddrs := range cfn.ownedApps { + for _, appAddr := range appsAddrs { + // Fetch updated session + // TODO_TECHDEBT(@adshmh): Set a deadline for fetching a session. + session, err := cfn.lazyFullNode.GetSession(context.TODO(), serviceID, appAddr) + if err != nil { + cfn.logger.Error(). + Str("service_id", string(serviceID)). + Str("app_addr", appAddr). + Err(err). 
+ Msg("Failed to fetch session") + continue + } + + // Update the session with new cache key based on current height + newKey := getSessionCacheKey(serviceID, appAddr) + updatedSessions[newKey] = sessionCacheEntry{ + hydratedSession: session, + lastUpdated: time.Now(), + } + } + } + + if len(updatedSessions) == 0 { + return nil, fmt.Errorf("failed to get any sessions") + } + + return updatedSessions, nil } // GetApp is only used at startup; relaying fetches sessions for app/session sync. @@ -211,62 +321,35 @@ func (cfn *cachingFullNode) GetApp(ctx context.Context, appAddr string) (*apptyp return cfn.lazyFullNode.GetApp(ctx, appAddr) } -// GetSession returns (and auto-refreshes) the session for a service/app from cache. +// GetSession returns the session for a service/app from cache, fetching if not present. func (cfn *cachingFullNode) GetSession( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { - logger := cfn.logger.With( - "service_id", string(serviceID), - "app_addr", appAddr, - "method", "GetSession", - ) +) (hydratedSession, error) { + sessionKey := getSessionCacheKey(serviceID, appAddr) - height, err := cfn.GetCurrentBlockHeight(ctx) - if err != nil { - logger.Error().Err(err).Msgf( - "[cachingFullNode.GetSession] Failed to get current block height", - ) - return sessiontypes.Session{}, err + // Try to get from cache first + cfn.sessionsCache.mu.RLock() + entry, exists := cfn.sessionsCache.sessions[sessionKey] + cfn.sessionsCache.mu.RUnlock() + + if exists { + return entry.hydratedSession, nil } - sessionKey := getSessionCacheKey(serviceID, appAddr, height) - - // See: https://github.com/viccon/sturdyc?tab=readme-ov-file#get-or-fetch - session, err := cfn.sessionCache.GetOrFetch( - ctx, - sessionKey, - func(fetchCtx context.Context) (sessiontypes.Session, error) { - logger.Debug().Str("session_key", sessionKey).Msgf("Fetching session from full node") - return cfn.lazyFullNode.GetSession(ctx, serviceID, 
appAddr) - }, - ) - return session, err + return hydratedSession{}, fmt.Errorf("session not found") } +// TODO_UPNEXT(@adshmh): Refactor to handle height-based session retrieval from the cache. +// // GetSessionWithExtendedValidity implements session retrieval with support for // Pocket Network's "session grace period" business logic. -// -// It is used to account for the case when: -// - RelayMiner.FullNode.Height > Gateway.FullNode.Height -// AND -// - RelayMiner.FullNode.Session > Gateway.FullNode.Session -// -// In the context of PATH, it is used to account for the case when: -// - Gateway.FullNode.Height > RelayMiner.FullNode.Height -// AND -// - Gateway.FullNode.Session > RelayMiner.FullNode.Session -// -// Protocol References: -// - https://github.com/pokt-network/poktroll/blob/main/proto/pocket/shared/params.proto -// - https://dev.poktroll.com/protocol/governance/gov_params -// - https://dev.poktroll.com/protocol/primitives/claim_and_proof_lifecycle func (cfn *cachingFullNode) GetSessionWithExtendedValidity( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger := cfn.logger.With( "service_id", string(serviceID), "app_addr", appAddr, @@ -274,24 +357,27 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( ) // Get the current session from cache - currentSession, err := cfn.GetSession(ctx, serviceID, appAddr) + cachedSession, err := cfn.GetSession(ctx, serviceID, appAddr) if err != nil { logger.Error().Err(err).Msg("Failed to get current session") - return sessiontypes.Session{}, fmt.Errorf("error getting current session: %w", err) + return hydratedSession{}, fmt.Errorf("error getting current session: %w", err) } + // Extract the underlying session. 
+ currentSession := cachedSession.session + // Get shared parameters to determine grace period sharedParams, err := cfn.GetSharedParams(ctx) if err != nil { logger.Warn().Err(err).Msg("Failed to get shared params, falling back to current session") - return currentSession, nil + return cachedSession, nil } // Get current block height currentHeight, err := cfn.GetCurrentBlockHeight(ctx) if err != nil { logger.Warn().Err(err).Msg("Failed to get current block height, falling back to current session") - return currentSession, nil + return cachedSession, nil } // Calculate when the previous session's grace period would end @@ -309,7 +395,7 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( // If we're not within the grace period of the previous session, return the current session if currentHeight > prevSessionEndHeightWithExtendedValidity { logger.Debug().Msg("IS NOT WITHIN GRACE PERIOD: Returning current session") - return currentSession, nil + return cachedSession, nil } // Scale down the grace period to aggressively start using the new session @@ -318,38 +404,41 @@ func (cfn *cachingFullNode) GetSessionWithExtendedValidity( logger.Debug(). Int64("prev_session_end_height_with_extended_validity_scaled", prevSessionEndHeightWithExtendedValidityScaled). 
Msg("IS WITHIN GRACE PERIOD BUT: Returning current session to aggressively start using the new session") - return currentSession, nil + return cachedSession, nil } logger.Debug().Msg("IS WITHIN GRACE PERIOD: Going to fetch previous session") - // Use cache for previous session lookup with a specific key - prevSessionKey := getSessionCacheKey(serviceID, appAddr, prevSessionEndHeight) - prevSession, err := cfn.sessionCache.GetOrFetch( - ctx, - prevSessionKey, - func(fetchCtx context.Context) (sessiontypes.Session, error) { - cfn.logger.Debug().Msg("Fetching previous session from full node") - session, fetchErr := cfn.lazyFullNode.GetSessionWithExtendedValidity(fetchCtx, serviceID, appAddr) - if fetchErr != nil { - cfn.logger.Error().Err(fetchErr).Msg("Failed to fetch previous session from full node") - } - return session, fetchErr - }, - ) + // TODO_UPNEXT(@adshmh): Support height-based session retrieval. + // + // Try to get previous session from cache + prevSessionKey := getSessionCacheKey(serviceID, appAddr) + + cfn.sessionsCache.mu.RLock() + entry, exists := cfn.sessionsCache.sessions[prevSessionKey] + cfn.sessionsCache.mu.RUnlock() + + if exists { + logger.Debug().Str("prev_session_key", prevSessionKey).Msg("Previous session found in cache") + return entry.hydratedSession, nil + } - return prevSession, err + // Not in cache, fetch from underlying node + logger.Debug().Str("prev_session_key", prevSessionKey).Msg("Previous session not in cache, fetching from full node") + prevSession, err := cfn.lazyFullNode.GetSessionWithExtendedValidity(ctx, serviceID, appAddr) + if err != nil { + logger.Error().Err(err).Msg("Failed to fetch previous session from full node") + return hydratedSession{}, err + } + return prevSession, nil } -// getSessionCacheKey builds a unique cache key for session: ::: -func getSessionCacheKey(serviceID protocol.ServiceID, appAddr string, height int64) string { - return fmt.Sprintf("%s:%s:%s:%d", sessionCacheKeyPrefix, serviceID, appAddr, 
height) +// getSessionCacheKey builds a unique cache key for session: :: +func getSessionCacheKey(serviceID protocol.ServiceID, appAddr string) string { + return fmt.Sprintf("%s:%s:%s", sessionCacheKeyPrefix, serviceID, appAddr) } -// ValidateRelayResponse: -// - Validates the raw response bytes received from an endpoint. -// - Uses the SDK and the caching full node's account client for validation. -// - Will use the caching account client to fetch the account pub key. +// ValidateRelayResponse uses the SDK and the caching full node's account client for validation. func (cfn *cachingFullNode) ValidateRelayResponse( supplierAddr sdk.SupplierAddress, responseBz []byte, @@ -362,58 +451,63 @@ func (cfn *cachingFullNode) ValidateRelayResponse( ) } -// GetAccountClient: passthrough to underlying node (returns caching client). +// GetAccountClient returns the caching account client. func (cfn *cachingFullNode) GetAccountClient() *sdk.AccountClient { return cfn.cachingAccountClient } // IsHealthy: passthrough to underlying node. -// TODO_IMPROVE(@commoddity): -// - Add smarter health checks (e.g. verify cached apps/sessions) -// - Currently always true (cache fills as needed) func (cfn *cachingFullNode) IsHealthy() bool { + // Check if the caching full node has been marked healthy. + // i.e. if at least one iteration of fetching sessions has succeeded. + cfn.isHealthyMu.RLock() + defer cfn.isHealthyMu.RUnlock() + + if !cfn.isHealthy { + return false + } + + // Delegate to the lazy full node's health status. return cfn.lazyFullNode.IsHealthy() } -// GetSharedParams: cached shared params with early refresh for governance changes. +// GetSharedParams returns cached shared params. 
func (cfn *cachingFullNode) GetSharedParams(ctx context.Context) (*sharedtypes.Params, error) { - params, err := cfn.sharedParamsCache.GetOrFetch( - ctx, - sharedParamsCacheKey, - func(fetchCtx context.Context) (*sharedtypes.Params, error) { - cfn.logger.Debug().Msg("Fetching shared params from full node") - params, fetchErr := cfn.lazyFullNode.GetSharedParams(fetchCtx) - if fetchErr != nil { - cfn.logger.Error().Err(fetchErr).Msg("Failed to fetch shared params from full node") - } - return params, fetchErr - }, - ) + cfn.sharedCache.mu.RLock() + params := cfn.sharedCache.params + cfn.sharedCache.mu.RUnlock() + + if params == nil { + // Cache not initialized yet; no fetch is done here — return an error until the background refresh populates the cache + cfn.logger.Debug().Msg("Shared params cache not initialized, fetching from full node") + return nil, fmt.Errorf("shared params not cached yet") + } - return params, err + return params, nil } -// GetCurrentBlockHeight: cached block height with a sho TTL and early refresh. +// TODO_TECHDEBT(@adshmh): Add timeout on fetching current block height. +// GetCurrentBlockHeight returns cached block height. func (cfn *cachingFullNode) GetCurrentBlockHeight(ctx context.Context) (int64, error) { - height, err := cfn.blockHeightCache.GetOrFetch( - ctx, - blockHeightCacheKey, - func(fetchCtx context.Context) (int64, error) { - cfn.logger.Debug().Msg("Fetching current block height from full node") - height, fetchErr := cfn.lazyFullNode.GetCurrentBlockHeight(fetchCtx) - if fetchErr != nil { - cfn.logger.Error().Err(fetchErr).Msg("Failed to fetch current block height from full node") - } - return height, fetchErr - }, - ) + cfn.blockCache.mu.RLock() + defer cfn.blockCache.mu.RUnlock() + + height := cfn.blockCache.height + if height == 0 { + return 0, fmt.Errorf("height not fetched yet") + } - return height, err + return height, nil } // IsInSessionRollover: passthrough to underlying lazy full node. 
-// The lazy full node manages session rollover monitoring in the background, -// so we simply delegate to its rollover state. func (cfn *cachingFullNode) IsInSessionRollover() bool { return cfn.lazyFullNode.IsInSessionRollover() } + +// Stop gracefully shuts down the caching full node and stops background goroutines. +func (cfn *cachingFullNode) Stop() { + if cfn.cancel != nil { + cfn.cancel() + } +} diff --git a/protocol/shannon/fullnode_lazy.go b/protocol/shannon/fullnode_lazy.go index a9876e336..573271efc 100644 --- a/protocol/shannon/fullnode_lazy.go +++ b/protocol/shannon/fullnode_lazy.go @@ -46,10 +46,22 @@ type LazyFullNode struct { // Session rollover monitoring state rolloverState *sessionRolloverState + + // TODO_TECHDEBT(@adshmh): Make the load testing supplier filtering logic more visible. + // + // If specified, will only return endpoints matching the supplier address. + // Used for load testing against a single RelayMiner. + allowedSupplierAddr string } +// TODO_TECHDEBT(@adshmh): Refactor to find a better fit for the load testing configuration. +// // NewLazyFullNode builds and returns a LazyFullNode using the provided configuration. 
-func NewLazyFullNode(logger polylog.Logger, config FullNodeConfig) (*LazyFullNode, error) { +func NewLazyFullNode( + logger polylog.Logger, + config FullNodeConfig, + allowedSupplierAddr string, +) (*LazyFullNode, error) { logger = logger.With("component", "fullnode_lazy") blockClient, err := newBlockClient(config.RpcURL) @@ -103,7 +115,7 @@ func (lfn *LazyFullNode) GetSession( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { session, err := lfn.sessionClient.GetSession( ctx, appAddr, @@ -112,23 +124,34 @@ ) if err != nil { - return sessiontypes.Session{}, + return hydratedSession{}, fmt.Errorf("GetSession: error getting the session for service %s app %s: %w", serviceID, appAddr, err, ) } if session == nil { - return sessiontypes.Session{}, + return hydratedSession{}, fmt.Errorf("GetSession: got nil session for service %s app %s: %w", serviceID, appAddr, err, ) } // Update session rollover boundaries for rollover monitoring - lfn.rolloverState.updateSessionRolloverBoundaries(*session) - - return *session, nil + lfn.rolloverState.updateSessionRolloverBoundaries(session) + + // TODO_TECHDEBT(@adshmh): Refactor load testing related code to make the filtering more visible. + // + // TODO_UPNEXT(@adshmh): Log and handle potential errors. + // + // In Load Testing using RelayMiner mode: drop any endpoints not matching the single supplier specified in the config. 
+ // + endpoints, _ := endpointsFromSession(session, lfn.allowedSupplierAddr) + + return hydratedSession{ + session: session, + endpoints: endpoints, + }, nil } // ValidateRelayResponse: @@ -187,7 +210,7 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( ctx context.Context, serviceID protocol.ServiceID, appAddr string, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger := lfn.logger.With( "service_id", serviceID, "app_addr", appAddr, @@ -198,7 +221,7 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( currentSession, err := lfn.GetSession(ctx, serviceID, appAddr) if err != nil { logger.Error().Err(err).Msg("failed to get current session") - return sessiontypes.Session{}, fmt.Errorf("error getting current session: %w", err) + return hydratedSession{}, fmt.Errorf("error getting current session: %w", err) } // Get shared parameters to determine grace period @@ -216,15 +239,15 @@ func (lfn *LazyFullNode) GetSessionWithExtendedValidity( } // Calculate when the previous session's grace period would end - prevSessionEndHeight := currentSession.Header.SessionStartBlockHeight - 1 + prevSessionEndHeight := currentSession.session.Header.SessionStartBlockHeight - 1 prevSessionEndHeightWithExtendedValidity := prevSessionEndHeight + int64(sharedParams.GracePeriodEndOffsetBlocks) logger = logger.With( "prev_session_end_height", prevSessionEndHeight, "prev_session_end_height_with_extended_validity", prevSessionEndHeightWithExtendedValidity, "current_height", currentHeight, - "current_session_start_height", currentSession.Header.SessionStartBlockHeight, - "current_session_end_height", currentSession.Header.SessionEndBlockHeight, + "current_session_start_height", currentSession.session.Header.SessionStartBlockHeight, + "current_session_end_height", currentSession.session.Header.SessionEndBlockHeight, ) // If we're not within the grace period of the previous session, return the current session @@ -241,10 +264,10 @@ func (lfn *LazyFullNode) 
GetSessionWithExtendedValidity( } // Update session rollover boundaries for rollover monitoring - lfn.rolloverState.updateSessionRolloverBoundaries(currentSession) + lfn.rolloverState.updateSessionRolloverBoundaries(currentSession.session) // Return the previous session - return *prevSession, nil + return createHydratedSession(prevSession, lfn.allowedSupplierAddr) } // IsInSessionRollover returns true if we're currently in a session rollover period. @@ -360,3 +383,19 @@ func newSharedClient(config grpc.GRPCConfig) (*sdk.SharedClient, error) { return &sdk.SharedClient{QueryClient: sharedtypes.NewQueryClient(conn)}, nil } + +// createHydratedSession creates a hydratedSession from a session by computing its endpoints +func createHydratedSession( + session *sessiontypes.Session, + allowedSupplierAddr string, +) (hydratedSession, error) { + endpoints, err := endpointsFromSession(session, allowedSupplierAddr) + if err != nil { + return hydratedSession{}, fmt.Errorf("failed to create endpoints from session: %w", err) + } + + return hydratedSession{ + session: session, + endpoints: endpoints, + }, nil +} diff --git a/protocol/shannon/fullnode_session_rollover.go b/protocol/shannon/fullnode_session_rollover.go index 25988b9cf..33f022863 100644 --- a/protocol/shannon/fullnode_session_rollover.go +++ b/protocol/shannon/fullnode_session_rollover.go @@ -129,7 +129,7 @@ func (srs *sessionRolloverState) updateBlockHeight() { // updateSessionRolloverBoundaries updates rollover boundaries when we fetch a new session. // Called from GetSession() to keep rollover monitoring current. // Only updates rollover boundaries if the current rollover period has ended. 
-func (srs *sessionRolloverState) updateSessionRolloverBoundaries(session sessiontypes.Session) { +func (srs *sessionRolloverState) updateSessionRolloverBoundaries(session *sessiontypes.Session) { if session.Header == nil { srs.logger.Warn().Msg("Session header is nil, cannot update session values") return diff --git a/protocol/shannon/fullnode_session_rollover_test.go b/protocol/shannon/fullnode_session_rollover_test.go index 6813fd57b..95bfa8b6d 100644 --- a/protocol/shannon/fullnode_session_rollover_test.go +++ b/protocol/shannon/fullnode_session_rollover_test.go @@ -141,7 +141,7 @@ func Test_updateSessionRolloverBoundaries(t *testing.T) { srs.sessionRolloverEnd = tt.initialRolloverEnd srs.currentBlockHeight = tt.initialBlockHeight - srs.updateSessionRolloverBoundaries(tt.session) + srs.updateSessionRolloverBoundaries(&tt.session) if tt.shouldReturn && (srs.sessionRolloverStart != tt.expectedRolloverStart || srs.sessionRolloverEnd != tt.expectedRolloverEnd) { t.Errorf("updateSessionRolloverBoundaries() should have returned early, but state changed") diff --git a/protocol/shannon/gateway_mode.go b/protocol/shannon/gateway_mode.go index bec1743e0..05b6d9869 100644 --- a/protocol/shannon/gateway_mode.go +++ b/protocol/shannon/gateway_mode.go @@ -7,7 +7,6 @@ import ( "slices" apptypes "github.com/pokt-network/poktroll/x/application/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/protocol" ) @@ -36,7 +35,7 @@ func (p *Protocol) getActiveGatewaySessions( ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request, -) ([]sessiontypes.Session, error) { +) ([]hydratedSession, error) { p.logger.With( "service_id", serviceID, "gateway_mode", p.gatewayMode, diff --git a/protocol/shannon/mode_centralized.go b/protocol/shannon/mode_centralized.go index 20739a8f7..e322afd36 100644 --- a/protocol/shannon/mode_centralized.go +++ b/protocol/shannon/mode_centralized.go @@ -13,8 +13,6 @@ import ( "context" "fmt" 
- sessiontypes "github.com/pokt-network/poktroll/x/session/types" - "github.com/buildwithgrove/path/protocol" ) @@ -22,7 +20,7 @@ import ( func (p *Protocol) getCentralizedGatewayModeActiveSessions( ctx context.Context, serviceID protocol.ServiceID, -) ([]sessiontypes.Session, error) { +) ([]hydratedSession, error) { logger := p.logger.With( "method", "getCentralizedGatewayModeActiveSessions", "service_id", string(serviceID), @@ -39,9 +37,9 @@ func (p *Protocol) getCentralizedGatewayModeActiveSessions( } // Loop over the address of apps owned by the gateway in Centralized gateway mode. - var ownedAppSessions []sessiontypes.Session + var ownedAppSessions []hydratedSession for _, ownedAppAddr := range ownedAppsForService { - session, err := p.getSession(ctx, logger, ownedAppAddr, serviceID) + session, err := p.getSession(ctx, p.logger, ownedAppAddr, serviceID) if err != nil { return nil, err } diff --git a/protocol/shannon/mode_delegated.go b/protocol/shannon/mode_delegated.go index 3910472eb..9c456f364 100644 --- a/protocol/shannon/mode_delegated.go +++ b/protocol/shannon/mode_delegated.go @@ -11,7 +11,6 @@ import ( "net/http" apptypes "github.com/pokt-network/poktroll/x/application/types" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/protocol" "github.com/buildwithgrove/path/request" @@ -28,7 +27,7 @@ func (p *Protocol) getDelegatedGatewayModeActiveSession( ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request, -) ([]sessiontypes.Session, error) { +) ([]hydratedSession, error) { logger := p.logger.With("method", "getDelegatedGatewayModeActiveSession") extractedAppAddr, err := getAppAddrFromHTTPReq(httpReq) @@ -45,7 +44,7 @@ func (p *Protocol) getDelegatedGatewayModeActiveSession( } // Skip the session's app if it is not staked for the requested service. 
- selectedApp := session.Application + selectedApp := session.session.Application if !appIsStakedForService(serviceID, selectedApp) { err = fmt.Errorf("%w: Trying to use app %s that is not staked for the service %s", errProtocolContextSetupAppNotStaked, selectedApp.Address, serviceID) logger.Error().Err(err).Msgf("SHOULD NEVER HAPPEN: %s", err.Error()) @@ -54,7 +53,7 @@ func (p *Protocol) getDelegatedGatewayModeActiveSession( logger.Debug().Msgf("successfully verified the gateway (%s) has delegation for the selected app (%s) for service (%s).", p.gatewayAddr, selectedApp.Address, serviceID) - return []sessiontypes.Session{session}, nil + return []hydratedSession{session}, nil } // appIsStakedForService returns true if the supplied application is staked for the supplied service ID. diff --git a/protocol/shannon/observation.go b/protocol/shannon/observation.go index e7d1b8106..f1c43858b 100644 --- a/protocol/shannon/observation.go +++ b/protocol/shannon/observation.go @@ -237,8 +237,8 @@ func buildEndpointFromObservation( // Used to identify an endpoint for applying sanctions. func buildSessionFromObservation( observation *protocolobservations.ShannonEndpointObservation, -) sessiontypes.Session { - return sessiontypes.Session{ +) *sessiontypes.Session { + return &sessiontypes.Session{ // Only Session Header is required for processing observations. 
Header: &sessiontypes.SessionHeader{ ApplicationAddress: observation.GetEndpointAppAddress(), diff --git a/protocol/shannon/observation_websocket.go b/protocol/shannon/observation_websocket.go index 379e6b9a2..34522a0e9 100644 --- a/protocol/shannon/observation_websocket.go +++ b/protocol/shannon/observation_websocket.go @@ -305,7 +305,7 @@ func buildEndpointFromWebSocketConnectionObservation( ) endpoint { session := buildSessionFromWebSocketConnectionObservation(observation) return &protocolEndpoint{ - session: session, + session: &session, supplier: observation.GetSupplier(), url: observation.GetEndpointUrl(), } diff --git a/protocol/shannon/protocol.go b/protocol/shannon/protocol.go index bc21d77e3..c1d6ef2b9 100644 --- a/protocol/shannon/protocol.go +++ b/protocol/shannon/protocol.go @@ -7,7 +7,6 @@ import ( "net/http" "github.com/pokt-network/poktroll/pkg/polylog" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" "github.com/buildwithgrove/path/gateway" @@ -411,7 +410,7 @@ func (p *Protocol) IsAlive() bool { func (p *Protocol) getUniqueEndpoints( ctx context.Context, serviceID protocol.ServiceID, - activeSessions []sessiontypes.Session, + activeSessions []hydratedSession, filterSanctioned bool, rpcType sharedtypes.RPCType, ) (map[protocol.EndpointAddr]endpoint, error) { @@ -465,7 +464,7 @@ func (p *Protocol) getUniqueEndpoints( func (p *Protocol) getSessionsUniqueEndpoints( _ context.Context, serviceID protocol.ServiceID, - activeSessions []sessiontypes.Session, + activeSessions []hydratedSession, filterByRPCType sharedtypes.RPCType, ) (map[protocol.EndpointAddr]endpoint, error) { logger := p.logger.With( @@ -480,38 +479,20 @@ func (p *Protocol) getSessionsUniqueEndpoints( endpoints := make(map[protocol.EndpointAddr]endpoint) - // TODO_TECHDEBT(@adshmh): Refactor load testing related code to make the filtering more visible. 
- // - // In Load Testing using RelayMiner mode: drop any endpoints ot matching the single supplier specified in the config. - // - var allowedSupplierAddr string - if ltc := p.loadTestingConfig; ltc != nil { - if ltc.RelayMinerConfig != nil { - allowedSupplierAddr = ltc.RelayMinerConfig.SupplierAddr - } - } - // Iterate over all active sessions for the service ID. - for _, session := range activeSessions { - app := session.Application + for _, hydratedSession := range activeSessions { + app := hydratedSession.session.Application // Using a single iteration scope for this logger. // Avoids adding all apps in the loop to the logger's fields. // Hydrate the logger with session details. logger := logger.With("valid_app_address", app.Address).With("method", "getSessionsUniqueEndpoints") - logger = hydrateLoggerWithSession(logger, &session) - logger.ProbabilisticDebugInfo(polylog.ProbabilisticDebugInfoProb).Msgf("Finding unique endpoints for session %s for app %s for service %s.", session.SessionId, app.Address, serviceID) - - // Retrieve all endpoints for the session. - sessionEndpoints, err := endpointsFromSession(session, allowedSupplierAddr) - if err != nil { - logger.Error().Err(err).Msgf("Internal error: error getting all endpoints for service %s app %s and session: skipping the app.", serviceID, app.Address) - continue - } + logger = hydrateLoggerWithSession(logger, hydratedSession.session) + logger.ProbabilisticDebugInfo(polylog.ProbabilisticDebugInfoProb).Msgf("Finding unique endpoints for session %s for app %s for service %s.", hydratedSession.session.SessionId, app.Address, serviceID) // Initialize the qualified endpoints as the full set of session endpoints. // Sanctioned endpoints will be filtered out below if a valid RPC type is provided. - qualifiedEndpoints := sessionEndpoints + qualifiedEndpoints := hydratedSession.endpoints // Filter out sanctioned endpoints if a valid RPC type is provided. 
// If no valid RPC type is provided, don't filter out sanctioned endpoints. @@ -519,7 +500,7 @@ func (p *Protocol) getSessionsUniqueEndpoints( if sanctionedEndpointsStore, ok := p.sanctionedEndpointsStores[filterByRPCType]; ok { logger.Debug().Msgf( "app %s has %d endpoints before filtering sanctioned endpoints.", - app.Address, len(sessionEndpoints), + app.Address, len(hydratedSession.endpoints), ) // Filter out any sanctioned endpoints @@ -528,7 +509,7 @@ func (p *Protocol) getSessionsUniqueEndpoints( if len(filteredEndpoints) == 0 { logger.Error().Msgf( "❌ All %d session endpoints are sanctioned for service %s, app %s. SKIPPING the app.", - len(sessionEndpoints), serviceID, app.Address, + len(hydratedSession.endpoints), serviceID, app.Address, ) continue } @@ -538,13 +519,13 @@ func (p *Protocol) getSessionsUniqueEndpoints( } // Log the number of endpoints before and after filtering - logger.Info().Msgf("Filtered session endpoints for app %s from %d to %d.", app.Address, len(sessionEndpoints), len(qualifiedEndpoints)) + logger.Info().Msgf("Filtered session endpoints for app %s from %d to %d.", app.Address, len(hydratedSession.endpoints), len(qualifiedEndpoints)) maps.Copy(endpoints, qualifiedEndpoints) logger.Info().Msgf( "Successfully fetched %d endpoints for session %s for application %s for service %s.", - len(qualifiedEndpoints), session.SessionId, app.Address, serviceID, + len(qualifiedEndpoints), hydratedSession.session.SessionId, app.Address, serviceID, ) } diff --git a/protocol/shannon/session.go b/protocol/shannon/session.go index ab6e8e7b8..dc2cd34f2 100644 --- a/protocol/shannon/session.go +++ b/protocol/shannon/session.go @@ -5,20 +5,20 @@ import ( "fmt" "github.com/pokt-network/poktroll/pkg/polylog" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" "github.com/buildwithgrove/path/protocol" ) var ( - // TODO_UPNEXT(@olshansk): Experiment the difference between active and extended sessions. 
- // - Make this configurable at the gateway yaml level - // - Add metrics to track how active vs extended sessions are used - // - Evaluate the impact of active vs extended sessions on performance - // - Enable making two parallel requests: one with active session and one with extended session - // DEV_NOTE: As of PR #339, we are hard-coding this to prevent any business logic changes to enable - // faster iteration on main and prevent the outstanding PR from getting stale. - extendedSessionEnabled = false +// TODO_UPNEXT(@olshansk): Experiment the difference between active and extended sessions. +// - Make this configurable at the gateway yaml level +// - Add metrics to track how active vs extended sessions are used +// - Evaluate the impact of active vs extended sessions on performance +// - Enable making two parallel requests: one with active session and one with extended session +// DEV_NOTE: As of PR #339, we are hard-coding this to prevent any business logic changes to enable +// faster iteration on main and prevent the outstanding PR from getting stale. + +// extendedSessionEnabled = false ) // getSession returns the session for the app address provided. @@ -28,18 +28,16 @@ func (p *Protocol) getSession( logger polylog.Logger, appAddr string, serviceID protocol.ServiceID, -) (sessiontypes.Session, error) { +) (hydratedSession, error) { logger.Info().Msgf("About to get a session for app %s for service %s", appAddr, serviceID) var err error - var session sessiontypes.Session + var session hydratedSession - // Retrieve the session for the owned app, without grace period logic. - if extendedSessionEnabled { - session, err = p.GetSessionWithExtendedValidity(ctx, serviceID, appAddr) - } else { - session, err = p.GetSession(ctx, serviceID, appAddr) - } + // TODO_TECHDEBT(@adshmh): Support sessions with grace period. + // Use GetSessionWithExtendedValidity method. 
+ // + session, err = p.GetSession(ctx, serviceID, appAddr) if err != nil { err = fmt.Errorf("%w: Error getting the current session from the full node for app: %s, error: %w", errProtocolContextSetupFetchSession, appAddr, err) logger.Error().Err(err).Msgf("SHOULD NEVER HAPPEN: %s", err.Error()) @@ -47,7 +45,7 @@ func (p *Protocol) getSession( } // Select the first session in the list. - selectedApp := session.Application + selectedApp := session.session.Application logger.Debug().Msgf("fetched the app with the selected address %s.", selectedApp.Address) if appAddr != selectedApp.Address {