Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Use ctxzap in GRPC services #4816

Closed
wants to merge 9 commits into from
23 changes: 11 additions & 12 deletions api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,31 @@
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

type activationService struct {
logger log.Logger
goldenAtx types.ATXID
atxProvider atxProvider
}

func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID, lg log.Logger) *activationService {
func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID) *activationService {
return &activationService{
logger: lg,
goldenAtx: goldenAtx,
atxProvider: atxProvider,
}
}

// RegisterService implements ServiceAPI.
func (s *activationService) RegisterService(server *Server) {
s.logger.Info("registering GRPC Activation Service")
pb.RegisterActivationServiceServer(server.GrpcServer, s)
}

Expand All @@ -45,18 +43,19 @@
atxId := types.ATXID(types.BytesToHash(request.Id))
atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
s.logger.With().Debug("failed to get ATX",
log.Stringer("atx id", atxId),
log.Err(err),
ctxzap.Debug(ctx, "failed to get ATX",
zap.Stringer("id", atxId),
zap.Error(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
proof, err := s.atxProvider.GetMalfeasanceProof(atx.SmesherID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
s.logger.With().Error("failed to get malfeasance proof",
log.Stringer("smesher", atx.SmesherID),
log.Stringer("id", atxId),
log.Err(err),
ctxzap.Error(ctx, "failed to get malfeasance proof",
zap.Stringer("smesher", atx.SmesherID),
zap.Stringer("smesher", atx.SmesherID),
zap.Stringer("id", atxId),
zap.Error(err),

Check warning on line 58 in api/grpcserver/activation_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/activation_service.go#L54-L58

Added lines #L54 - L58 were not covered by tests
)
return nil, status.Error(codes.NotFound, "id was not found")
}
Expand Down
15 changes: 7 additions & 8 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import (
"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)

atxProvider.EXPECT().MaxHeightAtx().Return(types.EmptyATXID, errors.New("blah"))
response, err := activationService.Highest(context.Background(), &empty.Empty{})
Expand All @@ -42,7 +41,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)

atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
Expand Down Expand Up @@ -77,7 +76,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
func TestGet_RejectInvalidAtxID(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

_, err := activationService.Get(context.Background(), &pb.GetRequest{Id: []byte{1, 2, 3}})
require.Error(t, err)
Expand All @@ -87,7 +86,7 @@ func TestGet_RejectInvalidAtxID(t *testing.T) {
func TestGet_AtxNotPresent(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(nil, nil)
Expand All @@ -100,7 +99,7 @@ func TestGet_AtxNotPresent(t *testing.T) {
func TestGet_AtxProviderReturnsFailure(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(&types.VerifiedActivationTx{}, errors.New(""))
Expand All @@ -113,7 +112,7 @@ func TestGet_AtxProviderReturnsFailure(t *testing.T) {
func TestGet_HappyPath(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atx := types.VerifiedActivationTx{
Expand Down Expand Up @@ -150,7 +149,7 @@ func TestGet_HappyPath(t *testing.T) {
func TestGet_IdentityCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

smesher, proof := grpcserver.BallotMalfeasance(t, sql.InMemory())
id := types.RandomATXID()
Expand Down
22 changes: 15 additions & 7 deletions api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
"fmt"
"io"
"os"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spf13/afero"
"google.golang.org/grpc/codes"
Expand All @@ -18,7 +20,6 @@
"github.com/spacemeshos/go-spacemesh/checkpoint"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

Expand All @@ -29,19 +30,25 @@

// AdminService exposes endpoints for node administration.
type AdminService struct {
logger log.Logger
db *sql.Database
dataDir string
recover func()
p peers
}

// NewAdminService creates a new admin grpc service.
func NewAdminService(db *sql.Database, dataDir string, lg log.Logger, p peers) *AdminService {
func NewAdminService(db *sql.Database, dataDir string, p peers) *AdminService {
return &AdminService{
logger: lg,
db: db,
dataDir: dataDir,
p: p,
recover: func() {
go func() {
// Allow time for the response to be sent.
time.Sleep(time.Second)
os.Exit(0)
}()

Check warning on line 49 in api/grpcserver/admin_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/admin_service.go#L45-L49

Added lines #L45 - L49 were not covered by tests
},
p: p,
}
}

Expand Down Expand Up @@ -95,8 +102,9 @@
}
}

func (a AdminService) Recover(_ context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
a.logger.Panic("going to recover from checkpoint")
func (a AdminService) Recover(ctx context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
ctxzap.Info(ctx, "going to recover from checkpoint")
a.recover()
return &empty.Empty{}, nil
}

Expand Down
24 changes: 21 additions & 3 deletions api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"io"
"sync/atomic"
"testing"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
Expand Down Expand Up @@ -56,7 +56,7 @@ func createMesh(tb testing.TB, db *sql.Database) {
func TestAdminService_Checkpoint(t *testing.T) {
db := sql.InMemory()
createMesh(t, db)
svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil)
svc := NewAdminService(db, t.TempDir(), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestAdminService_Checkpoint(t *testing.T) {

func TestAdminService_CheckpointError(t *testing.T) {
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil)
svc := NewAdminService(db, t.TempDir(), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -104,3 +104,21 @@ func TestAdminService_CheckpointError(t *testing.T) {
_, err = stream.Recv()
require.ErrorContains(t, err, sql.ErrNotFound.Error())
}

func TestAdminService_Recovery(t *testing.T) {
db := sql.InMemory()
recoveryCalled := atomic.Bool{}
svc := NewAdminService(db, t.TempDir(), nil)
svc.recover = func() { recoveryCalled.Store(true) }

t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg.PublicListener)
c := pb.NewAdminServiceClient(conn)

_, err := c.Recover(ctx, &pb.RecoverRequest{})
require.NoError(t, err)
require.True(t, recoveryCalled.Load())
}
13 changes: 5 additions & 8 deletions api/grpcserver/debug_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ import (
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
)

// DebugService exposes global state data, output from the STF.
type DebugService struct {
db *sql.Database
logger log.Logger
conState conservativeState
identity networkIdentity
oracle oracle
Expand All @@ -33,20 +33,17 @@ func (d DebugService) RegisterService(server *Server) {
}

// NewDebugService creates a new grpc service using config data.
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle, lg log.Logger) *DebugService {
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle) *DebugService {
return &DebugService{
db: db,
logger: lg,
conState: conState,
identity: host,
oracle: oracle,
}
}

// Accounts returns current counter and balance for all accounts.
func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
d.logger.Info("GRPC DebugServices.Accounts")

func (d DebugService) Accounts(ctx context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
var (
accts []*types.Account
err error
Expand All @@ -57,7 +54,7 @@ func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.A
accts, err = accounts.Snapshot(d.db, types.LayerID(in.Layer))
}
if err != nil {
d.logger.Error("Failed to get all accounts from state: %s", err)
ctxzap.Error(ctx, " Failed to get all accounts from state", zap.Error(err))
return nil, status.Errorf(codes.Internal, "error fetching accounts state")
}

Expand Down