Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Make grpc servers' start methods be synchronous #4866

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 13 additions & 19 deletions api/grpcserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
Expand All @@ -21,6 +22,7 @@ type Server struct {
Listener string
logger log.Logger
GrpcServer *grpc.Server
grp errgroup.Group
}

// New creates and returns a new Server with port and interface.
Expand All @@ -34,7 +36,7 @@ func New(listener string, lg log.Logger, opts ...grpc.ServerOption) *Server {
}

// Start starts the server.
func (s *Server) Start() <-chan struct{} {
func (s *Server) Start() error {
s.logger.With().Info("starting grpc server",
log.String("address", s.Listener),
log.Array("services", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error {
Expand All @@ -44,35 +46,27 @@ func (s *Server) Start() <-chan struct{} {
return nil
})),
)

started := make(chan struct{})
go s.startInternal(started)

return started
}

// Blocking, should be called in a goroutine.
func (s *Server) startInternal(started chan<- struct{}) {
lis, err := net.Listen("tcp", s.Listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return
return err
}
reflection.Register(s.GrpcServer)
s.logger.Info("starting new grpc server on %s", s.Listener)
close(started)
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
}
s.grp.Go(func() error {
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
return err
}
return nil
})
return nil
}

// Close stops the server.
func (s *Server) Close() error {
s.logger.Info("stopping the grpc server")
s.GrpcServer.Stop()
piersy marked this conversation as resolved.
Show resolved Hide resolved

// We don't return any errors but we want to conform to io.Closer so return a nil error
return nil
return s.grp.Wait()
}

// ServerOptions are shared by all grpc servers.
Expand Down
14 changes: 4 additions & 10 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,20 +491,14 @@ func launchServer(tb testing.TB, cfg Config, services ...ServiceAPI) func() {
}

// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), services...)
err := grpcService.Start()
require.NoError(tb, err)
err = jsonService.StartService(context.Background(), services...)
require.NoError(tb, err)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
piersy marked this conversation as resolved.
Show resolved Hide resolved

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
}

return func() {
require.NoError(tb, jsonService.Shutdown(context.Background()))
_ = grpcService.Close()
Expand Down
78 changes: 28 additions & 50 deletions api/grpcserver/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/log"
)
Expand All @@ -18,9 +19,9 @@ import (
type JSONHTTPServer struct {
logger log.Logger

mu sync.RWMutex
listener string
server *http.Server
grp errgroup.Group
}

// NewJSONHTTPServer creates a new json http server.
Expand All @@ -34,41 +35,28 @@ func NewJSONHTTPServer(listener string, lg log.Logger) *JSONHTTPServer {
// Shutdown stops the server.
func (s *JSONHTTPServer) Shutdown(ctx context.Context) error {
s.logger.Debug("stopping json-http service...")
server := s.getServer()
if server != nil {
err := server.Shutdown(ctx)
if s.server != nil {
err := s.server.Shutdown(ctx)
if errors.Is(err, http.ErrServerClosed) {
return nil
}
if err != nil {
return fmt.Errorf("shutdown: %w", err)
}
}

return nil
return s.grp.Wait()
}

// StartService starts the json api server and listens for status (started, stopped).
func (s *JSONHTTPServer) StartService(
ctx context.Context,
services ...ServiceAPI,
) <-chan struct{} {
started := make(chan struct{})

// This will block, so run it in a goroutine
go s.startInternal(
ctx,
started,
services...)

return started
}

func (s *JSONHTTPServer) startInternal(
ctx context.Context,
started chan<- struct{},
services ...ServiceAPI,
) {
) error {
// At least one service must be enabled
if len(services) == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return errors.New("no services provided")
}
ctx, cancel := context.WithCancel(ctx)

// This will close all downstream connections when the server closes
Expand Down Expand Up @@ -96,38 +84,28 @@ func (s *JSONHTTPServer) startInternal(
}
if err != nil {
s.logger.Error("registering %T with grpc gateway failed with %v", svc, err)
return err
}
serviceCount++
}

close(started)
s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))

// At least one service must be enabled
if serviceCount == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return
lis, err := net.Listen("tcp", s.listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return err
}

s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))
s.setServer(&http.Server{
Addr: s.listener,
s.server = &http.Server{
Handler: mux,
}
s.server.Serve(lis)
s.grp.Go(func() error {
if err := s.server.Serve(lis); err != nil {
s.logger.Error("error from grpc http server: %v", err)
return err
}
return nil
})

// This will block
s.logger.Error("error from grpc http listener: %v", s.getServer().ListenAndServe())
}

func (s *JSONHTTPServer) getServer() *http.Server {
s.mu.RLock()
defer s.mu.RUnlock()

return s.server
}

func (s *JSONHTTPServer) setServer(server *http.Server) {
s.mu.Lock()
defer s.mu.Unlock()

s.server = server
return nil
}
14 changes: 4 additions & 10 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,14 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() {

pb.RegisterMeshServiceServer(grpcService.GrpcServer, s)
// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), s)
err := grpcService.Start()
require.NoError(tb, err)
err = jsonService.StartService(context.Background(), s)
require.NoError(tb, err)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
piersy marked this conversation as resolved.
Show resolved Hide resolved

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
}

return func() {
require.NoError(tb, jsonService.Shutdown(context.Background()))
_ = grpcService.Close()
Expand Down