diff --git a/app/app.go b/app/app.go index a85334ab45..f28af83dcf 100644 --- a/app/app.go +++ b/app/app.go @@ -404,8 +404,8 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PPing, p2p.NewPingService(p2pNode, peerIDs, conf.TestConfig.TestPingConfig)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(p2pNode)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewRelayRouter(p2pNode, peerIDs, relays)) - life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceDirectConns, p2p.ForceDirectConnections(p2pNode, peerIDs)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceQUICConns, p2p.UpgradeToQUICConnections(p2pNode, peerIDs)) + life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiagnostic, p2p.NewPeerStateDiagnostic(p2pNode, peerIDs)) return p2pNode, nil } diff --git a/app/lifecycle/order.go b/app/lifecycle/order.go index 4969432e7a..ca1d7da150 100644 --- a/app/lifecycle/order.go +++ b/app/lifecycle/order.go @@ -23,8 +23,8 @@ const ( StartValidatorAPI StartP2PPing StartP2PRouters - StartForceDirectConns StartForceQUICConns + StartP2PDiagnostic StartP2PConsensus StartSimulator StartScheduler diff --git a/app/lifecycle/orderstart_string.go b/app/lifecycle/orderstart_string.go index 8f402f0b10..acaccea6ef 100644 --- a/app/lifecycle/orderstart_string.go +++ b/app/lifecycle/orderstart_string.go @@ -20,8 +20,8 @@ func _() { _ = x[StartValidatorAPI-7] _ = x[StartP2PPing-8] _ = x[StartP2PRouters-9] - _ = x[StartForceDirectConns-10] - _ = x[StartForceQUICConns-11] + _ = x[StartForceQUICConns-10] + _ = x[StartP2PDiagnostic-11] _ = x[StartP2PConsensus-12] _ = x[StartSimulator-13] _ = x[StartScheduler-14] @@ -32,14 +32,13 @@ func _() { _ = x[StartStackSnipe-19] } -const _OrderStart_name = "TrackerPrivkeyLockEth1ClientAggSigDBRelayMonitoringAPIDebugAPIValidatorAPIP2PPingP2PRoutersForceDirectConnsForceQUICConnsP2PConsensusSimulatorSchedulerBuilderRegWatcherP2PEventCollectorPeerInfoParSigDBStackSnipe" +const _OrderStart_name = "TrackerPrivkeyLockEth1ClientAggSigDBRelayMonitoringAPIDebugAPIValidatorAPIP2PPingP2PRoutersForceQUICConnsP2PDiagnosticP2PConsensusSimulatorSchedulerBuilderRegWatcherP2PEventCollectorPeerInfoParSigDBStackSnipe" -var _OrderStart_index = [...]uint8{0, 7, 18, 28, 36, 41, 54, 62, 74, 81, 91, 107, 121, 133, 142, 151, 168, 185, 193, 201, 211} +var _OrderStart_index = [...]uint8{0, 7, 18, 28, 36, 41, 54, 62, 74, 81, 91, 105, 118, 130, 139, 148, 165, 182, 190, 198, 208} func (i OrderStart) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_OrderStart_index)-1 { + if i < 0 || i >= OrderStart(len(_OrderStart_index)-1) { return "OrderStart(" + strconv.FormatInt(int64(i), 10) + ")" } - return _OrderStart_name[_OrderStart_index[idx]:_OrderStart_index[idx+1]] + return _OrderStart_name[_OrderStart_index[i]:_OrderStart_index[i+1]] } diff --git a/app/lifecycle/orderstop_string.go b/app/lifecycle/orderstop_string.go index 97b41a26fa..6cf1f1c103 100644 --- a/app/lifecycle/orderstop_string.go +++ b/app/lifecycle/orderstop_string.go @@ -28,9 +28,8 @@ const _OrderStop_name = "SchedulerPrivkeyLockRetryerDutyDBBeaconMockValidatorAPI var _OrderStop_index = [...]uint8{0, 9, 20, 27, 33, 43, 55, 62, 71, 78, 86, 99} func (i OrderStop) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_OrderStop_index)-1 { + if i < 0 || i >= OrderStop(len(_OrderStop_index)-1) { return "OrderStop(" + strconv.FormatInt(int64(i), 10) + ")" } - return _OrderStop_name[_OrderStop_index[idx]:_OrderStop_index[idx+1]] + return _OrderStop_name[_OrderStop_index[i]:_OrderStop_index[i+1]] } diff --git a/cmd/run.go b/cmd/run.go index 3b3f07c697..c7709033c7 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -38,6 +38,9 @@ func newRunCmd(runFunc func(context.Context, app.Config) error, unsafe bool) *co libp2plog.SetPrimaryCore(log.LoggerCore()) // Set libp2p logger to use charon logger + _ = libp2plog.SetLogLevel("p2p-holepunch", "debug") + _ = libp2plog.SetLogLevel("observedaddrs", "debug") + printLicense(cmd.Context()) printFlags(cmd.Context(), cmd.Flags()) diff --git a/p2p/p2p.go b/p2p/p2p.go index 999ff7685b..9db8c60166 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" //nolint:revive // Must be imported with alias "github.com/libp2p/go-libp2p/p2p/transport/tcp" ma "github.com/multiformats/go-multiaddr" @@ -106,6 +107,7 @@ func NewNode(ctx context.Context, cfg Config, key *k1.PrivateKey, connGater Conn libp2p.ConnectionGater(connGater), // Enable Autonat (required for hole punching) libp2p.EnableNATService(), + libp2p.EnableHolePunching(holepunch.WithTracer(newHolePunchTracer(ctx))), libp2p.AddrsFactory(func(internalAddrs []ma.Multiaddr) []ma.Multiaddr { return filterAdvertisedAddrs(externalAddrs, internalAddrs, filterPrivateAddrs) }), @@ -258,7 +260,13 @@ func multiAddrsViaRelay(relayPeer Peer, peerID peer.ID) ([]ma.Multiaddr, error) // NewEventCollector returns a lifecycle hook that instruments libp2p events. func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx { return func(ctx context.Context) { - sub, err := p2pNode.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + sub, err := p2pNode.EventBus().Subscribe([]any{ + new(event.EvtLocalReachabilityChanged), + new(event.EvtLocalAddressesUpdated), + new(event.EvtNATDeviceTypeChanged), + new(event.EvtPeerIdentificationCompleted), + new(event.EvtPeerIdentificationFailed), + }) if err != nil { log.Error(ctx, "Failed to subscribe to libp2p events", err) return @@ -277,6 +285,41 @@ func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx { case event.EvtLocalReachabilityChanged: log.Info(ctx, "Libp2p reachability changed", z.Any("status", evt.Reachability)) reachableGauge.Set(float64(evt.Reachability)) + case event.EvtLocalAddressesUpdated: + var addrs []string + + for _, a := range evt.Current { + if a.Action == event.Added { + addrs = append(addrs, a.Address.String()) + } + } + + if len(addrs) > 0 { + log.Debug(ctx, "Libp2p addresses updated, new addresses added", + z.Any("added", addrs), + z.Any("all_host_addrs", p2pNode.Addrs()), + ) + } + case event.EvtNATDeviceTypeChanged: + log.Debug(ctx, "NAT device type changed", + z.Any("transport", evt.TransportProtocol), + z.Any("nat_type", evt.NatDeviceType), + ) + case event.EvtPeerIdentificationCompleted: + isPublic := evt.ObservedAddr != nil && manet.IsPublicAddr(evt.ObservedAddr) + supportsDCUtR := slices.Contains(evt.Protocols, holepunch.Protocol) + log.Debug(ctx, "Peer identification completed", + z.Str("peer", PeerName(evt.Peer)), + z.Any("observed_addr", evt.ObservedAddr), + z.Any("conn_local", evt.Conn.LocalMultiaddr()), + z.Any("conn_remote", evt.Conn.RemoteMultiaddr()), + z.Bool("observed_is_public", isPublic), + z.Bool("peer_supports_dcutr", supportsDCUtR), + ) + case event.EvtPeerIdentificationFailed: + log.Warn(ctx, "Peer identification failed", evt.Reason, + z.Str("peer", PeerName(evt.Peer)), + ) default: log.Warn(ctx, "Unknown libp2p event", nil, z.Str("type", fmt.Sprintf("%T", e))) } @@ -285,43 +328,70 @@ func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx { } } -// peerRoutingFunc wraps a function to implement routing.PeerRouting. -type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error) - -func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { - return f(ctx, p) +// holePunchTracer implements holepunch.EventTracer to log all DCUtR lifecycle events. +type holePunchTracer struct { + ctx context.Context } -// ForceDirectConnections attempts to establish a direct connection if there is an existing relay connection to the peer. -// The idea is to enable switching to a direct connection as soon as the host has a connection to the peer. -func ForceDirectConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx { - forceDirectConn := func(ctx context.Context) { - for _, p := range peerIDs { - if p2pNode.ID() == p { - continue // Skip self - } - - conns := p2pNode.Network().ConnsToPeer(p) - if len(conns) == 0 { - // Skip if there isn't any existing connection to peer. Note that we only force direct connection - // if there is already an existing relay connection between the host and peer. - continue - } - - if isDirectConnAvailable(conns) { - continue - } +func newHolePunchTracer(ctx context.Context) *holePunchTracer { + return &holePunchTracer{ctx: log.WithTopic(ctx, "p2p")} +} - // All existing connections are through relays, so we can try force dialing a direct connection. - err := p2pNode.Connect(network.WithForceDirectDial(ctx, "relay_to_direct"), peer.AddrInfo{ID: p}) - if err == nil { - log.Debug(ctx, "Forced direct connection to peer successful", z.Str("peer", PeerName(p))) - } +func (t *holePunchTracer) Trace(evt *holepunch.Event) { + name := PeerName(evt.Remote) + switch e := evt.Evt.(type) { + case *holepunch.StartHolePunchEvt: + log.Debug(t.ctx, "Hole punch started", + z.Str("peer", name), + z.Any("remote_addrs", e.RemoteAddrs), + z.Any("rtt", e.RTT), + ) + case *holepunch.EndHolePunchEvt: + if e.Success { + log.Debug(t.ctx, "Hole punch succeeded", + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + ) + } else { + log.Warn(t.ctx, "Hole punch failed", errors.New(e.Error), + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + ) + } + case *holepunch.HolePunchAttemptEvt: + log.Debug(t.ctx, "Hole punch attempt", + z.Str("peer", name), + z.Int("attempt", e.Attempt), + ) + case *holepunch.DirectDialEvt: + if e.Success { + log.Debug(t.ctx, "Direct dial succeeded during hole punch", + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + ) + } else { + log.Debug(t.ctx, "Direct dial failed during hole punch", + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + z.Str("error", e.Error), + ) } + case *holepunch.ProtocolErrorEvt: + log.Warn(t.ctx, "Hole punch protocol error", errors.New(e.Error), + z.Str("peer", name), + ) + default: + log.Warn(t.ctx, "Unknown hole punch event", nil, z.Str("type", fmt.Sprintf("%T", evt.Evt))) } +} +// NewPeerStateDiagnostic returns a lifecycle hook that periodically logs the connection +// and peerstore address state for each cluster peer to help diagnose hole punching issues. +func NewPeerStateDiagnostic(p2pNode host.Host, peers []peer.ID) lifecycle.HookFuncCtx { return func(ctx context.Context) { - ticker := time.NewTicker(1 * time.Minute) + ctx = log.WithTopic(ctx, "p2p") + + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { @@ -329,12 +399,59 @@ func ForceDirectConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.Hook case <-ctx.Done(): return case <-ticker.C: - forceDirectConn(ctx) + } + + selfID := p2pNode.ID() + for _, pID := range peers { + if pID == selfID { + continue + } + + name := PeerName(pID) + + var relayConns, directConns int + + for _, conn := range p2pNode.Network().ConnsToPeer(pID) { + if isRelayAddr(conn.RemoteMultiaddr()) { + relayConns++ + } else { + directConns++ + } + } + + var publicAddrs, privateAddrs, relayAddrs int + + for _, addr := range p2pNode.Peerstore().Addrs(pID) { + switch { + case isRelayAddr(addr): + relayAddrs++ + case manet.IsPublicAddr(addr): + publicAddrs++ + default: + privateAddrs++ + } + } + + log.Debug(ctx, "Peer connection state", + z.Str("peer", name), + z.Int("relay_conns", relayConns), + z.Int("direct_conns", directConns), + z.Int("peerstore_public_addrs", publicAddrs), + z.Int("peerstore_private_addrs", privateAddrs), + z.Int("peerstore_relay_addrs", relayAddrs), + ) } } } } +// peerRoutingFunc wraps a function to implement routing.PeerRouting. +type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error) + +func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { + return f(ctx, p) +} + // isQUICEnabled returns true if the host has an address or listening address on QUIC. func isQUICEnabled(h host.Host) bool { if slices.ContainsFunc(h.Network().ListenAddresses(), isQUICAddr) { @@ -350,19 +467,6 @@ func isQUICEnabled(h host.Host) bool { return false } -// isDirectConnAvailable returns true if direct connection is available in the given set of connections. -func isDirectConnAvailable(conns []network.Conn) bool { - for _, conn := range conns { - if IsRelayAddr(conn.RemoteMultiaddr()) { - continue - } - - return true - } - - return false -} - // UpgradeToQUICConnections tries to upgrade a direct TCP connection to a direct QUIC connection // if there is known QUIC addresses from the peerstore. func UpgradeToQUICConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx { @@ -457,7 +561,7 @@ func UpgradeToQUICConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.Ho if !hasDirectTCPConn(conns) { log.Debug(ctx, "No direct connection via TCP to peer", z.Str("peer", PeerName(p)), z.Any("conns", conns)) - continue // no direct TPC connection to upgrade to QUIC, ForceDirectConnections shall upgrade to direct + continue // no direct TCP connection to upgrade to QUIC, hole punching shall upgrade to direct } // Get known QUIC addrs from peerstore @@ -733,6 +837,13 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs [] z.Any("direction", e.Direction), z.Str("type", typ), ) + + if typ == addrTypeRelay && e.Direction == network.DirInbound { + log.Debug(ctx, "Inbound relay connection detected, DCUtR hole punch should initiate", + z.Str("peer", name), + z.Any("peer_address", addr), + ) + } } else if e.Disconnect { log.Debug(ctx, "Libp2p disconnected", z.Str("peer", name), diff --git a/p2p/relay.go b/p2p/relay.go index 95a0b9cb80..5dd1fae3ea 100644 --- a/p2p/relay.go +++ b/p2p/relay.go @@ -105,6 +105,15 @@ func NewRelayReserver(p2pNode host.Host, relay *MutablePeer) lifecycle.HookFuncC // NewRelayRouter returns a life cycle hook that routes peers via relays in libp2p by // continuously adding peer relay addresses to libp2p peer store. +// +// Only relay routes for peers that THIS node should dial are added. For each +// peer pair, the node with the smaller peer ID dials and the node with the +// larger peer ID waits. This asymmetry is required for DCUtR (hole punching) +// to work: libp2p's holepunch service only initiates the DCUtR protocol on +// INBOUND relay connections. If both sides add relay routes and dial +// simultaneously, both see outbound connections and neither side triggers +// DCUtR. By having one side wait, it sees the other's dial as an inbound +// relay connection, which activates DCUtR and enables NAT traversal. func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) lifecycle.HookFuncCtx { return func(ctx context.Context) { if len(relays) == 0 { @@ -113,13 +122,38 @@ func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) l ctx = log.WithTopic(ctx, "p2p") + selfID := p2pNode.ID() + + for _, pID := range peers { + if pID == selfID { + continue + } + + if shouldDialPeer(selfID, pID) { + log.Debug(ctx, "Will dial peer via relay (our ID < peer ID)", + z.Str("peer", PeerName(pID)), + z.Str("self_id", selfID.String()), + z.Str("peer_id", pID.String()), + ) + } else { + log.Debug(ctx, "Will wait for inbound relay connection from peer (our ID > peer ID), expecting DCUtR hole punch", + z.Str("peer", PeerName(pID)), + z.Str("self_id", selfID.String()), + z.Str("peer_id", pID.String()), + ) + } + } + ticker := time.NewTicker(routedAddrTTL * 9 / 10) defer ticker.Stop() for { for _, pID := range peers { - if pID == p2pNode.ID() { - // Skip self + if pID == selfID { + continue + } + + if !shouldDialPeer(selfID, pID) { continue } @@ -147,3 +181,10 @@ func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) l } } } + +// shouldDialPeer returns true if this node should proactively dial the given +// peer via relay. The peer with the smaller ID dials; the peer with the larger +// ID waits for the inbound connection (which triggers DCUtR hole punching). +func shouldDialPeer(self, remote peer.ID) bool { + return self < remote +}