Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Use ctxzap in GRPC services #4816

Closed
wants to merge 9 commits into from
10 changes: 4 additions & 6 deletions api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,29 @@ import (
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
)

type activationService struct {
logger log.Logger
goldenAtx types.ATXID
atxProvider atxProvider
}

func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID, lg log.Logger) *activationService {
func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID) *activationService {
return &activationService{
logger: lg,
goldenAtx: goldenAtx,
atxProvider: atxProvider,
}
}

// RegisterService implements ServiceAPI.
func (s *activationService) RegisterService(server *Server) {
s.logger.Info("registering GRPC Activation Service")
pb.RegisterActivationServiceServer(server.GrpcServer, s)
}

Expand All @@ -42,7 +40,7 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p
atxId := types.ATXID(types.BytesToHash(request.Id))
atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
s.logger.With().Debug("failed to get the ATX", log.Err(err), log.Stringer("id", atxId))
ctxzap.Debug(ctx, "failed to get the ATX", zap.Error(err), zap.Stringer("id", atxId))
return nil, status.Error(codes.NotFound, "id was not found")
}
return &pb.GetResponse{
Expand Down
13 changes: 6 additions & 7 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import (

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)

atxProvider.EXPECT().MaxHeightAtx().Return(types.EmptyATXID, errors.New("blah"))
response, err := activationService.Highest(context.Background(), &empty.Empty{})
Expand All @@ -40,7 +39,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)

atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
Expand Down Expand Up @@ -75,7 +74,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
func TestGet_RejectInvalidAtxID(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

_, err := activationService.Get(context.Background(), &pb.GetRequest{Id: []byte{1, 2, 3}})
require.Error(t, err)
Expand All @@ -85,7 +84,7 @@ func TestGet_RejectInvalidAtxID(t *testing.T) {
func TestGet_AtxNotPresent(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(nil, nil)
Expand All @@ -98,7 +97,7 @@ func TestGet_AtxNotPresent(t *testing.T) {
func TestGet_AtxProviderReturnsFailure(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(&types.VerifiedActivationTx{}, errors.New(""))
Expand All @@ -111,7 +110,7 @@ func TestGet_AtxProviderReturnsFailure(t *testing.T) {
func TestGet_HappyPath(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atx := types.VerifiedActivationTx{
Expand Down
15 changes: 9 additions & 6 deletions api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spf13/afero"
"google.golang.org/grpc/codes"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/spacemeshos/go-spacemesh/checkpoint"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

Expand All @@ -26,19 +26,21 @@ const (
defaultNumAtxs = 4
)

type RecoverFunc func(context.Context)

// AdminService exposes endpoints for node administration.
type AdminService struct {
logger log.Logger
db *sql.Database
dataDir string
recover RecoverFunc
}

// NewAdminService creates a new admin grpc service.
func NewAdminService(db *sql.Database, dataDir string, lg log.Logger) *AdminService {
func NewAdminService(db *sql.Database, dataDir string, recover RecoverFunc) *AdminService {
return &AdminService{
logger: lg,
db: db,
dataDir: dataDir,
recover: recover,
}
}

Expand Down Expand Up @@ -92,8 +94,9 @@ func (a AdminService) CheckpointStream(req *pb.CheckpointStreamRequest, stream p
}
}

func (a AdminService) Recover(_ context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
a.logger.Panic("going to recover from checkpoint")
func (a AdminService) Recover(ctx context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
ctxzap.Info(ctx, "going to recover from checkpoint")
a.recover(ctx)
return &empty.Empty{}, nil
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
25 changes: 22 additions & 3 deletions api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"io"
"sync/atomic"
"testing"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
Expand Down Expand Up @@ -56,7 +56,7 @@ func createMesh(tb testing.TB, db *sql.Database) {
func TestAdminService_Checkpoint(t *testing.T) {
db := sql.InMemory()
createMesh(t, db)
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
svc := NewAdminService(db, t.TempDir(), func(context.Context) {})
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestAdminService_Checkpoint(t *testing.T) {

func TestAdminService_CheckpointError(t *testing.T) {
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
svc := NewAdminService(db, t.TempDir(), func(context.Context) {})
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -104,3 +104,22 @@ func TestAdminService_CheckpointError(t *testing.T) {
_, err = stream.Recv()
require.ErrorContains(t, err, sql.ErrNotFound.Error())
}

func TestAdminService_Recovery(t *testing.T) {
db := sql.InMemory()
recoveryCalled := atomic.Bool{}
svc := NewAdminService(db, t.TempDir(), func(context.Context) {
recoveryCalled.Store(true)
})
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg.PublicListener)
c := pb.NewAdminServiceClient(conn)

_, err := c.Recover(ctx, &pb.RecoverRequest{})
require.NoError(t, err)

require.Eventually(t, func() bool { return recoveryCalled.Load() }, 5*time.Second, 100*time.Millisecond)
}
13 changes: 5 additions & 8 deletions api/grpcserver/debug_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ import (
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
)

// DebugService exposes global state data, output from the STF.
type DebugService struct {
db *sql.Database
logger log.Logger
conState conservativeState
identity networkIdentity
oracle oracle
Expand All @@ -33,20 +33,17 @@ func (d DebugService) RegisterService(server *Server) {
}

// NewDebugService creates a new grpc service using config data.
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle, lg log.Logger) *DebugService {
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle) *DebugService {
return &DebugService{
db: db,
logger: lg,
conState: conState,
identity: host,
oracle: oracle,
}
}

// Accounts returns current counter and balance for all accounts.
func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
d.logger.Info("GRPC DebugServices.Accounts")

func (d DebugService) Accounts(ctx context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
var (
accts []*types.Account
err error
Expand All @@ -57,7 +54,7 @@ func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.A
accts, err = accounts.Snapshot(d.db, types.LayerID(in.Layer))
}
if err != nil {
d.logger.Error("Failed to get all accounts from state: %s", err)
ctxzap.Error(ctx, " Failed to get all accounts from state", zap.Error(err))
return nil, status.Errorf(codes.Internal, "error fetching accounts state")
}

Expand Down