Skip to content

Commit

Permalink
Try #4775:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Aug 7, 2023
2 parents 6a4208b + 6717dae commit f4e0fee
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 24 deletions.
45 changes: 45 additions & 0 deletions p2p/gater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package p2p

import (
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

var _ connmgr.ConnectionGater = (*gater)(nil)

type gater struct {
h host.Host
inbound, outbound int
direct map[peer.ID]struct{}
}

func (g *gater) updateHost(h host.Host) {
g.h = h
}

func (g *gater) InterceptPeerDial(pid peer.ID) bool {
if _, exist := g.direct[pid]; exist {
return true
}
return len(g.h.Network().Peers()) <= g.outbound
}

func (*gater) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) bool {
return true
}

func (g *gater) InterceptAccept(n network.ConnMultiaddrs) bool {
return len(g.h.Network().Peers()) <= g.inbound
}

func (*gater) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) bool {
return true
}

func (*gater) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}
71 changes: 47 additions & 24 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func DefaultConfig() Config {
MinPeers: 20,
LowPeers: 40,
HighPeers: 100,
AutoscalePeers: true,
GracePeersShutdown: 30 * time.Second,
MaxMessageSize: 2 << 20,
AcceptQueue: tptu.AcceptQueueLength,
Expand All @@ -58,26 +59,28 @@ type Config struct {
MaxMessageSize int

// see https://lwn.net/Articles/542629/ for reuseport explanation
DisableReusePort bool `mapstructure:"disable-reuseport"`
DisableNatPort bool `mapstructure:"disable-natport"`
DisableDHT bool `mapstructure:"disable-dht"`
Flood bool `mapstructure:"flood"`
Listen string `mapstructure:"listen"`
Bootnodes []string `mapstructure:"bootnodes"`
Direct []string `mapstructure:"direct"`
MinPeers int `mapstructure:"min-peers"`
LowPeers int `mapstructure:"low-peers"`
HighPeers int `mapstructure:"high-peers"`
AutoscalePeers bool `mapstructure:"autoscale-peers"`
AdvertiseAddress string `mapstructure:"advertise-address"`
AcceptQueue int `mapstructure:"p2p-accept-queue"`
Metrics bool `mapstructure:"p2p-metrics"`
Bootnode bool `mapstructure:"p2p-bootnode"`
ForceReachability string `mapstructure:"p2p-reachability"`
EnableHolepunching bool `mapstructure:"p2p-holepunching"`
DisableLegacyDiscovery bool `mapstructure:"p2p-disable-legacy-discovery"`
PrivateNetwork bool `mapstructure:"p2p-private-network"`
RelayServer RelayServer `mapstructure:"relay-server"`
DisableReusePort bool `mapstructure:"disable-reuseport"`
DisableNatPort bool `mapstructure:"disable-natport"`
DisableConnectionManager bool `mapstructure:"disable-connection-manager"`
DisableResourceManager bool `mapstructure:"disable-resource-manager"`
DisableDHT bool `mapstructure:"disable-dht"`
Flood bool `mapstructure:"flood"`
Listen string `mapstructure:"listen"`
Bootnodes []string `mapstructure:"bootnodes"`
Direct []string `mapstructure:"direct"`
MinPeers int `mapstructure:"min-peers"`
LowPeers int `mapstructure:"low-peers"`
HighPeers int `mapstructure:"high-peers"`
AutoscalePeers bool `mapstructure:"autoscale-peers"`
AdvertiseAddress string `mapstructure:"advertise-address"`
AcceptQueue int `mapstructure:"p2p-accept-queue"`
Metrics bool `mapstructure:"p2p-metrics"`
Bootnode bool `mapstructure:"p2p-bootnode"`
ForceReachability string `mapstructure:"p2p-reachability"`
EnableHolepunching bool `mapstructure:"p2p-holepunching"`
DisableLegacyDiscovery bool `mapstructure:"p2p-disable-legacy-discovery"`
PrivateNetwork bool `mapstructure:"p2p-private-network"`
RelayServer RelayServer `mapstructure:"relay-server"`
}

type RelayServer struct {
Expand Down Expand Up @@ -125,6 +128,20 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
if err != nil {
return nil, fmt.Errorf("can't create peer store: %w", err)
}
// leaves a small room for outbound connections in order to
// reduce risk of network isolation
g := &gater{
inbound: int(float64(cfg.HighPeers) * 0.8),
outbound: int(float64(cfg.HighPeers) * 1.1),
direct: map[peer.ID]struct{}{},
}
direct, err := parseIntoAddr(cfg.Direct)
if err != nil {
return nil, err
}
for _, pid := range direct {
g.direct[pid.ID] = struct{}{}
}
lopts := []libp2p.Option{
libp2p.Identity(key),
libp2p.ListenAddrStrings(cfg.Listen),
Expand All @@ -147,10 +164,14 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
return tp.WithSessionOptions(noise.Prologue(prologue))
}),
libp2p.Muxer("/yamux/1.0.0", &streamer),
libp2p.ConnectionManager(cm),
libp2p.Peerstore(ps),
libp2p.BandwidthReporter(p2pmetrics.NewBandwidthCollector()),
libp2p.EnableNATService(),
libp2p.ConnectionGater(g),
libp2p.Ping(false),
}
if !cfg.DisableConnectionManager {
lopts = append(lopts, libp2p.ConnectionManager(cm))
}
if len(cfg.AdvertiseAddress) > 0 {
addr, err := multiaddr.NewMultiaddr(cfg.AdvertiseAddress)
Expand Down Expand Up @@ -194,6 +215,7 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
if err != nil {
return nil, fmt.Errorf("failed to initialize libp2p host: %w", err)
}
g.updateHost(h)
h.Network().Notify(p2pmetrics.NewConnectionsMeeter())

logger.Zap().Info("local node identity", zap.Stringer("identity", h.ID()))
Expand All @@ -214,7 +236,7 @@ func setupResourcesManager(hostcfg Config) func(cfg *libp2p.Config) error {
limits := rcmgr.DefaultLimits
limits.SystemBaseLimit.ConnsInbound = highPeers
limits.SystemBaseLimit.ConnsOutbound = highPeers
limits.SystemBaseLimit.Conns = highPeers + hostcfg.MinPeers
limits.SystemBaseLimit.Conns = 2 * highPeers
limits.SystemBaseLimit.FD = 2 * highPeers
limits.SystemBaseLimit.StreamsInbound = 8 * highPeers
limits.SystemBaseLimit.StreamsOutbound = 8 * highPeers
Expand All @@ -223,8 +245,6 @@ func setupResourcesManager(hostcfg Config) func(cfg *libp2p.Config) error {
limits.ServiceBaseLimit.StreamsOutbound = 8 * highPeers
limits.ServiceBaseLimit.Streams = 16 * highPeers

limits.TransientBaseLimit.Conns = hostcfg.MinPeers

limits.ProtocolBaseLimit.StreamsInbound = 8 * highPeers
limits.ProtocolBaseLimit.StreamsOutbound = 8 * highPeers
limits.ProtocolBaseLimit.Streams = 16 * highPeers
Expand All @@ -234,6 +254,9 @@ func setupResourcesManager(hostcfg Config) func(cfg *libp2p.Config) error {
if !hostcfg.AutoscalePeers {
concrete = limits.Scale(0, 0)
}
if hostcfg.DisableResourceManager {
concrete = rcmgr.InfiniteLimits
}
mgr, err := rcmgr.NewResourceManager(
rcmgr.NewFixedLimiter(concrete),
rcmgr.WithTraceReporter(str),
Expand Down

0 comments on commit f4e0fee

Please sign in to comment.