Skip to content

Commit

Permalink
Try #4769:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Aug 4, 2023
2 parents 366da6d + 7066feb commit 419382f
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 16 deletions.
91 changes: 91 additions & 0 deletions p2p/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
P2P
===

### Direct connections

If you are running multiple nodes in the local network and exposing all of them
is not convenient there is an option to setup network manually, by making
couple of nodes publicly available and the rest connected to them directly.

### Get network id
- get it with grpcurl

> grpcurl -plaintext 127.0.0.1:9092 spacemesh.v1.DebugService.NetworkInfo
```json
{
"id": "12D3KooWRfy4Sj4rDHDuBaYw3Mg5d2puwiCyqBCWMziFquaGQ5g8"
}
```

- get it from stored data

> cat ~/spacemesh/p2p/p2p.key
```json
{"Key":"CAESQAQN38GXvr+L+G+/JWoimpqPBK7I6INe+PKYA+hRJg0I65Q3IPK49Ii9dcnC3+UqB+jMEL16sqDfUubxTs62rZU=","ID":"12D3KooWRfy4Sj4rDHDuBaYw3Mg5d2puwiCyqBCWMziFquaGQ5g8"}
```

### Configuration for public node

Public node should have higher peer limits to help with network connectivity
and a list of botnodes. Direct connections should be reciprocal, otherwise
public node may prune your private node if overloaded.

Setup more than one public node, and perform rolling upgrades or restart
for them if needed.

```json
{
"p2p": {
"min-peers": 30,
"low-peers": 60,
"high-peers": 100,
"p2p-disable-legacy-discovery": true,
"direct": [
"/ip4/0.0.0.0/tcp/6000/p2p/12D3KooWRkBh6QayKLb1pDRJGMHE94Lix4ZBVh2BJJeX6mghk8VH"
],
"bootnodes": [
"/dns4/mainnet-bootnode-10.spacemesh.network/tcp/5000/p2p/12D3KooWHK5m83sNj2eNMJMGAngcS9gBja27ho83t79Q2CD4iRjQ",
"/dns4/mainnet-bootnode-11.spacemesh.network/tcp/5000/p2p/12D3KooWFrCDS8tc29nxJEYf4sKFXhXw7wMSdhQP4S7tsbfh6ngn"
]
}
}
```

### Configuration for private node

Set min-peers to the number of peers in the config and disable-dht.
low-peers and high-peers should not be lower than min-peers.

```json
{
"p2p": {
"listen": "/ip4/0.0.0.0/tcp/6000",
"min-peers": 1,
"low-peers": 10,
"high-peers": 20,
"p2p-disable-legacy-discovery": true,
"disable-dht": true,
"bootnodes": [],
"direct": [
"/ip4/0.0.0.0/tcp/7513/p2p/12D3KooWRfy4Sj4rDHDuBaYw3Mg5d2puwiCyqBCWMziFquaGQ5g8"
]
}
}
```

#### Expected result

Public node will maintain many open connections

> ss -npO4 | rg spacemesh | rg 7513 | rg ESTAB | wc -l
> 52
Private will connect only to the specified public node:

> ss -npO4 | rg spacemesh | rg 6000
```
tcp ESTAB 0 0 127.0.0.1:7513 127.0.0.1:6000 users:(("go-spacemesh",pid=39165,fd=11))
tcp ESTAB 0 0 127.0.0.1:6000 127.0.0.1:7513 users:(("go-spacemesh",pid=39202,fd=47))
```
58 changes: 45 additions & 13 deletions p2p/dhtdiscovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func WithBackup(backup []peer.AddrInfo) Opt {
}
}

func WithDirect(direct []peer.AddrInfo) Opt {
return func(d *Discovery) {
d.direct = direct
}
}

func WithLogger(logger *zap.Logger) Opt {
return func(d *Discovery) {
d.logger = logger
Expand All @@ -84,6 +90,12 @@ func WithDir(path string) Opt {
}
}

func DisableDHT() Opt {
return func(d *Discovery) {
d.disableDht = true
}
}

func New(h host.Host, opts ...Opt) (*Discovery, error) {
ctx, cancel := context.WithCancel(context.Background())
d := Discovery{
Expand All @@ -104,18 +116,21 @@ func New(h host.Host, opts ...Opt) (*Discovery, error) {
if len(d.bootnodes) == 0 {
d.logger.Warn("no bootnodes in the config")
}
dht, err := newDht(ctx, h, d.public, d.server, d.dir)
if err != nil {
return nil, err
if !d.disableDht {
dht, err := newDht(ctx, h, d.public, d.server, d.dir)
if err != nil {
return nil, err
}
d.dht = dht
}
d.dht = dht
return &d, nil
}

type Discovery struct {
public bool
server bool
dir string
public bool
server bool
disableDht bool
dir string

logger *zap.Logger
eg errgroup.Group
Expand All @@ -128,13 +143,17 @@ type Discovery struct {
// how often to check if we have enough peers
period time.Duration
// timeout used for connections
timeout time.Duration
bootstrapDuration time.Duration
minPeers, highPeers int
backup, bootnodes []peer.AddrInfo
timeout time.Duration
bootstrapDuration time.Duration
minPeers, highPeers int
direct, backup, bootnodes []peer.AddrInfo
}

func (d *Discovery) Start() {
direct := map[peer.ID]struct{}{}
for _, peer := range d.direct {
direct[peer.ID] = struct{}{}
}
d.eg.Go(func() error {
var connEg errgroup.Group
disconnected := make(chan struct{}, 1)
Expand Down Expand Up @@ -166,6 +185,10 @@ func (d *Discovery) Start() {
)
if connected >= d.highPeers {
for _, boot := range d.bootnodes {
// preserve connection is bootnode is in direct peers
if _, exist := direct[boot.ID]; exist {
continue
}
if err := d.h.Network().ClosePeer(boot.ID); err != nil {
d.logger.Warn("failed to close bootnode connection",
zap.Stringer("address", boot), zap.Error(err))
Expand All @@ -188,12 +211,17 @@ func (d *Discovery) Start() {
func (d *Discovery) Stop() {
d.cancel()
d.eg.Wait()
if err := d.dht.Close(); err != nil {
d.logger.Error("error closing dht", zap.Error(err))
if d.dht != nil {
if err := d.dht.Close(); err != nil {
d.logger.Error("error closing dht", zap.Error(err))
}
}
}

func (d *Discovery) bootstrap() {
if d.dht == nil {
return
}
ctx, cancel := context.WithTimeout(d.ctx, d.bootstrapDuration)
defer cancel()
if err := d.dht.Bootstrap(ctx); err != nil {
Expand All @@ -207,6 +235,10 @@ func (d *Discovery) connect(eg *errgroup.Group, nodes []peer.AddrInfo) {
defer cancel()
for _, boot := range nodes {
boot := boot
if boot.ID == d.h.ID() {
d.logger.Debug("not dialing self")
continue
}
eg.Go(func() error {
if err := d.h.Connect(ctx, boot); err != nil {
d.logger.Warn("failed to connect",
Expand Down
2 changes: 2 additions & 0 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ type Config struct {
// see https://lwn.net/Articles/542629/ for reuseport explanation
DisableReusePort bool `mapstructure:"disable-reuseport"`
DisableNatPort bool `mapstructure:"disable-natport"`
DisableDHT bool `mapstructure:"disable-dht"`
Flood bool `mapstructure:"flood"`
Listen string `mapstructure:"listen"`
Bootnodes []string `mapstructure:"bootnodes"`
Direct []string `mapstructure:"direct"`
MinPeers int `mapstructure:"min-peers"`
LowPeers int `mapstructure:"low-peers"`
HighPeers int `mapstructure:"high-peers"`
Expand Down
9 changes: 6 additions & 3 deletions p2p/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ func DefaultConfig() Config {

// Config for PubSub.
type Config struct {
Flood bool
IsBootnode bool
Bootnodes []peer.AddrInfo
Flood bool
IsBootnode bool
Bootnodes []peer.AddrInfo
// Direct peers should be configured on both ends.
Direct []peer.AddrInfo
MaxMessageSize int
}

Expand Down Expand Up @@ -182,6 +184,7 @@ func getOptions(cfg Config) []pubsub.Option {
options := []pubsub.Option{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(cfg.Flood),
pubsub.WithDirectPeers(cfg.Direct),
pubsub.WithMessageIdFn(msgID),
pubsub.WithNoAuthor(),
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
Expand Down
12 changes: 12 additions & 0 deletions p2p/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,17 @@ func Upgrade(h host.Host, opts ...Opt) (*Host, error) {
if err != nil {
return nil, err
}
direct, err := parseIntoAddr(fh.cfg.Direct)
if err != nil {
return nil, err
}
for _, peer := range direct {
h.ConnManager().Protect(peer.ID, "direct")
}
if fh.PubSub, err = pubsub.New(fh.ctx, fh.logger, h, pubsub.Config{
Flood: cfg.Flood,
IsBootnode: cfg.Bootnode,
Direct: direct,
Bootnodes: bootnodes,
MaxMessageSize: cfg.MaxMessageSize,
}); err != nil {
Expand All @@ -119,11 +127,15 @@ func Upgrade(h host.Host, opts ...Opt) (*Host, error) {
discovery.WithHighPeers(cfg.HighPeers),
discovery.WithDir(cfg.DataDir),
discovery.WithBootnodes(bootnodes),
discovery.WithDirect(direct),
discovery.WithLogger(fh.logger.Zap()),
}
if cfg.PrivateNetwork {
dopts = append(dopts, discovery.Private())
}
if cfg.DisableDHT {
dopts = append(dopts, discovery.DisableDHT())
}
if cfg.Bootnode {
dopts = append(dopts, discovery.Server())
} else {
Expand Down

0 comments on commit 419382f

Please sign in to comment.