Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
DiscConcurrency: nodeconfig.DefaultP2PConcurrency,
MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP,
DisablePrivateIPScan: false,
PeerScoreRetention: nodeconfig.DefaultPeerScoreRetention,
PeerMinScore: nodeconfig.DefaultPeerMinScore,
MaxPeers: nodeconfig.DefaultMaxPeers,
ConnManagerLowWatermark: nodeconfig.DefaultConnManagerLowWatermark,
ConnManagerHighWatermark: nodeconfig.DefaultConnManagerHighWatermark,
Expand Down
2 changes: 2 additions & 0 deletions cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
DiscConcurrency: hc.P2P.DiscConcurrency,
MaxConnPerIP: hc.P2P.MaxConnsPerIP,
DisablePrivateIPScan: hc.P2P.DisablePrivateIPScan,
PeerScoreRetention: hc.P2P.PeerScoreRetention,
PeerMinScore: hc.P2P.PeerMinScore,
MaxPeers: hc.P2P.MaxPeers,
ConnManagerLowWatermark: hc.P2P.ConnManagerLowWatermark,
ConnManagerHighWatermark: hc.P2P.ConnManagerHighWatermark,
Expand Down
2 changes: 2 additions & 0 deletions internal/configs/bootnode/bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type P2pConfig struct {
DiscConcurrency int // Discovery Concurrency value
MaxConnsPerIP int
DisablePrivateIPScan bool
PeerScoreRetention time.Duration
PeerMinScore float64
MaxPeers int64
// In order to disable Connection Manager, it only needs to
// set both the high and low watermarks to zero. In this way,
Expand Down
2 changes: 2 additions & 0 deletions internal/configs/harmony/harmony.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type P2pConfig struct {
DiscConcurrency int // Discovery Concurrency value
MaxConnsPerIP int
DisablePrivateIPScan bool
PeerScoreRetention time.Duration
PeerMinScore float64
MaxPeers int64
// In order to disable Connection Manager, it only needs to
// set both the high and low watermarks to zero. In this way,
Expand Down
5 changes: 5 additions & 0 deletions internal/configs/node/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ const (
DefaultP2PConcurrency = 0
// DefaultMaxConnPerIP is the maximum number of connections to/from a remote IP
DefaultMaxConnPerIP = 10
// DefaultPeerScoreRetention controls how long peer score records are kept.
// Zero disables peer score persistence and score-based connection gating.
DefaultPeerScoreRetention = time.Duration(0)
// DefaultPeerMinScore is the minimum score accepted by peer score gating.
DefaultPeerMinScore = 0.0
// DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit
DefaultMaxPeers = 0
// DefaultConnManagerLowWatermark is the lowest number of connections that'll be maintained in connection manager
Expand Down
92 changes: 89 additions & 3 deletions p2p/host.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package p2p

import (
"container/list"
"context"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -133,6 +134,8 @@ type HostConfig struct {
DiscConcurrency int
MaxConnPerIP int
DisablePrivateIPScan bool
PeerScoreRetention time.Duration
PeerMinScore float64
MaxPeers int64
ConnManagerLowWatermark int
ConnManagerHighWatermark int
Expand Down Expand Up @@ -168,6 +171,7 @@ func init() {
trustedPeersAddedCounter,
trustedPeersDnsResolvedCounter,
trustedPeersConnectFailuresCounter,
peerScoreGaugeVec,
)
}

Expand Down Expand Up @@ -196,8 +200,88 @@ var (
Name: "trusted_peer_connect_failures_total",
Help: "Total number of failed attempts to establish P2P host-level connections with trusted peers",
})
peerScoreGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "p2p",
Name: "peer_score",
Help: "current peer score by local instance and remote peer",
},
[]string{"local_p2p_id", "remote_p2p_id"},
)
peerScoreMetricMu sync.Mutex
peerScoreMetricList = list.New()
peerScoreMetricIdx = make(map[peerScoreMetricKey]*list.Element)
)

const maxPeerScoreMetricSeries = 2048

type peerScoreMetricKey struct {
localPeerID string
remotePeerID string
}

// SetPeerScoreMetric sets the canonical P2P peer score metric.
func SetPeerScoreMetric(localPeerID, remotePeerID string, score float64) {
if localPeerID == "" || remotePeerID == "" {
return
}
key := peerScoreMetricKey{
localPeerID: localPeerID,
remotePeerID: remotePeerID,
}

peerScoreMetricMu.Lock()
var evicted *peerScoreMetricKey
if elem, exists := peerScoreMetricIdx[key]; exists {
peerScoreMetricList.MoveToBack(elem)
} else {
if len(peerScoreMetricIdx) >= maxPeerScoreMetricSeries {
if oldest := peerScoreMetricList.Front(); oldest != nil {
oldestKey := oldest.Value.(peerScoreMetricKey)
evicted = &oldestKey
delete(peerScoreMetricIdx, oldestKey)
peerScoreMetricList.Remove(oldest)
}
}
elem := peerScoreMetricList.PushBack(key)
peerScoreMetricIdx[key] = elem
}
peerScoreMetricMu.Unlock()

if evicted != nil {
peerScoreGaugeVec.Delete(prometheus.Labels{
"local_p2p_id": evicted.localPeerID,
"remote_p2p_id": evicted.remotePeerID,
})
}

peerScoreGaugeVec.With(prometheus.Labels{
"local_p2p_id": localPeerID,
"remote_p2p_id": remotePeerID,
}).Set(score)
}

func DeletePeerScoreMetric(localPeerID, remotePeerID string) {
if localPeerID == "" || remotePeerID == "" {
return
}
key := peerScoreMetricKey{
localPeerID: localPeerID,
remotePeerID: remotePeerID,
}
peerScoreMetricMu.Lock()
if elem, exists := peerScoreMetricIdx[key]; exists {
delete(peerScoreMetricIdx, key)
peerScoreMetricList.Remove(elem)
}
peerScoreMetricMu.Unlock()
peerScoreGaugeVec.Delete(prometheus.Labels{
"local_p2p_id": localPeerID,
"remote_p2p_id": remotePeerID,
})
}

// NewHost ..
func NewHost(cfg HostConfig) (Host, error) {
var (
Expand Down Expand Up @@ -233,9 +317,7 @@ func NewHost(cfg HostConfig) (Host, error) {
if err != nil {
return nil, fmt.Errorf("failed to open peerstore: %w", err)
}
var scoreRetention time.Duration
// TODO: add scoreRetention to configs (for now, it is zero and so, peer scoring is disabled)
scoreRetention = 0
scoreRetention := cfg.PeerScoreRetention
logger := log.New()
ps, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, basePs, datastore, scoreRetention)
if err != nil {
Expand All @@ -256,6 +338,9 @@ func NewHost(cfg HostConfig) (Host, error) {
// Prevent dialing of public addresses
connGtr = gating.AddBlocking(connGtr, cfg.DisablePrivateIPScan)
}
if scoreRetention > 0 {
connGtr = gating.AddScoring(connGtr, ps, cfg.PeerMinScore)
}
connGtr = gating.AddBanExpiry(connGtr, ps, clock.SystemClock)
connGtr = gating.AddMetering(connGtr)

Expand Down Expand Up @@ -1295,6 +1380,7 @@ func (host *HostV2) Connected(net libp2p_network.Network, conn libp2p_network.Co
// called when a connection closed
func (host *HostV2) Disconnected(net libp2p_network.Network, conn libp2p_network.Conn) {
host.logger.Debug().Interface("node", conn.RemotePeer()).Msg("peer disconnected")
DeletePeerScoreMetric(host.GetID().String(), conn.RemotePeer().String())

for _, function := range host.onDisconnects.GetAll() {
if err := function(conn); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions p2p/store/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ type PeerScores struct {
ReqResp ReqRespScores `json:"reqResp"`
}

const (
// Req/resp contribution weights for the combined peer score.
peerScoreValidResponseWeight = 1.0
peerScoreErrorResponsePenalty = 1.0
peerScoreRejectedPayloadPenalty = 2.0
)

// ComputePeerScore returns the combined peer score used by gating and metrics.
// It combines gossip score and req/resp behavior with explicit penalties.
func ComputePeerScore(scores PeerScores) float64 {
return scores.Gossip.Total +
(scores.ReqResp.ValidResponses * peerScoreValidResponseWeight) -
(scores.ReqResp.ErrorResponses * peerScoreErrorResponsePenalty) -
(scores.ReqResp.RejectedPayloads * peerScoreRejectedPayloadPenalty)
}

// ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information
type ScoreDatastore interface {
// GetPeerScores returns the current scores for the specified peer
Expand Down
2 changes: 1 addition & 1 deletion p2p/store/scorebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) {
if err != nil {
return 0, err
}
return scores.Gossip.Total, nil
return ComputePeerScore(scores), nil
}

func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/store/scorebook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expecte

score, err := store.GetPeerScore(id)
require.NoError(t, err)
require.Equal(t, expected.Gossip.Total, score)
require.Equal(t, ComputePeerScore(expected), score)
}

func createMemoryStore(t *testing.T) ExtendedPeerstore {
Expand Down
Loading