Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add peer info api #4845

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8c1847d
Most work done for API but lacking tests
piersy Aug 11, 2023
3233402
Add test of peer info api
piersy Aug 14, 2023
3aa5aed
Add replace directive for api repo
piersy Aug 14, 2023
a859ac2
Re-generate mocks
piersy Aug 14, 2023
9264c0c
Remove unnecessary change to tcp4
piersy Aug 14, 2023
888e221
Remove unnecessary config
piersy Aug 14, 2023
e7627cf
Update comment
piersy Aug 14, 2023
464dd8a
Rename function
piersy Aug 15, 2023
1d7c2aa
Remove unnecessary config option
piersy Aug 15, 2023
cf0e613
Don't include backup in peer info tags
piersy Aug 15, 2023
596b656
Move PeerInfo from Node service to Admin service
piersy Aug 15, 2023
b552554
Rename test file to match service
piersy Aug 15, 2023
c7690a7
Convert NewTestNetwork to accept and use testing.T
piersy Aug 15, 2023
89a9234
Add timeout to context
piersy Aug 15, 2023
58a72e9
Don't log node startup errors when we have canceled the context
piersy Aug 15, 2023
1fe25ef
Actually use t.TempDir
piersy Aug 15, 2023
c76f3b4
Revert modification to PostProviderID
piersy Aug 15, 2023
487f50b
Re-generate mocks
piersy Aug 15, 2023
efdfcb9
Await grpc service startup at node startup
piersy Aug 15, 2023
216bf89
Prevent test from hanging due to grpc server port clash
piersy Aug 15, 2023
9d7b77c
Fix handling of instance start error
piersy Aug 16, 2023
674409c
Convert peer info api to streaming
piersy Aug 16, 2023
1a4cf9e
Move bootnode and direct checks to p2p/host.go
piersy Aug 16, 2023
28e2017
Simplify streaming code
piersy Aug 16, 2023
c612a4b
Re-generate mocks
piersy Aug 16, 2023
ffc4917
Merge branch 'develop' into piersy/add-peer-info-api
piersy Aug 16, 2023
dbd3e38
Replace replace statement with direct dependency update
piersy Aug 16, 2023
f1ad546
Make use of errorgrp.WithContext to fast fail on server startup
piersy Aug 16, 2023
d6678e4
Merge branch 'develop' into piersy/add-peer-info-api
piersy Aug 16, 2023
dbbb49b
Remove timeout on contexts used to shut down nodes
piersy Aug 16, 2023
660f42b
Merge branch 'develop' into piersy/add-peer-info-api
piersy Aug 17, 2023
81a3aa4
Merge branch 'develop' into piersy/add-peer-info-api
piersy Aug 17, 2023
1f67718
Inline NewTestApp
piersy Aug 18, 2023
5afc705
Merge remote-tracking branch 'origin/develop' into piersy/add-peer-in…
piersy Aug 18, 2023
7f1f409
Change conn uptime field from string to duration
piersy Aug 21, 2023
d99bf92
Merge remote-tracking branch 'origin/develop' into piersy/add-peer-in…
piersy Aug 21, 2023
1e8abb9
Update go.mod with new release of api module
piersy Aug 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 37 additions & 1 deletion api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@
logger log.Logger
db *sql.Database
dataDir string
p peers
}

// NewAdminService creates a new admin grpc service.
func NewAdminService(db *sql.Database, dataDir string, lg log.Logger) *AdminService {
func NewAdminService(db *sql.Database, dataDir string, lg log.Logger, p peers) *AdminService {
return &AdminService{
logger: lg,
db: db,
dataDir: dataDir,
p: p,
}
}

Expand Down Expand Up @@ -128,3 +130,37 @@
}
}
}

func (a AdminService) PeerInfoStream(_ *empty.Empty, stream pb.AdminService_PeerInfoStreamServer) error {
for _, p := range a.p.GetPeers() {
select {
case <-stream.Context().Done():
return nil

Check warning on line 138 in api/grpcserver/admin_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/admin_service.go#L137-L138

Added lines #L137 - L138 were not covered by tests
default:
info := a.p.ConnectedPeerInfo(p)
// There is no guarantee that the peers originally returned will still
// be connected by the time we call ConnectedPeerInfo.
if info == nil {
continue

Check warning on line 144 in api/grpcserver/admin_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/admin_service.go#L144

Added line #L144 was not covered by tests
}
connections := make([]*pb.ConnectionInfo, len(info.Connections))
for j, c := range info.Connections {
connections[j] = &pb.ConnectionInfo{
Address: c.Address.String(),
Uptime: c.Uptime.String(),
piersy marked this conversation as resolved.
Show resolved Hide resolved
Outbound: c.Outbound,
}
}
err := stream.Send(&pb.PeerInfo{
Id: info.ID.String(),
Connections: connections,
Tags: info.Tags,
})
if err != nil {
return fmt.Errorf("send to stream: %w", err)
}

Check warning on line 161 in api/grpcserver/admin_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/admin_service.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}
}

return nil
}
4 changes: 2 additions & 2 deletions api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func createMesh(tb testing.TB, db *sql.Database) {
func TestAdminService_Checkpoint(t *testing.T) {
db := sql.InMemory()
createMesh(t, db)
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestAdminService_Checkpoint(t *testing.T) {

func TestAdminService_CheckpointError(t *testing.T) {
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
12 changes: 9 additions & 3 deletions api/grpcserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ type ServiceAPI interface {

// Server is a very basic grpc server.
type Server struct {
Listener string
logger log.Logger
GrpcServer *grpc.Server
Listener string
logger log.Logger
// BoundAddress contains the address that the server bound to, useful if
// the server uses a dynamic port. It is set during startup and can be
// safely accessed after Start has completed (I.E. the returned channel has
// been waited on)
BoundAddress string
GrpcServer *grpc.Server
}

// New creates and returns a new Server with port and interface.
Expand Down Expand Up @@ -58,6 +63,7 @@ func (s *Server) startInternal(started chan<- struct{}) {
s.logger.Error("error listening: %v", err)
return
}
s.BoundAddress = lis.Addr().String()
reflection.Register(s.GrpcServer)
s.logger.Info("starting new grpc server on %s", s.Listener)
close(started)
Expand Down
6 changes: 6 additions & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type peerCounter interface {
PeerCount() uint64
}

// Peers is an api to get peer related info.
type peers interface {
ConnectedPeerInfo(p2p.Peer) *p2p.PeerInfo
GetPeers() []p2p.Peer
}

// genesisTimeAPI is an API to get genesis time and current layer of the system.
type genesisTimeAPI interface {
GenesisTime() time.Time
Expand Down
51 changes: 51 additions & 0 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/pyroscope-io/pyroscope v0.37.2
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/seehuhn/mt19937 v1.0.0
github.com/spacemeshos/api/release/go v1.19.0
github.com/spacemeshos/api/release/go v1.19.1-0.20230818155305-f006d101d548
github.com/spacemeshos/economics v0.1.0
github.com/spacemeshos/fixed v0.1.0
github.com/spacemeshos/go-scale v1.1.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hg
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemeshos/api/release/go v1.19.0 h1:QPt1nUuSVQ4DfZPNsSAuJpjjYlbS9HJhDunE7K8rn08=
github.com/spacemeshos/api/release/go v1.19.0/go.mod h1:nC5g7IVRNxF8Kl/Tzvr7cQeYBwHN4sMTVsIEa6Ebtl4=
github.com/spacemeshos/api/release/go v1.19.1-0.20230818155305-f006d101d548 h1:ilyQXo1rPTLjR0X3hRtP8BsIq8bGPUT7iSOz7wEEcIM=
github.com/spacemeshos/api/release/go v1.19.1-0.20230818155305-f006d101d548/go.mod h1:nC5g7IVRNxF8Kl/Tzvr7cQeYBwHN4sMTVsIEa6Ebtl4=
github.com/spacemeshos/economics v0.1.0 h1:PJAKbhBKqbbdCYTB29pkmc8sYqK3pKUAiuAvQxuSJEg=
github.com/spacemeshos/economics v0.1.0/go.mod h1:Bz0wRDwCOUP1A6w3cPW6iuUBGME8Tz48sIriYiohsBg=
github.com/spacemeshos/fixed v0.1.0 h1:20KIGvxLlAsuidQrvuwwHe6PrvqeTKzbJIsScbmnUPQ=
Expand Down
78 changes: 78 additions & 0 deletions node/adminservice_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package node

import (
"context"
"io"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/libp2p/go-libp2p/core/peer"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/config"
"github.com/spacemeshos/go-spacemesh/log/logtest"
)

func TestPeerInfoApi(t *testing.T) {
cfg := config.DefaultTestConfig()
cfg.P2P.DisableNatPort = true
cfg.P2P.Listen = "/ip4/127.0.0.1/tcp/0"

cfg.API.PublicListener = "0.0.0.0:0"
cfg.API.PrivateServices = nil
cfg.API.PublicServices = []string{grpcserver.Admin}
l := logtest.New(t)
networkSize := 3
network := NewTestNetwork(t, cfg, l, networkSize)
infos := make([][]*pb.PeerInfo, networkSize)
for i, app := range network {
adminapi := pb.NewAdminServiceClient(app.Conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

streamClient, err := adminapi.PeerInfoStream(ctx, &empty.Empty{})
require.NoError(t, err)
for {
info, err := streamClient.Recv()
if err == io.EOF {
break
}
require.NoError(t, err)
infos[i] = append(infos[i], info)
}
}

for i, app := range network {
for j, innerApp := range network {
if j == i {
continue
}
peers := infos[i]
require.Len(t, peers, networkSize-1, "expecting each node to have connections to all other nodes")
peer := getPeerInfo(peers, innerApp.host.ID())
require.NotNil(t, peer, "info is missing connection to %v")
require.Len(t, peer.Connections, 1, "expecting only 1 connection to each peer")
require.Equal(t, innerApp.host.Addrs()[0].String(), peer.Connections[0].Address, "connection address should match address of peer")
outbound := peer.Connections[0].Outbound

// Check that outbound matches with the other side of the connection
otherSide := getPeerInfo(infos[j], app.host.ID())
require.NotNil(t, peer, "one side missing peer connection")
require.Len(t, otherSide.Connections, 1, "expecting only 1 connection to each peer")
require.Equal(t, outbound, !otherSide.Connections[0].Outbound, "expecting pairwise connections to agree on outbound direction")
}
}
}

func getPeerInfo(peers []*pb.PeerInfo, id peer.ID) *pb.PeerInfo {
str := id.String()
for _, p := range peers {
if str == p.Id {
return p
}
}
return nil
}
42 changes: 12 additions & 30 deletions node/bad_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,31 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
varint "github.com/multiformats/go-varint"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/config"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/log/logtest"
ps "github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/signing"
)

func TestPeerDisconnectForMessageResultValidationReject(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

l := logtest.New(t)

// Make 2 node instances
conf1 := config.DefaultTestConfig()
conf1.DataDirParent = t.TempDir()
conf1.FileLock = filepath.Join(conf1.DataDirParent, "LOCK")
conf1.P2P.Listen = "/ip4/127.0.0.1/tcp/0"
app1, err := NewApp(&conf1)
require.NoError(t, err)
// We setup the api to listen on an OS assigned port, which avoids the second instance getting stuck when
conf1.API.PublicListener = "0.0.0.0:0"
conf1.API.PrivateListener = "0.0.0.0:0"
app1 := NewApp(t, &conf1, l)

conf2 := config.DefaultTestConfig()
// We need to copy the genesis config to ensure that both nodes share the
// same gnenesis ID, otherwise they will not be able to connect to each
Expand All @@ -46,8 +47,9 @@ func TestPeerDisconnectForMessageResultValidationReject(t *testing.T) {
conf2.DataDirParent = t.TempDir()
conf2.FileLock = filepath.Join(conf2.DataDirParent, "LOCK")
conf2.P2P.Listen = "/ip4/127.0.0.1/tcp/0"
app2, err := NewApp(&conf2)
require.NoError(t, err)
conf2.API.PublicListener = "0.0.0.0:0"
conf2.API.PrivateListener = "0.0.0.0:0"
app2 := NewApp(t, &conf2, l)

types.SetLayersPerEpoch(conf1.LayersPerEpoch)
t.Cleanup(func() {
Expand All @@ -65,7 +67,7 @@ func TestPeerDisconnectForMessageResultValidationReject(t *testing.T) {
<-app2.Started()

// Connect app2 to app1
err = app2.Host().Connect(context.Background(), peer.AddrInfo{
err := app2.Host().Connect(context.Background(), peer.AddrInfo{
ID: app1.Host().ID(),
Addrs: app1.Host().Addrs(),
})
Expand Down Expand Up @@ -122,26 +124,6 @@ func TestPeerDisconnectForMessageResultValidationReject(t *testing.T) {
require.NoError(t, g.Wait())
}

func NewApp(conf *config.Config) (*App, error) {
app := New(
WithConfig(conf),
WithLog(log.RegisterHooks(
log.NewWithLevel("", zap.NewAtomicLevelAt(zapcore.DebugLevel)),
events.EventHook()),
),
)

var err error
if err = app.Initialize(); err != nil {
return nil, err
}
app.edSgn, err = signing.NewEdSigner()
if err != nil {
return nil, err
}
return app, err
}

func getStream(c network.Conn, p protocol.ID, dir network.Direction) network.Stream {
for _, s := range c.GetStreams() {
if s.Protocol() == p && s.Stat().Direction == dir {
Expand Down
6 changes: 3 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ func (app *App) initService(ctx context.Context, svc grpcserver.Service) (grpcse
case grpcserver.Node:
return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit, logger.WithName("Node")), nil
case grpcserver.Admin:
return grpcserver.NewAdminService(app.db, app.Config.DataDir(), logger.WithName("Admin")), nil
return grpcserver.NewAdminService(app.db, app.Config.DataDir(), logger.WithName("Admin"), app.host), nil
case grpcserver.Smesher:
return grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder, app.Config.API.SmesherStreamInterval, app.Config.SMESHING.Opts, logger.WithName("Smesher")), nil
case grpcserver.Transaction:
Expand Down Expand Up @@ -1109,10 +1109,10 @@ func (app *App) startAPIServices(ctx context.Context) error {
app.jsonAPIService.StartService(ctx, public...)
}
if app.grpcPublicService != nil {
app.grpcPublicService.Start()
<-app.grpcPublicService.Start()
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
if app.grpcPrivateService != nil {
app.grpcPrivateService.Start()
<-app.grpcPrivateService.Start()
}
return nil
}
Expand Down