Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into use-middleware-to-…
Browse files Browse the repository at this point in the history
…log-grpc-start
  • Loading branch information
poszu committed Aug 21, 2023
2 parents 5927a12 + 3f8aeb2 commit c7100ec
Show file tree
Hide file tree
Showing 61 changed files with 2,246 additions and 415 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# go-spacemesh needs at least ubuntu 20.04 (because gpu-post and post-rs are linked to glibc 2.31)
# newer versions of ubuntu should work as well, so far only 22.04 has been tested
# go-spacemesh needs at least ubuntu 22.04. newer versions of ubuntu might work as well, but are untested
FROM ubuntu:22.04 AS linux
ENV DEBIAN_FRONTEND noninteractive
ENV SHELL /bin/bash
Expand Down
10 changes: 8 additions & 2 deletions Makefile-libs.Inc
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ $(BINDIR_POSTRS_SETUP_LIBS): $(PROJ_DIR)$(POSTRS_SETUP_ZIP)
unzip -o -j $(PROJ_DIR)$(POSTRS_SETUP_ZIP) -d $(dir $@) $(notdir $@)
touch $@

CURL_OPTIONS = --retry 10 --retry-max-time 120
CURL_VERSION = $(shell curl --version 2>/dev/null | head -n 1 | cut -d' ' -f2)
ifeq ($(shell expr "$(CURL_VERSION)" \>= 7.71.0),1)
CURL_OPTIONS := $(CURL_OPTIONS) --retry-all-errors
endif

$(PROJ_DIR)$(POSTRS_SETUP_ZIP):
curl -sSL --retry 10 --retry-max-time 120 --retry-all-errors $(POSTRS_SETUP_URL_ZIP) -o $(PROJ_DIR)$(POSTRS_SETUP_ZIP)
curl -sSL $(CURL_OPTIONS) $(POSTRS_SETUP_URL_ZIP) -o $(PROJ_DIR)$(POSTRS_SETUP_ZIP)

$(BIN_DIR)$(POSTRS_PROFILER_BIN):
curl -sSL --retry 10 --retry-max-time 120 --retry-all-errors $(POSTRS_PROFILER_URL) -o $(PROJ_DIR)$(POSTRS_PROFILER_ZIP)
curl -sSL $(CURL_OPTIONS) $(POSTRS_PROFILER_URL) -o $(PROJ_DIR)$(POSTRS_PROFILER_ZIP)
unzip -o -j $(PROJ_DIR)$(POSTRS_PROFILER_ZIP) -d $(BIN_DIR)

get-postrs-lib: $(PROJ_DIR)$(POSTRS_SETUP_ZIP) $(BINDIR_POSTRS_SETUP_LIBS)
Expand Down
39 changes: 38 additions & 1 deletion api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/spacemeshos/go-spacemesh/checkpoint"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand All @@ -32,10 +33,11 @@ type AdminService struct {
db *sql.Database
dataDir string
recover func()
p peers
}

// NewAdminService creates a new admin grpc service.
func NewAdminService(db *sql.Database, dataDir string) *AdminService {
func NewAdminService(db *sql.Database, dataDir string, p peers) *AdminService {
return &AdminService{
db: db,
dataDir: dataDir,
Expand All @@ -46,6 +48,7 @@ func NewAdminService(db *sql.Database, dataDir string) *AdminService {
os.Exit(0)
}()
},
p: p,
}
}

Expand Down Expand Up @@ -136,3 +139,37 @@ func (a AdminService) EventsStream(req *pb.EventStreamRequest, stream pb.AdminSe
}
}
}

func (a AdminService) PeerInfoStream(_ *empty.Empty, stream pb.AdminService_PeerInfoStreamServer) error {
for _, p := range a.p.GetPeers() {
select {
case <-stream.Context().Done():
return nil
default:
info := a.p.ConnectedPeerInfo(p)
// There is no guarantee that the peers originally returned will still
// be connected by the time we call ConnectedPeerInfo.
if info == nil {
continue
}
connections := make([]*pb.ConnectionInfo, len(info.Connections))
for j, c := range info.Connections {
connections[j] = &pb.ConnectionInfo{
Address: c.Address.String(),
Uptime: durationpb.New(c.Uptime),
Outbound: c.Outbound,
}
}
err := stream.Send(&pb.PeerInfo{
Id: info.ID.String(),
Connections: connections,
Tags: info.Tags,
})
if err != nil {
return fmt.Errorf("send to stream: %w", err)
}
}
}

return nil
}
6 changes: 3 additions & 3 deletions api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func createMesh(tb testing.TB, db *sql.Database) {
func TestAdminService_Checkpoint(t *testing.T) {
db := sql.InMemory()
createMesh(t, db)
svc := NewAdminService(db, t.TempDir())
svc := NewAdminService(db, t.TempDir(), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestAdminService_Checkpoint(t *testing.T) {

func TestAdminService_CheckpointError(t *testing.T) {
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir())
svc := NewAdminService(db, t.TempDir(), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -108,7 +108,7 @@ func TestAdminService_CheckpointError(t *testing.T) {
func TestAdminService_Recovery(t *testing.T) {
db := sql.InMemory()
recoveryCalled := atomic.Bool{}
svc := NewAdminService(db, t.TempDir())
svc := NewAdminService(db, t.TempDir(), nil)
svc.recover = func() { recoveryCalled.Store(true) }

t.Cleanup(launchServer(t, cfg, svc))
Expand Down
44 changes: 22 additions & 22 deletions api/grpcserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
Expand All @@ -18,9 +19,15 @@ type ServiceAPI interface {

// Server is a very basic grpc server.
type Server struct {
Listener string
logger log.Logger
GrpcServer *grpc.Server
Listener string
logger log.Logger
// BoundAddress contains the address that the server bound to, useful if
// the server uses a dynamic port. It is set during startup and can be
// safely accessed after Start has completed (I.E. the returned channel has
// been waited on)
BoundAddress string
GrpcServer *grpc.Server
grp errgroup.Group
}

// New creates and returns a new Server with port and interface.
Expand All @@ -34,7 +41,7 @@ func New(listener string, lg log.Logger, opts ...grpc.ServerOption) *Server {
}

// Start starts the server.
func (s *Server) Start() <-chan struct{} {
func (s *Server) Start() error {
s.logger.With().Info("starting grpc server",
log.String("address", s.Listener),
log.Array("services", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error {
Expand All @@ -44,35 +51,28 @@ func (s *Server) Start() <-chan struct{} {
return nil
})),
)

started := make(chan struct{})
go s.startInternal(started)

return started
}

// Blocking, should be called in a goroutine.
func (s *Server) startInternal(started chan<- struct{}) {
lis, err := net.Listen("tcp", s.Listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return
return err
}
s.BoundAddress = lis.Addr().String()
reflection.Register(s.GrpcServer)
s.logger.Info("starting new grpc server on %s", s.Listener)
close(started)
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
}
s.grp.Go(func() error {
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
return err
}
return nil
})
return nil
}

// Close stops the server.
func (s *Server) Close() error {
s.logger.Info("stopping the grpc server")
s.GrpcServer.Stop()

// We don't return any errors but we want to conform to io.Closer so return a nil error
return nil
return s.grp.Wait()
}

// ServerOptions are shared by all grpc servers.
Expand Down
17 changes: 5 additions & 12 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,11 @@ func launchServer(tb testing.TB, cfg Config, services ...ServiceAPI) func() {
}

// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), services...)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
err := grpcService.Start()
require.NoError(tb, err)
if len(services) > 0 {
err = jsonService.StartService(context.Background(), services...)
require.NoError(tb, err)
}

return func() {
Expand Down
81 changes: 31 additions & 50 deletions api/grpcserver/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/log"
)
Expand All @@ -18,9 +19,9 @@ import (
type JSONHTTPServer struct {
logger log.Logger

mu sync.RWMutex
listener string
server *http.Server
grp errgroup.Group
}

// NewJSONHTTPServer creates a new json http server.
Expand All @@ -34,41 +35,32 @@ func NewJSONHTTPServer(listener string, lg log.Logger) *JSONHTTPServer {
// Shutdown stops the server.
func (s *JSONHTTPServer) Shutdown(ctx context.Context) error {
s.logger.Debug("stopping json-http service...")
server := s.getServer()
if server != nil {
err := server.Shutdown(ctx)
if s.server != nil {
err := s.server.Shutdown(ctx)
if errors.Is(err, http.ErrServerClosed) {
return nil
}
if err != nil {
return fmt.Errorf("shutdown: %w", err)
}
}

return nil
err := s.grp.Wait()
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return err
}

// StartService starts the json api server and listens for status (started, stopped).
func (s *JSONHTTPServer) StartService(
ctx context.Context,
services ...ServiceAPI,
) <-chan struct{} {
started := make(chan struct{})

// This will block, so run it in a goroutine
go s.startInternal(
ctx,
started,
services...)

return started
}

func (s *JSONHTTPServer) startInternal(
ctx context.Context,
started chan<- struct{},
services ...ServiceAPI,
) {
) error {
// At least one service must be enabled
if len(services) == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return errors.New("no services provided")
}
ctx, cancel := context.WithCancel(ctx)

// This will close all downstream connections when the server closes
Expand Down Expand Up @@ -96,38 +88,27 @@ func (s *JSONHTTPServer) startInternal(
}
if err != nil {
s.logger.Error("registering %T with grpc gateway failed with %v", svc, err)
return err
}
serviceCount++
}

close(started)
s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))

// At least one service must be enabled
if serviceCount == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return
lis, err := net.Listen("tcp", s.listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return err
}

s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))
s.setServer(&http.Server{
Addr: s.listener,
s.server = &http.Server{
Handler: mux,
}
s.grp.Go(func() error {
if err := s.server.Serve(lis); err != nil {
s.logger.Error("error from grpc http server: %v", err)
return err
}
return nil
})

// This will block
s.logger.Error("error from grpc http listener: %v", s.getServer().ListenAndServe())
}

func (s *JSONHTTPServer) getServer() *http.Server {
s.mu.RLock()
defer s.mu.RUnlock()

return s.server
}

func (s *JSONHTTPServer) setServer(server *http.Server) {
s.mu.Lock()
defer s.mu.Unlock()

s.server = server
return nil
}
6 changes: 6 additions & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type peerCounter interface {
PeerCount() uint64
}

// Peers is an api to get peer related info.
type peers interface {
ConnectedPeerInfo(p2p.Peer) *p2p.PeerInfo
GetPeers() []p2p.Peer
}

// genesisTimeAPI is an API to get genesis time and current layer of the system.
type genesisTimeAPI interface {
GenesisTime() time.Time
Expand Down

0 comments on commit c7100ec

Please sign in to comment.