Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PPing, p2p.NewPingService(p2pNode, peerIDs, conf.TestConfig.TestPingConfig))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(p2pNode))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewRelayRouter(p2pNode, peerIDs, relays))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceDirectConns, p2p.ForceDirectConnections(p2pNode, peerIDs))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceQUICConns, p2p.UpgradeToQUICConnections(p2pNode, peerIDs))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiagnostic, p2p.NewPeerStateDiagnostic(p2pNode, peerIDs))

return p2pNode, nil
}
Expand Down
2 changes: 1 addition & 1 deletion app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ const (
StartValidatorAPI
StartP2PPing
StartP2PRouters
StartForceDirectConns
StartForceQUICConns
StartP2PDiagnostic
StartP2PConsensus
StartSimulator
StartScheduler
Expand Down
13 changes: 6 additions & 7 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions app/lifecycle/orderstop_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func newRunCmd(runFunc func(context.Context, app.Config) error, unsafe bool) *co

libp2plog.SetPrimaryCore(log.LoggerCore()) // Set libp2p logger to use charon logger

_ = libp2plog.SetLogLevel("p2p-holepunch", "debug")
_ = libp2plog.SetLogLevel("observedaddrs", "debug")

printLicense(cmd.Context())
printFlags(cmd.Context(), cmd.Flags())

Expand Down
203 changes: 157 additions & 46 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic" //nolint:revive // Must be imported with alias
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -106,6 +107,7 @@
libp2p.ConnectionGater(connGater),
// Enable Autonat (required for hole punching)
libp2p.EnableNATService(),
libp2p.EnableHolePunching(holepunch.WithTracer(newHolePunchTracer(ctx))),
libp2p.AddrsFactory(func(internalAddrs []ma.Multiaddr) []ma.Multiaddr {
return filterAdvertisedAddrs(externalAddrs, internalAddrs, filterPrivateAddrs)
}),
Expand Down Expand Up @@ -256,9 +258,15 @@
}

// NewEventCollector returns a lifecycle hook that instruments libp2p events.
func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx {

Check failure on line 261 in p2p/p2p.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ4_jSaMSosnWSdcPU6V&open=AZ4_jSaMSosnWSdcPU6V&pullRequest=4516
return func(ctx context.Context) {
sub, err := p2pNode.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
sub, err := p2pNode.EventBus().Subscribe([]any{
new(event.EvtLocalReachabilityChanged),
new(event.EvtLocalAddressesUpdated),
new(event.EvtNATDeviceTypeChanged),
new(event.EvtPeerIdentificationCompleted),
new(event.EvtPeerIdentificationFailed),
})
if err != nil {
log.Error(ctx, "Failed to subscribe to libp2p events", err)
return
Expand All @@ -277,6 +285,41 @@
case event.EvtLocalReachabilityChanged:
log.Info(ctx, "Libp2p reachability changed", z.Any("status", evt.Reachability))
reachableGauge.Set(float64(evt.Reachability))
case event.EvtLocalAddressesUpdated:
var addrs []string

for _, a := range evt.Current {
if a.Action == event.Added {
addrs = append(addrs, a.Address.String())
}
}

if len(addrs) > 0 {
log.Debug(ctx, "Libp2p addresses updated, new addresses added",
z.Any("added", addrs),
z.Any("all_host_addrs", p2pNode.Addrs()),
)
}
case event.EvtNATDeviceTypeChanged:
log.Debug(ctx, "NAT device type changed",
z.Any("transport", evt.TransportProtocol),
z.Any("nat_type", evt.NatDeviceType),
)
case event.EvtPeerIdentificationCompleted:
isPublic := evt.ObservedAddr != nil && manet.IsPublicAddr(evt.ObservedAddr)
supportsDCUtR := slices.Contains(evt.Protocols, holepunch.Protocol)
log.Debug(ctx, "Peer identification completed",
z.Str("peer", PeerName(evt.Peer)),
z.Any("observed_addr", evt.ObservedAddr),
z.Any("conn_local", evt.Conn.LocalMultiaddr()),
z.Any("conn_remote", evt.Conn.RemoteMultiaddr()),
z.Bool("observed_is_public", isPublic),
z.Bool("peer_supports_dcutr", supportsDCUtR),
)
case event.EvtPeerIdentificationFailed:
log.Warn(ctx, "Peer identification failed", evt.Reason,
z.Str("peer", PeerName(evt.Peer)),
)
default:
log.Warn(ctx, "Unknown libp2p event", nil, z.Str("type", fmt.Sprintf("%T", e)))
}
Expand All @@ -285,56 +328,130 @@
}
}

// peerRoutingFunc wraps a function to implement routing.PeerRouting.
type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error)

func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
return f(ctx, p)
// holePunchTracer implements holepunch.EventTracer to log all DCUtR lifecycle events.
type holePunchTracer struct {
ctx context.Context

Check warning on line 333 in p2p/p2p.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'context.Context' field and pass context as a parameter to methods that need it.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ4_jSaMSosnWSdcPU6X&open=AZ4_jSaMSosnWSdcPU6X&pullRequest=4516
}

// ForceDirectConnections attempts to establish a direct connection if there is an existing relay connection to the peer.
// The idea is to enable switching to a direct connection as soon as the host has a connection to the peer.
func ForceDirectConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx {
forceDirectConn := func(ctx context.Context) {
for _, p := range peerIDs {
if p2pNode.ID() == p {
continue // Skip self
}

conns := p2pNode.Network().ConnsToPeer(p)
if len(conns) == 0 {
// Skip if there isn't any existing connection to peer. Note that we only force direct connection
// if there is already an existing relay connection between the host and peer.
continue
}

if isDirectConnAvailable(conns) {
continue
}
// newHolePunchTracer builds a holePunchTracer whose logging context is the
// provided context tagged with the "p2p" log topic.
func newHolePunchTracer(ctx context.Context) *holePunchTracer {
	topicCtx := log.WithTopic(ctx, "p2p")

	return &holePunchTracer{ctx: topicCtx}
}

// All existing connections are through relays, so we can try force dialing a direct connection.
err := p2pNode.Connect(network.WithForceDirectDial(ctx, "relay_to_direct"), peer.AddrInfo{ID: p})
if err == nil {
log.Debug(ctx, "Forced direct connection to peer successful", z.Str("peer", PeerName(p)))
}
// Trace logs a single hole punch lifecycle event for the event's remote peer.
//
// Failed hole punches, protocol errors and events of an unrecognised type are
// logged as warnings; every other event is logged at debug level.
func (t *holePunchTracer) Trace(evt *holepunch.Event) {
	remote := PeerName(evt.Remote)

	switch typed := evt.Evt.(type) {
	case *holepunch.StartHolePunchEvt:
		log.Debug(t.ctx, "Hole punch started",
			z.Str("peer", remote),
			z.Any("remote_addrs", typed.RemoteAddrs),
			z.Any("rtt", typed.RTT),
		)
	case *holepunch.EndHolePunchEvt:
		// Only the failure path carries an error worth surfacing as a warning.
		if !typed.Success {
			log.Warn(t.ctx, "Hole punch failed", errors.New(typed.Error),
				z.Str("peer", remote),
				z.Any("elapsed", typed.EllapsedTime),
			)

			return
		}

		log.Debug(t.ctx, "Hole punch succeeded",
			z.Str("peer", remote),
			z.Any("elapsed", typed.EllapsedTime),
		)
	case *holepunch.HolePunchAttemptEvt:
		log.Debug(t.ctx, "Hole punch attempt",
			z.Str("peer", remote),
			z.Int("attempt", typed.Attempt),
		)
	case *holepunch.DirectDialEvt:
		// Both outcomes are logged at debug level; the failure additionally
		// records the error text as a plain string field.
		if !typed.Success {
			log.Debug(t.ctx, "Direct dial failed during hole punch",
				z.Str("peer", remote),
				z.Any("elapsed", typed.EllapsedTime),
				z.Str("error", typed.Error),
			)

			return
		}

		log.Debug(t.ctx, "Direct dial succeeded during hole punch",
			z.Str("peer", remote),
			z.Any("elapsed", typed.EllapsedTime),
		)
	case *holepunch.ProtocolErrorEvt:
		log.Warn(t.ctx, "Hole punch protocol error", errors.New(typed.Error),
			z.Str("peer", remote),
		)
	default:
		log.Warn(t.ctx, "Unknown hole punch event", nil, z.Str("type", fmt.Sprintf("%T", evt.Evt)))
	}
}

// NewPeerStateDiagnostic returns a lifecycle hook that periodically logs the connection
// and peerstore address state for each cluster peer to help diagnose hole punching issues.
func NewPeerStateDiagnostic(p2pNode host.Host, peers []peer.ID) lifecycle.HookFuncCtx {

Check failure on line 390 in p2p/p2p.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 28 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ4_jSaMSosnWSdcPU6W&open=AZ4_jSaMSosnWSdcPU6W&pullRequest=4516
return func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
ctx = log.WithTopic(ctx, "p2p")

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
forceDirectConn(ctx)
}

selfID := p2pNode.ID()
for _, pID := range peers {
if pID == selfID {
continue
}

name := PeerName(pID)

var relayConns, directConns int

for _, conn := range p2pNode.Network().ConnsToPeer(pID) {
if isRelayAddr(conn.RemoteMultiaddr()) {
relayConns++
} else {
directConns++
}
}

var publicAddrs, privateAddrs, relayAddrs int

for _, addr := range p2pNode.Peerstore().Addrs(pID) {
switch {
case isRelayAddr(addr):
relayAddrs++
case manet.IsPublicAddr(addr):
publicAddrs++
default:
privateAddrs++
}
}

log.Debug(ctx, "Peer connection state",
z.Str("peer", name),
z.Int("relay_conns", relayConns),
z.Int("direct_conns", directConns),
z.Int("peerstore_public_addrs", publicAddrs),
z.Int("peerstore_private_addrs", privateAddrs),
z.Int("peerstore_relay_addrs", relayAddrs),
)
}
}
}
}

// peerRoutingFunc adapts a plain lookup function so that it implements routing.PeerRouting.
type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error)

// FindPeer delegates the lookup of peer p to the wrapped function and returns its result unchanged.
func (fn peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
	info, err := fn(ctx, p)

	return info, err
}

// isQUICEnabled returns true if the host has an address or listening address on QUIC.
func isQUICEnabled(h host.Host) bool {
if slices.ContainsFunc(h.Network().ListenAddresses(), isQUICAddr) {
Expand All @@ -350,19 +467,6 @@
return false
}

// isDirectConnAvailable reports whether at least one of the given connections
// has a remote multiaddr that is not a relay address. It returns false for an
// empty or nil slice.
func isDirectConnAvailable(conns []network.Conn) bool {
	return slices.ContainsFunc(conns, func(c network.Conn) bool {
		return !IsRelayAddr(c.RemoteMultiaddr())
	})
}

// UpgradeToQUICConnections tries to upgrade a direct TCP connection to a direct QUIC connection
// if there is known QUIC addresses from the peerstore.
func UpgradeToQUICConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx {
Expand Down Expand Up @@ -457,7 +561,7 @@

if !hasDirectTCPConn(conns) {
log.Debug(ctx, "No direct connection via TCP to peer", z.Str("peer", PeerName(p)), z.Any("conns", conns))
continue // no direct TPC connection to upgrade to QUIC, ForceDirectConnections shall upgrade to direct
continue // no direct TCP connection to upgrade to QUIC, hole punching shall upgrade to direct
}

// Get known QUIC addrs from peerstore
Expand Down Expand Up @@ -733,6 +837,13 @@
z.Any("direction", e.Direction),
z.Str("type", typ),
)

if typ == addrTypeRelay && e.Direction == network.DirInbound {
log.Debug(ctx, "Inbound relay connection detected, DCUtR hole punch should initiate",
z.Str("peer", name),
z.Any("peer_address", addr),
)
}
} else if e.Disconnect {
log.Debug(ctx, "Libp2p disconnected",
z.Str("peer", name),
Expand Down
Loading
Loading