From 9bfd9cb7f54245af7eb3eefaff2d29f7465984bb Mon Sep 17 00:00:00 2001 From: soffokl Date: Thu, 9 Apr 2026 09:34:56 +0600 Subject: [PATCH 1/6] Optimize session storage blocking --- consumer/session/session_storage.go | 117 +++++++++++++++++++++------- 1 file changed, 89 insertions(+), 28 deletions(-) diff --git a/consumer/session/session_storage.go b/consumer/session/session_storage.go index e3351c3831..64111c15d2 100644 --- a/consumer/session/session_storage.go +++ b/consumer/session/session_storage.go @@ -46,18 +46,41 @@ type Storage struct { mu sync.RWMutex sessionsActive map[session_node.ID]History + + writeQueue chan func() + stopQueue chan struct{} } // NewSessionStorage creates session repository with given dependencies. func NewSessionStorage(storage *boltdb.Bolt) *Storage { - return &Storage{ + s := &Storage{ storage: storage, timeGetter: time.Now, sessionsActive: make(map[session_node.ID]History), + writeQueue: make(chan func(), 100), + stopQueue: make(chan struct{}), + } + go s.writeWorker() + return s +} + +func (repo *Storage) writeWorker() { + for { + select { + case fn := <-repo.writeQueue: + fn() + case <-repo.stopQueue: + return + } } } +// Close stops the background write worker. +func (repo *Storage) Close() { + close(repo.stopQueue) +} + // Subscribe subscribes to relevant events of event bus. func (repo *Storage) Subscribe(bus eventbus.Subscriber) error { if err := bus.Subscribe(session_event.AppTopicSession, repo.consumeServiceSessionEvent); err != nil { @@ -69,13 +92,13 @@ func (repo *Storage) Subscribe(bus eventbus.Subscriber) error { if err := bus.SubscribeAsync(session_event.AppTopicTokensEarned, repo.consumeServiceSessionEarningsEvent); err != nil { return err } - if err := bus.Subscribe(connectionstate.AppTopicConnectionSession, repo.consumeConnectionSessionEvent); err != nil { + if err := bus.SubscribeAsync(connectionstate.AppTopicConnectionSession, repo.consumeConnectionSessionEvent); err != nil { return err } if err := bus.Subscribe(connectionstate.AppTopicConnectionStatistics, repo.consumeConnectionStatisticsEvent); err != nil { return err } - return bus.Subscribe(pingpong_event.AppTopicInvoicePaid, repo.consumeConnectionSpendingEvent) + return bus.SubscribeAsync(pingpong_event.AppTopicInvoicePaid, repo.consumeConnectionSpendingEvent) } // GetAll returns array of all sessions. @@ -274,65 +297,103 @@ func (repo *Storage) consumeConnectionStatisticsEvent(e connectionstate.AppEvent func (repo *Storage) consumeConnectionSpendingEvent(e pingpong_event.AppEventInvoicePaid) { repo.mu.Lock() - defer repo.mu.Unlock() - sessionID := session_node.ID(e.SessionID) + row, ok := repo.activeSession(sessionID) if !ok { + repo.mu.Unlock() return } row.Updated = repo.timeGetter().UTC() row.Tokens = e.Invoice.AgreementTotal + repo.mu.Unlock() - err := repo.storage.Update(sessionStorageBucketName, &row) - if err != nil { - log.Error().Err(err).Msgf("Session %v update failed", sessionID) - return - } + fn := func() { + err := repo.storage.Update(sessionStorageBucketName, &row) + if err != nil { + log.Error().Err(err).Msgf("Session %v update failed", sessionID) + return + } - repo.sessionsActive[sessionID] = row - log.Debug().Msgf("Session %v updated", sessionID) + repo.mu.Lock() + repo.sessionsActive[sessionID] = row + repo.mu.Unlock() + log.Debug().Msgf("Session %v updated", sessionID) + } + select { + case repo.writeQueue <- fn: + case <-repo.stopQueue: + log.Warn().Msgf("Session storage closed, dropping spending update for %v", sessionID) + default: + log.Warn().Msgf("Session write queue full, dropping spending update for %v", sessionID) + } } func (repo *Storage) handleEndedEvent(sessionID session_node.ID) { repo.mu.Lock() - defer repo.mu.Unlock() row, ok := repo.sessionsActive[sessionID] if !ok { + repo.mu.Unlock() log.Warn().Msgf("Can't find session %v to update", sessionID) return } + row.Updated = repo.timeGetter().UTC() row.Status = StatusCompleted + repo.mu.Unlock() - err := repo.storage.Update(sessionStorageBucketName, &row) - if err != nil { - log.Error().Err(err).Msgf("Session %v update failed", sessionID) - return - } + fn := func() { + err := repo.storage.Update(sessionStorageBucketName, &row) + if err != nil { + log.Error().Err(err).Msgf("Session %v update failed", sessionID) + return + } - delete(repo.sessionsActive, sessionID) - log.Debug().Msgf("Session %v updated with final data", sessionID) + repo.mu.Lock() + delete(repo.sessionsActive, sessionID) + repo.mu.Unlock() + log.Debug().Msgf("Session %v updated with final data", sessionID) + } + select { + case repo.writeQueue <- fn: + case <-repo.stopQueue: + log.Warn().Msgf("Session storage closed, dropping end-event update for %v", sessionID) + default: + log.Warn().Msgf("Session write queue full, dropping end-event update for %v", sessionID) + } } func (repo *Storage) handleCreatedEvent(sessionID session_node.ID) { repo.mu.Lock() - defer repo.mu.Unlock() row, ok := repo.sessionsActive[sessionID] if !ok { + repo.mu.Unlock() log.Warn().Msgf("Can't find session %v to store", sessionID) return } + row.Status = StatusNew + repo.mu.Unlock() - err := repo.storage.Store(sessionStorageBucketName, &row) - if err != nil { - log.Error().Err(err).Msgf("Session %v insert failed", row.SessionID) - return - } + fn := func() { + err := repo.storage.Store(sessionStorageBucketName, &row) + if err != nil { + log.Error().Err(err).Msgf("Session %v insert failed", row.SessionID) + return + } - repo.sessionsActive[sessionID] = row - log.Debug().Msgf("Session %v saved", row.SessionID) + repo.mu.Lock() + repo.sessionsActive[sessionID] = row + repo.mu.Unlock() + log.Debug().Msgf("Session %v saved", row.SessionID) + } + select { + case repo.writeQueue <- fn: + case <-repo.stopQueue: + log.Warn().Msgf("Session storage closed, dropping create-event for %v", sessionID) + default: + log.Warn().Msgf("Session write queue full, dropping create-event for %v", sessionID) + } } From 4472b6d431a82589889c7c1df9a069ff65ac1a29 Mon Sep 17 00:00:00 2001 From: soffokl Date: Thu, 9 Apr 2026 09:36:10 +0600 Subject: [PATCH 2/6] Faster release of the connection mutexes --- core/connection/manager.go | 25 ++++++++++++++++++++----- core/connection/multi.go | 23 ++++++++++++++++------- identity/registry/storage.go | 10 +++++----- p2p/stun.go | 3 ++- requests/resolver/resolver_cache.go | 6 +++--- 5 files changed, 46 insertions(+), 21 deletions(-) diff --git a/core/connection/manager.go b/core/connection/manager.go index dd6718ceec..dde63f212a 100644 --- a/core/connection/manager.go +++ b/core/connection/manager.go @@ -48,7 +48,7 @@ import ( ) const ( - p2pDialTimeout = 60 * time.Second + p2pDialTimeout = 20 * time.Second ) var ( @@ -809,11 +809,22 @@ func (m *connectionManager) Cancel() { } func (m *connectionManager) Disconnect() error { - if m.Status().State == connectionstate.NotConnected { + m.statusLock.Lock() + stateWas := m.status.State + if stateWas == connectionstate.NotConnected { + m.statusLock.Unlock() return ErrNoConnection } + if stateWas == connectionstate.Disconnecting { + m.statusLock.Unlock() + return nil + } + m.status.State = connectionstate.Disconnecting + m.statusLock.Unlock() + + log.Info().Msgf("Connection state: %v -> %v", stateWas, connectionstate.Disconnecting) + m.publishStateEvent(connectionstate.Disconnecting) - m.statusDisconnecting() m.disconnect() return nil @@ -1025,9 +1036,13 @@ func (m *connectionManager) Reconnect() { } log.Info().Msg("Waiting for previous session to cleanup") + // Capture the channel reference while holding the lock, then release the lock + // before waiting. This prevents a deadlock where Connect() fails and its deferred + // disconnect() tries to re-acquire cleanupFinishedLock. m.cleanupFinishedLock.Lock() - defer m.cleanupFinishedLock.Unlock() - <-m.cleanupFinished + ch := m.cleanupFinished + m.cleanupFinishedLock.Unlock() + <-ch err = m.Connect(m.connectOptions.ConsumerID, m.connectOptions.HermesID, m.connectOptions.ProposalLookup, m.connectOptions.Params) if err != nil { log.Error().Err(err).Msgf("Failed to reconnect") diff --git a/core/connection/multi.go b/core/connection/multi.go index 6aa0139bb1..3d6eda7580 100644 --- a/core/connection/multi.go +++ b/core/connection/multi.go @@ -46,14 +46,19 @@ func NewMultiConnectionManager(newConnectionManager func() Manager) *multiConnec // Connect creates new connection from given consumer to provider, reports error if connection already exists. func (mcm *multiConnectionManager) Connect(consumerID identity.Identity, hermesID common.Address, proposalLookup ProposalLookup, params ConnectParams) error { - mcm.mu.Lock() - + mcm.mu.RLock() m, ok := mcm.cms[params.ProxyPort] + mcm.mu.RUnlock() + if !ok { - m = mcm.newConnectionManager() - mcm.cms[params.ProxyPort] = m + mcm.mu.Lock() + m, ok = mcm.cms[params.ProxyPort] + if !ok { + m = mcm.newConnectionManager() + mcm.cms[params.ProxyPort] = m + } + mcm.mu.Unlock() } - mcm.mu.Unlock() return m.Connect(consumerID, hermesID, proposalLookup, params) } @@ -97,9 +102,13 @@ func (mcm *multiConnectionManager) Disconnect(id int) error { if id < 0 { mcm.mu.RLock() - defer mcm.mu.RUnlock() - + managers := make([]Manager, 0, len(mcm.cms)) for _, m := range mcm.cms { + managers = append(managers, m) + } + mcm.mu.RUnlock() + + for _, m := range managers { if err := m.Disconnect(); err != nil { log.Error().Err(err).Msg("Failed to disconnect active connection") } diff --git a/identity/registry/storage.go b/identity/registry/storage.go index 0e8e0d9e55..96bef2b1cc 100644 --- a/identity/registry/storage.go +++ b/identity/registry/storage.go @@ -41,7 +41,7 @@ var ErrNotFound = errors.New("no info for provided identity available in storage // RegistrationStatusStorage allows for storing of registration statuses. type RegistrationStatusStorage struct { - lock sync.Mutex + lock sync.RWMutex bolt persistentStorage } @@ -105,15 +105,15 @@ func (rss *RegistrationStatusStorage) get(chainID int64, identity identity.Ident // Get fetches the promise for the given hermes. func (rss *RegistrationStatusStorage) Get(chainID int64, identity identity.Identity) (StoredRegistrationStatus, error) { - rss.lock.Lock() - defer rss.lock.Unlock() + rss.lock.RLock() + defer rss.lock.RUnlock() return rss.get(chainID, identity) } // GetAll fetches all the registration statuses func (rss *RegistrationStatusStorage) GetAll() ([]StoredRegistrationStatus, error) { - rss.lock.Lock() - defer rss.lock.Unlock() + rss.lock.RLock() + defer rss.lock.RUnlock() list := []storedRegistrationStatus{} err := rss.bolt.GetAllFrom(registrationStatusBucket, &list) diff --git a/p2p/stun.go b/p2p/stun.go index 34ab740b15..51b0b05789 100644 --- a/p2p/stun.go +++ b/p2p/stun.go @@ -60,7 +60,6 @@ func stunPorts(identity identity.Identity, eventBus eventbus.Publisher, localPor resp := multiServerSTUN(serverList, p, 2) mu.Lock() - defer mu.Unlock() natType := "unknown" @@ -85,6 +84,8 @@ func stunPorts(identity identity.Identity, eventBus eventbus.Publisher, localPor delete(m, p) } + mu.Unlock() + if eventBus != nil { eventBus.Publish(AppTopicSTUN, STUNDetectionStatus{ Identity: identity.Address, diff --git a/requests/resolver/resolver_cache.go b/requests/resolver/resolver_cache.go index 4c42de7ecc..3cf00594d0 100644 --- a/requests/resolver/resolver_cache.go +++ b/requests/resolver/resolver_cache.go @@ -24,7 +24,7 @@ import ( var defaultResolveCache = NewResolverCache() type resolverCache struct { - mu sync.Mutex + mu sync.RWMutex cache map[string][]string } @@ -46,8 +46,8 @@ func FetchDNSFromCache(name string) (addrs []string) { } func (rc *resolverCache) Fetch(name string) []string { - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() return rc.cache[name] } From 60f5337b0e9687e81f4e3a065186ff6d7bb755f2 Mon Sep 17 00:00:00 2001 From: soffokl Date: Thu, 9 Apr 2026 09:36:32 +0600 Subject: [PATCH 3/6] Fix goroutine leak --- session/pingpong/factory.go | 14 ++++++++++---- session/pingpong/invoice_payer.go | 4 ++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/session/pingpong/factory.go b/session/pingpong/factory.go index d2aa22e9bb..e8d40b79ba 100644 --- a/session/pingpong/factory.go +++ b/session/pingpong/factory.go @@ -107,7 +107,8 @@ func ExchangeFactoryFunc( dataLeewayMegabytes uint64, ) func(senderUUID string, channel p2p.Channel, consumer, provider identity.Identity, hermes common.Address, proposal proposal.PricedServiceProposal, price market.Price) (connection.PaymentIssuer, error) { return func(senderUUID string, channel p2p.Channel, consumer, provider identity.Identity, hermes common.Address, proposal proposal.PricedServiceProposal, price market.Price) (connection.PaymentIssuer, error) { - invoices, err := invoiceReceiver(channel) + receiverStop := make(chan struct{}) + invoices, err := invoiceReceiver(channel, receiverStop) if err != nil { return nil, err } @@ -115,6 +116,7 @@ func ExchangeFactoryFunc( deps := InvoicePayerDeps{ SenderUUID: senderUUID, InvoiceChan: invoices, + ReceiverStop: receiverStop, PeerExchangeMessageSender: NewExchangeSender(channel), ConsumerTotalsStorage: totalStorage, TimeTracker: &timeTracker, @@ -132,8 +134,8 @@ func ExchangeFactoryFunc( } } -func invoiceReceiver(channel p2p.ChannelHandler) (chan crypto.Invoice, error) { - invoices := make(chan crypto.Invoice) +func invoiceReceiver(channel p2p.ChannelHandler, stop <-chan struct{}) (chan crypto.Invoice, error) { + invoices := make(chan crypto.Invoice, 1) channel.Handle(p2p.TopicPaymentInvoice, func(c p2p.Context) error { var msg pb.Invoice @@ -162,13 +164,17 @@ func invoiceReceiver(channel p2p.ChannelHandler) (chan crypto.Invoice, error) { return fmt.Errorf("could not unmarshal field transactorFee of value %v", transactorFee) } - invoices <- crypto.Invoice{ + select { + case invoices <- crypto.Invoice{ AgreementID: agreementID, AgreementTotal: agreementTotal, TransactorFee: transactorFee, Hashlock: msg.GetHashlock(), Provider: msg.GetProvider(), ChainID: msg.GetChainID(), + }: + case <-stop: + return fmt.Errorf("invoice receiver stopped") } return nil diff --git a/session/pingpong/invoice_payer.go b/session/pingpong/invoice_payer.go index b379f5ab80..5025593312 100644 --- a/session/pingpong/invoice_payer.go +++ b/session/pingpong/invoice_payer.go @@ -94,6 +94,7 @@ type hashSigner interface { // InvoicePayerDeps contains all the dependencies for the exchange message tracker. type InvoicePayerDeps struct { InvoiceChan chan crypto.Invoice + ReceiverStop chan struct{} PeerExchangeMessageSender PeerExchangeMessageSender ConsumerTotalsStorage consumerTotalsStorage TimeTracker timeTracker @@ -294,6 +295,9 @@ func (ip *InvoicePayer) Stop() { ip.once.Do(func() { log.Debug().Msg("Stopping...") close(ip.stop) + if ip.deps.ReceiverStop != nil { + close(ip.deps.ReceiverStop) + } }) } From 8f18f8e020cc47c7aee3360ae94d16dd30eb4061 Mon Sep 17 00:00:00 2001 From: soffokl Date: Thu, 9 Apr 2026 09:58:31 +0600 Subject: [PATCH 4/6] Add waitForQueue method to ensure all queued writes are processed --- consumer/session/session_storage.go | 7 +++++++ consumer/session/session_storage_test.go | 11 +++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/consumer/session/session_storage.go b/consumer/session/session_storage.go index 64111c15d2..fa17364a39 100644 --- a/consumer/session/session_storage.go +++ b/consumer/session/session_storage.go @@ -81,6 +81,13 @@ func (repo *Storage) Close() { close(repo.stopQueue) } +// waitForQueue blocks until all queued writes have been processed. +func (repo *Storage) waitForQueue() { + done := make(chan struct{}) + repo.writeQueue <- func() { close(done) } + <-done +} + // Subscribe subscribes to relevant events of event bus. func (repo *Storage) Subscribe(bus eventbus.Subscriber) error { if err := bus.Subscribe(session_event.AppTopicSession, repo.consumeServiceSessionEvent); err != nil { diff --git a/consumer/session/session_storage_test.go b/consumer/session/session_storage_test.go index 11990e4134..54b71f48d4 100644 --- a/consumer/session/session_storage_test.go +++ b/consumer/session/session_storage_test.go @@ -24,16 +24,17 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + + session_event "github.com/mysteriumnetwork/nession/pingpong/event" "github.com/mysteriumnetwork/node/core/connection/connectionstate" "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/storage/boltdb" "github.com/mysteriumnetwork/node/identity" "github.com/mysteriumnetwork/node/market" session_node "github.com/mysteriumnetwork/node/session" - session_event "github.com/mysteriumnetwork/node/session/event" "github.com/mysteriumnetwork/node/session/pingpong/event" "github.com/mysteriumnetwork/payments/crypto" - "github.com/stretchr/testify/assert" ) var ( @@ -265,6 +266,7 @@ func TestSessionStorage_consumeServiceSessionsEvent(t *testing.T) { Status: session_event.CreatedStatus, Session: serviceSessionMock, }) + storage.waitForQueue() // then sessions, err := storage.GetAll() assert.Nil(t, err) @@ -301,6 +303,7 @@ func TestSessionStorage_consumeServiceSessionsEvent(t *testing.T) { Status: session_event.RemovedStatus, Session: serviceSessionMock, }) + storage.waitForQueue() // then sessions, err = storage.GetAll() assert.Nil(t, err) @@ -348,6 +351,7 @@ func TestSessionStorage_consumeEventEndedOK(t *testing.T) { Status: connectionstate.SessionEndedStatus, SessionInfo: connectionSessionMock, }) + storage.waitForQueue() // then sessions, err := storage.GetAll() @@ -385,6 +389,7 @@ func TestSessionStorage_consumeEventConnectedOK(t *testing.T) { Status: connectionstate.SessionCreatedStatus, SessionInfo: connectionSessionMock, }) + storage.waitForQueue() // then sessions, err := storage.GetAll() @@ -428,6 +433,7 @@ func TestSessionStorage_consumeSessionSpendingEvent(t *testing.T) { SessionID: "unknown", Invoice: connectionInvoiceMock, }) + storage.waitForQueue() // then sessions, err := storage.GetAll() assert.Nil(t, err) @@ -457,6 +463,7 @@ func TestSessionStorage_consumeSessionSpendingEvent(t *testing.T) { SessionID: "sessionID", Invoice: connectionInvoiceMock, }) + storage.waitForQueue() // then sessions, err = storage.GetAll() assert.Nil(t, err) From 3dfdc18eb75fb55f561adb52042c8f21f90d9d1a Mon Sep 17 00:00:00 2001 From: soffokl Date: Thu, 9 Apr 2026 10:05:06 +0600 Subject: [PATCH 5/6] Fix import path for session_event in session_storage_test.go --- consumer/session/session_storage_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/session/session_storage_test.go b/consumer/session/session_storage_test.go index 54b71f48d4..4fdbcb0f5b 100644 --- a/consumer/session/session_storage_test.go +++ b/consumer/session/session_storage_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" - session_event "github.com/mysteriumnetwork/nession/pingpong/event" "github.com/mysteriumnetwork/node/core/connection/connectionstate" "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/storage/boltdb" @@ -35,6 +34,7 @@ import ( session_node "github.com/mysteriumnetwork/node/session" "github.com/mysteriumnetwork/node/session/pingpong/event" "github.com/mysteriumnetwork/payments/crypto" + session_event "github.com/mysteriumnetwork/session/pingpong/event" ) var ( From b6be267efe6639b722c616e97315dcd81346e551 Mon Sep 17 00:00:00 2001 From: soffokl Date: Thu, 9 Apr 2026 10:16:38 +0600 Subject: [PATCH 6/6] Fix import path for session_event in session_storage_test.go --- consumer/session/session_storage_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/session/session_storage_test.go b/consumer/session/session_storage_test.go index 4fdbcb0f5b..484c8fb7a0 100644 --- a/consumer/session/session_storage_test.go +++ b/consumer/session/session_storage_test.go @@ -32,9 +32,9 @@ import ( "github.com/mysteriumnetwork/node/identity" "github.com/mysteriumnetwork/node/market" session_node "github.com/mysteriumnetwork/node/session" + session_event "github.com/mysteriumnetwork/node/session/event" "github.com/mysteriumnetwork/node/session/pingpong/event" "github.com/mysteriumnetwork/payments/crypto" - session_event "github.com/mysteriumnetwork/session/pingpong/event" ) var (