Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - p2p: add configuration for direct peers #4769

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 91 additions & 0 deletions p2p/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
P2P
===

### Direct connections

If you are running multiple nodes in the local network and exposing all of them
is not convenient there is an option to setup network manually, by making
couple of nodes publicly available and the rest connected to them directly.

### Get network id
- get it with grpcurl

> grpcurl -plaintext 127.0.0.1:9092 spacemesh.v1.DebugService.NetworkInfo

```json
{
"id": "12D3KooWRfy4Sj4rDHDuBaYw3Mg5d2puwiCyqBCWMziFquaGQ5g8"
}
```

- get it from stored data

> cat ~/spacemesh/p2p/p2p.key
```json
{"Key":"CAESQAQN38GXvr+L+G+/JWoimpqPBK7I6INe+PKYA+hRJg0I65Q3IPK49Ii9dcnC3+UqB+jMEL16sqDfUubxTs62rZU=","ID":"12D3KooWRfy4Sj4rDHDuBaYw3Mg5d2puwiCyqBCWMziFquaGQ5g8"}
```

### Configuration for public node

Public node should have higher peer limits to help with network connectivity
and a list of botnodes. Direct connections should be reciprocal, otherwise
public node may prune your private node if overloaded.

Setup more than one public node, and perform rolling upgrades or restart
for them if needed.

```json
{
"p2p": {
"min-peers": 30,
"low-peers": 60,
"high-peers": 100,
"p2p-disable-legacy-discovery": true,
"direct": [
"/ip4/0.0.0.0/tcp/6000/p2p/12D3KooWRkBh6QayKLb1pDRJGMHE94Lix4ZBVh2BJJeX6mghk8VH"
],
"bootnodes": [
"/dns4/mainnet-bootnode-10.spacemesh.network/tcp/5000/p2p/12D3KooWHK5m83sNj2eNMJMGAngcS9gBja27ho83t79Q2CD4iRjQ",
"/dns4/mainnet-bootnode-11.spacemesh.network/tcp/5000/p2p/12D3KooWFrCDS8tc29nxJEYf4sKFXhXw7wMSdhQP4S7tsbfh6ngn"
]
}
}
```

### Configuration for private node

Set min-peers to the number of peers in the config and disable-dht.
low-peers and high-peers should not be lower than min-peers.

```json
{
"p2p": {
"listen": "/ip4/0.0.0.0/tcp/6000",
"min-peers": 1,
"low-peers": 10,
"high-peers": 20,
"p2p-disable-legacy-discovery": true,
"disable-dht": true,
"bootnodes": [],
"direct": [
"/ip4/0.0.0.0/tcp/7513/p2p/12D3KooWRfy4Sj4rDHDuBaYw3Mg5d2puwiCyqBCWMziFquaGQ5g8"
]
}
}
```

#### Expected result

Public node will maintain many open connections

> ss -npO4 | rg spacemesh | rg 7513 | rg ESTAB | wc -l
> 52

Private will connect only to the specified public node:

> ss -npO4 | rg spacemesh | rg 6000

```
tcp ESTAB 0 0 127.0.0.1:7513 127.0.0.1:6000 users:(("go-spacemesh",pid=39165,fd=11))
tcp ESTAB 0 0 127.0.0.1:6000 127.0.0.1:7513 users:(("go-spacemesh",pid=39202,fd=47))
```
58 changes: 45 additions & 13 deletions p2p/dhtdiscovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
}
}

func WithDirect(direct []peer.AddrInfo) Opt {
return func(d *Discovery) {
d.direct = direct
}
}

func WithLogger(logger *zap.Logger) Opt {
return func(d *Discovery) {
d.logger = logger
Expand All @@ -84,6 +90,12 @@
}
}

func DisableDHT() Opt {
return func(d *Discovery) {
d.disableDht = true
}

Check warning on line 96 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L93-L96

Added lines #L93 - L96 were not covered by tests
}

func New(h host.Host, opts ...Opt) (*Discovery, error) {
ctx, cancel := context.WithCancel(context.Background())
d := Discovery{
Expand All @@ -104,18 +116,21 @@
if len(d.bootnodes) == 0 {
d.logger.Warn("no bootnodes in the config")
}
dht, err := newDht(ctx, h, d.public, d.server, d.dir)
if err != nil {
return nil, err
if !d.disableDht {
dht, err := newDht(ctx, h, d.public, d.server, d.dir)
if err != nil {
return nil, err
}

Check warning on line 123 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L122-L123

Added lines #L122 - L123 were not covered by tests
d.dht = dht
}
d.dht = dht
return &d, nil
}

type Discovery struct {
public bool
server bool
dir string
public bool
server bool
disableDht bool
dir string

logger *zap.Logger
eg errgroup.Group
Expand All @@ -128,13 +143,17 @@
// how often to check if we have enough peers
period time.Duration
// timeout used for connections
timeout time.Duration
bootstrapDuration time.Duration
minPeers, highPeers int
backup, bootnodes []peer.AddrInfo
timeout time.Duration
bootstrapDuration time.Duration
minPeers, highPeers int
direct, backup, bootnodes []peer.AddrInfo
}

func (d *Discovery) Start() {
direct := map[peer.ID]struct{}{}
for _, peer := range d.direct {
direct[peer.ID] = struct{}{}
}

Check warning on line 156 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L155-L156

Added lines #L155 - L156 were not covered by tests
d.eg.Go(func() error {
var connEg errgroup.Group
disconnected := make(chan struct{}, 1)
Expand Down Expand Up @@ -166,6 +185,10 @@
)
if connected >= d.highPeers {
for _, boot := range d.bootnodes {
// preserve connection is bootnode is in direct peers
if _, exist := direct[boot.ID]; exist {
continue

Check warning on line 190 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L188-L190

Added lines #L188 - L190 were not covered by tests
}
if err := d.h.Network().ClosePeer(boot.ID); err != nil {
d.logger.Warn("failed to close bootnode connection",
zap.Stringer("address", boot), zap.Error(err))
Expand All @@ -188,12 +211,17 @@
func (d *Discovery) Stop() {
d.cancel()
d.eg.Wait()
if err := d.dht.Close(); err != nil {
d.logger.Error("error closing dht", zap.Error(err))
if d.dht != nil {
if err := d.dht.Close(); err != nil {
d.logger.Error("error closing dht", zap.Error(err))
}

Check warning on line 217 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L216-L217

Added lines #L216 - L217 were not covered by tests
}
}

func (d *Discovery) bootstrap() {
if d.dht == nil {
return
}

Check warning on line 224 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L223-L224

Added lines #L223 - L224 were not covered by tests
ctx, cancel := context.WithTimeout(d.ctx, d.bootstrapDuration)
defer cancel()
if err := d.dht.Bootstrap(ctx); err != nil {
Expand All @@ -207,6 +235,10 @@
defer cancel()
for _, boot := range nodes {
boot := boot
if boot.ID == d.h.ID() {
d.logger.Debug("not dialing self")
continue

Check warning on line 240 in p2p/dhtdiscovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

p2p/dhtdiscovery/discovery.go#L239-L240

Added lines #L239 - L240 were not covered by tests
}
eg.Go(func() error {
if err := d.h.Connect(ctx, boot); err != nil {
d.logger.Warn("failed to connect",
Expand Down
2 changes: 2 additions & 0 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ type Config struct {
// see https://lwn.net/Articles/542629/ for reuseport explanation
DisableReusePort bool `mapstructure:"disable-reuseport"`
DisableNatPort bool `mapstructure:"disable-natport"`
DisableDHT bool `mapstructure:"disable-dht"`
Flood bool `mapstructure:"flood"`
Listen string `mapstructure:"listen"`
Bootnodes []string `mapstructure:"bootnodes"`
Direct []string `mapstructure:"direct"`
MinPeers int `mapstructure:"min-peers"`
LowPeers int `mapstructure:"low-peers"`
HighPeers int `mapstructure:"high-peers"`
Expand Down
9 changes: 6 additions & 3 deletions p2p/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ func DefaultConfig() Config {

// Config for PubSub.
type Config struct {
Flood bool
IsBootnode bool
Bootnodes []peer.AddrInfo
Flood bool
IsBootnode bool
Bootnodes []peer.AddrInfo
// Direct peers should be configured on both ends.
Direct []peer.AddrInfo
MaxMessageSize int
}

Expand Down Expand Up @@ -182,6 +184,7 @@ func getOptions(cfg Config) []pubsub.Option {
options := []pubsub.Option{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(cfg.Flood),
pubsub.WithDirectPeers(cfg.Direct),
pubsub.WithMessageIdFn(msgID),
pubsub.WithNoAuthor(),
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
Expand Down
12 changes: 12 additions & 0 deletions p2p/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,17 @@
if err != nil {
return nil, err
}
direct, err := parseIntoAddr(fh.cfg.Direct)
if err != nil {
return nil, err
}

Check warning on line 99 in p2p/upgrade.go

View check run for this annotation

Codecov / codecov/patch

p2p/upgrade.go#L98-L99

Added lines #L98 - L99 were not covered by tests
for _, peer := range direct {
h.ConnManager().Protect(peer.ID, "direct")
}

Check warning on line 102 in p2p/upgrade.go

View check run for this annotation

Codecov / codecov/patch

p2p/upgrade.go#L101-L102

Added lines #L101 - L102 were not covered by tests
if fh.PubSub, err = pubsub.New(fh.ctx, fh.logger, h, pubsub.Config{
Flood: cfg.Flood,
IsBootnode: cfg.Bootnode,
Direct: direct,
Bootnodes: bootnodes,
MaxMessageSize: cfg.MaxMessageSize,
}); err != nil {
Expand All @@ -119,11 +127,15 @@
discovery.WithHighPeers(cfg.HighPeers),
discovery.WithDir(cfg.DataDir),
discovery.WithBootnodes(bootnodes),
discovery.WithDirect(direct),
discovery.WithLogger(fh.logger.Zap()),
}
if cfg.PrivateNetwork {
dopts = append(dopts, discovery.Private())
}
if cfg.DisableDHT {
dopts = append(dopts, discovery.DisableDHT())
}

Check warning on line 138 in p2p/upgrade.go

View check run for this annotation

Codecov / codecov/patch

p2p/upgrade.go#L137-L138

Added lines #L137 - L138 were not covered by tests
if cfg.Bootnode {
dopts = append(dopts, discovery.Server())
} else {
Expand Down