Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Make grpc servers' start methods be synchronous #4866

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 13 additions & 19 deletions api/grpcserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"net"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
Expand All @@ -21,6 +22,7 @@
Listener string
logger log.Logger
GrpcServer *grpc.Server
grp errgroup.Group
}

// New creates and returns a new Server with port and interface.
Expand All @@ -34,7 +36,7 @@
}

// Start starts the server.
func (s *Server) Start() <-chan struct{} {
func (s *Server) Start() error {
s.logger.With().Info("starting grpc server",
log.String("address", s.Listener),
log.Array("services", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error {
Expand All @@ -44,35 +46,27 @@
return nil
})),
)

started := make(chan struct{})
go s.startInternal(started)

return started
}

// Blocking, should be called in a goroutine.
func (s *Server) startInternal(started chan<- struct{}) {
lis, err := net.Listen("tcp", s.Listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return
return err

Check warning on line 52 in api/grpcserver/grpc.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/grpc.go#L52

Added line #L52 was not covered by tests
}
reflection.Register(s.GrpcServer)
s.logger.Info("starting new grpc server on %s", s.Listener)
close(started)
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
}
s.grp.Go(func() error {
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
return err
}

Check warning on line 59 in api/grpcserver/grpc.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/grpc.go#L57-L59

Added lines #L57 - L59 were not covered by tests
return nil
})
return nil
}

// Close stops the server.
func (s *Server) Close() error {
s.logger.Info("stopping the grpc server")
s.GrpcServer.Stop()
piersy marked this conversation as resolved.
Show resolved Hide resolved

// We don't return any errors but we want to conform to io.Closer so return a nil error
return nil
return s.grp.Wait()
}

// ServerOptions are shared by all grpc servers.
Expand Down
17 changes: 5 additions & 12 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,11 @@ func launchServer(tb testing.TB, cfg Config, services ...ServiceAPI) func() {
}

// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), services...)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
err := grpcService.Start()
require.NoError(tb, err)
if len(services) > 0 {
err = jsonService.StartService(context.Background(), services...)
require.NoError(tb, err)
}

return func() {
Expand Down
81 changes: 31 additions & 50 deletions api/grpcserver/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/log"
)
Expand All @@ -18,9 +19,9 @@
type JSONHTTPServer struct {
logger log.Logger

mu sync.RWMutex
listener string
server *http.Server
grp errgroup.Group
}

// NewJSONHTTPServer creates a new json http server.
Expand All @@ -34,41 +35,32 @@
// Shutdown stops the server.
func (s *JSONHTTPServer) Shutdown(ctx context.Context) error {
s.logger.Debug("stopping json-http service...")
server := s.getServer()
if server != nil {
err := server.Shutdown(ctx)
if s.server != nil {
err := s.server.Shutdown(ctx)
if errors.Is(err, http.ErrServerClosed) {
return nil
}
if err != nil {
return fmt.Errorf("shutdown: %w", err)
}
}

return nil
err := s.grp.Wait()
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return err
}

// StartService starts the json api server and listens for status (started, stopped).
func (s *JSONHTTPServer) StartService(
ctx context.Context,
services ...ServiceAPI,
) <-chan struct{} {
started := make(chan struct{})

// This will block, so run it in a goroutine
go s.startInternal(
ctx,
started,
services...)

return started
}

func (s *JSONHTTPServer) startInternal(
ctx context.Context,
started chan<- struct{},
services ...ServiceAPI,
) {
) error {
// At least one service must be enabled
if len(services) == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return errors.New("no services provided")
}

Check warning on line 63 in api/grpcserver/http_server.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/http_server.go#L61-L63

Added lines #L61 - L63 were not covered by tests
ctx, cancel := context.WithCancel(ctx)

// This will close all downstream connections when the server closes
Expand Down Expand Up @@ -96,38 +88,27 @@
}
if err != nil {
s.logger.Error("registering %T with grpc gateway failed with %v", svc, err)
return err

Check warning on line 91 in api/grpcserver/http_server.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/http_server.go#L91

Added line #L91 was not covered by tests
}
serviceCount++
}

close(started)
s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))

// At least one service must be enabled
if serviceCount == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return
lis, err := net.Listen("tcp", s.listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return err
}

s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))
s.setServer(&http.Server{
Addr: s.listener,
s.server = &http.Server{
Handler: mux,
}
s.grp.Go(func() error {
if err := s.server.Serve(lis); err != nil {
s.logger.Error("error from grpc http server: %v", err)
return err
}
return nil

Check warning on line 111 in api/grpcserver/http_server.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/http_server.go#L111

Added line #L111 was not covered by tests
})

// This will block
s.logger.Error("error from grpc http listener: %v", s.getServer().ListenAndServe())
}

func (s *JSONHTTPServer) getServer() *http.Server {
s.mu.RLock()
defer s.mu.RUnlock()

return s.server
}

func (s *JSONHTTPServer) setServer(server *http.Server) {
s.mu.Lock()
defer s.mu.Unlock()

s.server = server
return nil
}
26 changes: 11 additions & 15 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -139,23 +140,18 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() {

pb.RegisterMeshServiceServer(grpcService.GrpcServer, s)
// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), s)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
}
err := grpcService.Start()
require.NoError(tb, err)
err = jsonService.StartService(context.Background(), s)
require.NoError(tb, err)

return func() {
require.NoError(tb, jsonService.Shutdown(context.Background()))
_ = grpcService.Close()
err := jsonService.Shutdown(context.Background())
if !errors.Is(err, http.ErrServerClosed) {
require.NoError(tb, err)
}
err = grpcService.Close()
require.NoError(tb, err)
}
}

Expand Down
12 changes: 9 additions & 3 deletions node/bad_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,22 @@ func TestPeerDisconnectForMessageResultValidationReject(t *testing.T) {
conf1.DataDirParent = t.TempDir()
conf1.FileLock = filepath.Join(conf1.DataDirParent, "LOCK")
conf1.P2P.Listen = "/ip4/127.0.0.1/tcp/0"
// We setup the api to listen on an OS assigned port, which avoids the second instance getting stuck when
conf1.API.PublicListener = "0.0.0.0:0"
conf1.API.PrivateListener = "0.0.0.0:0"
app1, err := NewApp(&conf1)
require.NoError(t, err)
conf2 := config.DefaultTestConfig()

// We need to copy the genesis config to ensure that both nodes share the
// same gnenesis ID, otherwise they will not be able to connect to each
// other.
*conf2.Genesis = *conf1.Genesis
conf2.DataDirParent = t.TempDir()
conf2.FileLock = filepath.Join(conf2.DataDirParent, "LOCK")
conf2.P2P.Listen = "/ip4/127.0.0.1/tcp/0"
conf2.API.PublicListener = "0.0.0.0:0"
conf2.API.PrivateListener = "0.0.0.0:0"
app2, err := NewApp(&conf2)
require.NoError(t, err)

Expand All @@ -54,13 +60,13 @@ func TestPeerDisconnectForMessageResultValidationReject(t *testing.T) {
app1.Cleanup(ctx)
app2.Cleanup(ctx)
})
g := errgroup.Group{}
g, grpContext := errgroup.WithContext(ctx)
g.Go(func() error {
return app1.Start(ctx)
return app1.Start(grpContext)
})
<-app1.Started()
g.Go(func() error {
return app2.Start(ctx)
return app2.Start(grpContext)
})
<-app2.Started()

Expand Down
8 changes: 6 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,10 +1109,14 @@
app.jsonAPIService.StartService(ctx, public...)
}
if app.grpcPublicService != nil {
app.grpcPublicService.Start()
if err := app.grpcPublicService.Start(); err != nil {
return err
}

Check warning on line 1114 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L1113-L1114

Added lines #L1113 - L1114 were not covered by tests
}
if app.grpcPrivateService != nil {
app.grpcPrivateService.Start()
if err := app.grpcPrivateService.Start(); err != nil {
return err
}

Check warning on line 1119 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L1118-L1119

Added lines #L1118 - L1119 were not covered by tests
}
return nil
}
Expand Down