Skip to content

Commit

Permalink
Make grpc servers' start methods be synchronous (#4866)
Browse files Browse the repository at this point in the history
## Motivation
Closes #4861 
## Changes

Updates grpc server startup code to execute synchronously, returning only after the server has started listening.

## Test Plan
Existing tests should pass

## DevOps Notes
<!-- Please uncheck these items as applicable to make DevOps aware of changes that may affect releases -->
- [x] This PR does not require configuration changes (e.g., environment variables, GitHub secrets, VM resources)
- [x] This PR does not affect public APIs
- [x] This PR does not rely on a new version of external services (PoET, elasticsearch, etc.)
- [x] This PR does not make changes to log messages (which monitoring infrastructure may rely on)
  • Loading branch information
piersy committed Aug 17, 2023
1 parent 04b9597 commit ab5c253
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 89 deletions.
32 changes: 13 additions & 19 deletions api/grpcserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
Expand All @@ -21,6 +22,7 @@ type Server struct {
Listener string
logger log.Logger
GrpcServer *grpc.Server
grp errgroup.Group
}

// New creates and returns a new Server with port and interface.
Expand All @@ -34,7 +36,7 @@ func New(listener string, lg log.Logger, opts ...grpc.ServerOption) *Server {
}

// Start starts the server.
func (s *Server) Start() <-chan struct{} {
func (s *Server) Start() error {
s.logger.With().Info("starting grpc server",
log.String("address", s.Listener),
log.Array("services", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error {
Expand All @@ -44,35 +46,27 @@ func (s *Server) Start() <-chan struct{} {
return nil
})),
)

started := make(chan struct{})
go s.startInternal(started)

return started
}

// Blocking, should be called in a goroutine.
func (s *Server) startInternal(started chan<- struct{}) {
lis, err := net.Listen("tcp", s.Listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return
return err
}
reflection.Register(s.GrpcServer)
s.logger.Info("starting new grpc server on %s", s.Listener)
close(started)
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
}
s.grp.Go(func() error {
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
return err
}
return nil
})
return nil
}

// Close stops the server.
func (s *Server) Close() error {
s.logger.Info("stopping the grpc server")
s.GrpcServer.Stop()

// We don't return any errors but we want to conform to io.Closer so return a nil error
return nil
return s.grp.Wait()
}

// ServerOptions are shared by all grpc servers.
Expand Down
14 changes: 4 additions & 10 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,20 +491,14 @@ func launchServer(tb testing.TB, cfg Config, services ...ServiceAPI) func() {
}

// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), services...)
err := grpcService.Start()
require.NoError(tb, err)
err = jsonService.StartService(context.Background(), services...)
require.NoError(tb, err)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
}

return func() {
require.NoError(tb, jsonService.Shutdown(context.Background()))
_ = grpcService.Close()
Expand Down
78 changes: 28 additions & 50 deletions api/grpcserver/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/log"
)
Expand All @@ -18,9 +19,9 @@ import (
type JSONHTTPServer struct {
logger log.Logger

mu sync.RWMutex
listener string
server *http.Server
grp errgroup.Group
}

// NewJSONHTTPServer creates a new json http server.
Expand All @@ -34,41 +35,28 @@ func NewJSONHTTPServer(listener string, lg log.Logger) *JSONHTTPServer {
// Shutdown stops the server.
func (s *JSONHTTPServer) Shutdown(ctx context.Context) error {
s.logger.Debug("stopping json-http service...")
server := s.getServer()
if server != nil {
err := server.Shutdown(ctx)
if s.server != nil {
err := s.server.Shutdown(ctx)
if errors.Is(err, http.ErrServerClosed) {
return nil
}
if err != nil {
return fmt.Errorf("shutdown: %w", err)
}
}

return nil
return s.grp.Wait()
}

// StartService starts the json api server and listens for status (started, stopped).
func (s *JSONHTTPServer) StartService(
ctx context.Context,
services ...ServiceAPI,
) <-chan struct{} {
started := make(chan struct{})

// This will block, so run it in a goroutine
go s.startInternal(
ctx,
started,
services...)

return started
}

func (s *JSONHTTPServer) startInternal(
ctx context.Context,
started chan<- struct{},
services ...ServiceAPI,
) {
) error {
// At least one service must be enabled
if len(services) == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return errors.New("no services provided")
}
ctx, cancel := context.WithCancel(ctx)

// This will close all downstream connections when the server closes
Expand Down Expand Up @@ -96,38 +84,28 @@ func (s *JSONHTTPServer) startInternal(
}
if err != nil {
s.logger.Error("registering %T with grpc gateway failed with %v", svc, err)
return err
}
serviceCount++
}

close(started)
s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))

// At least one service must be enabled
if serviceCount == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return
lis, err := net.Listen("tcp", s.listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return err
}

s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))
s.setServer(&http.Server{
Addr: s.listener,
s.server = &http.Server{
Handler: mux,
}
s.server.Serve(lis)
s.grp.Go(func() error {
if err := s.server.Serve(lis); err != nil {
s.logger.Error("error from grpc http server: %v", err)
return err
}
return nil
})

// This will block
s.logger.Error("error from grpc http listener: %v", s.getServer().ListenAndServe())
}

func (s *JSONHTTPServer) getServer() *http.Server {
s.mu.RLock()
defer s.mu.RUnlock()

return s.server
}

func (s *JSONHTTPServer) setServer(server *http.Server) {
s.mu.Lock()
defer s.mu.Unlock()

s.server = server
return nil
}
14 changes: 4 additions & 10 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,14 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() {

pb.RegisterMeshServiceServer(grpcService.GrpcServer, s)
// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), s)
err := grpcService.Start()
require.NoError(tb, err)
err = jsonService.StartService(context.Background(), s)
require.NoError(tb, err)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
}

return func() {
require.NoError(tb, jsonService.Shutdown(context.Background()))
_ = grpcService.Close()
Expand Down

0 comments on commit ab5c253

Please sign in to comment.