Skip to content

Commit

Permalink
Try #4749:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Jul 26, 2023
2 parents a7e5aae + 8ec6f4c commit 54ad98a
Show file tree
Hide file tree
Showing 24 changed files with 222 additions and 305 deletions.
1 change: 0 additions & 1 deletion activation/nipost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func TestPostSetup(t *testing.T) {

func TestNIPostBuilderWithClients(t *testing.T) {
t.Parallel()
logtest.SetupGlobal(t)

challenge := types.NIPostChallenge{
PublishEpoch: postGenesisEpoch + 2,
Expand Down
10 changes: 5 additions & 5 deletions api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ import (
)

type activationService struct {
logger log.Logger
goldenAtx types.ATXID
atxProvider atxProvider
}

func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID) *activationService {
func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID, lg log.Logger) *activationService {
return &activationService{
logger: lg,
goldenAtx: goldenAtx,
atxProvider: atxProvider,
}
}

// RegisterService implements ServiceAPI.
func (s *activationService) RegisterService(server *Server) {
log.Info("registering GRPC Activation Service")
s.logger.Info("registering GRPC Activation Service")
pb.RegisterActivationServiceServer(server.GrpcServer, s)
}

Expand All @@ -38,11 +40,9 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p
}

atxId := types.ATXID(types.BytesToHash(request.Id))
logger := log.GetLogger().WithFields(log.Stringer("id", atxId))

atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
logger.With().Debug("failed to get the ATX", log.Err(err))
s.logger.With().Debug("failed to get the ATX", log.Err(err), log.Stringer("id", atxId))
return nil, status.Error(codes.NotFound, "id was not found")
}
return &pb.GetResponse{
Expand Down
13 changes: 7 additions & 6 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))

atxProvider.EXPECT().MaxHeightAtx().Return(types.EmptyATXID, errors.New("blah"))
response, err := activationService.Highest(context.Background(), &empty.Empty{})
Expand All @@ -39,7 +40,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))

atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
Expand Down Expand Up @@ -74,7 +75,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
func TestGet_RejectInvalidAtxID(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

_, err := activationService.Get(context.Background(), &pb.GetRequest{Id: []byte{1, 2, 3}})
require.Error(t, err)
Expand All @@ -84,7 +85,7 @@ func TestGet_RejectInvalidAtxID(t *testing.T) {
func TestGet_AtxNotPresent(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(nil, nil)
Expand All @@ -97,7 +98,7 @@ func TestGet_AtxNotPresent(t *testing.T) {
func TestGet_AtxProviderReturnsFailure(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(&types.VerifiedActivationTx{}, errors.New(""))
Expand All @@ -110,7 +111,7 @@ func TestGet_AtxProviderReturnsFailure(t *testing.T) {
func TestGet_HappyPath(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

id := types.RandomATXID()
atx := types.VerifiedActivationTx{
Expand Down
4 changes: 2 additions & 2 deletions api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ const (

// AdminService exposes endpoints for node administration.
type AdminService struct {
logger log.Log
logger log.Logger
db *sql.Database
dataDir string
}

// NewAdminService creates a new admin grpc service.
func NewAdminService(db *sql.Database, dataDir string, lg log.Log) *AdminService {
func NewAdminService(db *sql.Database, dataDir string, lg log.Logger) *AdminService {
return &AdminService{
logger: lg,
db: db,
Expand Down
2 changes: 0 additions & 2 deletions api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func createMesh(tb testing.TB, db *sql.Database) {
}

func TestAdminService_Checkpoint(t *testing.T) {
logtest.SetupGlobal(t)
db := sql.InMemory()
createMesh(t, db)
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
Expand Down Expand Up @@ -91,7 +90,6 @@ func TestAdminService_Checkpoint(t *testing.T) {
}

func TestAdminService_CheckpointError(t *testing.T) {
logtest.SetupGlobal(t)
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
t.Cleanup(launchServer(t, cfg, svc))
Expand Down
8 changes: 5 additions & 3 deletions api/grpcserver/debug_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// DebugService exposes global state data, output from the STF.
type DebugService struct {
db *sql.Database
logger log.Logger
conState conservativeState
identity networkIdentity
oracle oracle
Expand All @@ -32,9 +33,10 @@ func (d DebugService) RegisterService(server *Server) {
}

// NewDebugService creates a new grpc service using config data.
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle) *DebugService {
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle, lg log.Logger) *DebugService {
return &DebugService{
db: db,
logger: lg,
conState: conState,
identity: host,
oracle: oracle,
Expand All @@ -43,7 +45,7 @@ func NewDebugService(db *sql.Database, conState conservativeState, host networkI

// Accounts returns current counter and balance for all accounts.
func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
log.Info("GRPC DebugServices.Accounts")
d.logger.Info("GRPC DebugServices.Accounts")

var (
accts []*types.Account
Expand All @@ -55,7 +57,7 @@ func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.A
accts, err = accounts.Snapshot(d.db, types.LayerID(in.Layer))
}
if err != nil {
log.Error("Failed to get all accounts from state: %s", err)
d.logger.Error("Failed to get all accounts from state: %s", err)
return nil, status.Errorf(codes.Internal, "error fetching accounts state")
}

Expand Down
50 changes: 27 additions & 23 deletions api/grpcserver/globalstate_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

// GlobalStateService exposes global state data, output from the STF.
type GlobalStateService struct {
logger log.Logger
mesh meshAPI
conState conservativeState
}
Expand All @@ -26,16 +27,18 @@ func (s GlobalStateService) RegisterService(server *Server) {
}

// NewGlobalStateService creates a new grpc service using config data.
func NewGlobalStateService(msh meshAPI, conState conservativeState) *GlobalStateService {
func NewGlobalStateService(msh meshAPI, conState conservativeState, lg log.Logger) *GlobalStateService {
return &GlobalStateService{
logger: lg,
mesh: msh,
conState: conState,
}
}

// GlobalStateHash returns the latest layer and its computed global state hash.
func (s GlobalStateService) GlobalStateHash(context.Context, *pb.GlobalStateHashRequest) (*pb.GlobalStateHashResponse, error) {
log.Info("GRPC GlobalStateService.GlobalStateHash")
s.logger.Info("GRPC GlobalStateService.GlobalStateHash")

root, err := s.conState.GetStateRoot()
if err != nil {
return nil, err
Expand Down Expand Up @@ -71,7 +74,7 @@ func (s GlobalStateService) getAccount(addr types.Address) (acct *pb.Account, er

// Account returns current and projected counter and balance for one account.
func (s GlobalStateService) Account(_ context.Context, in *pb.AccountRequest) (*pb.AccountResponse, error) {
log.Info("GRPC GlobalStateService.Account")
s.logger.Info("GRPC GlobalStateService.Account")

if in.AccountId == nil {
return nil, status.Errorf(codes.InvalidArgument, "`AccountId` must be provided")
Expand All @@ -84,23 +87,24 @@ func (s GlobalStateService) Account(_ context.Context, in *pb.AccountRequest) (*
}
acct, err := s.getAccount(addr)
if err != nil {
log.With().Error("unable to fetch projected account state", log.Err(err))
s.logger.With().Error("unable to fetch projected account state", log.Err(err))
return nil, status.Errorf(codes.Internal, "error fetching projected account data")
}

log.With().Debug("GRPC GlobalStateService.Account",
s.logger.With().Debug("GRPC GlobalStateService.Account",
addr,
log.Uint64("balance", acct.StateCurrent.Balance.Value),
log.Uint64("counter", acct.StateCurrent.Counter),
log.Uint64("balance projected", acct.StateProjected.Balance.Value),
log.Uint64("counter projected", acct.StateProjected.Counter))
log.Uint64("counter projected", acct.StateProjected.Counter),
)

return &pb.AccountResponse{AccountWrapper: acct}, nil
}

// AccountDataQuery returns historical account data such as rewards and receipts.
func (s GlobalStateService) AccountDataQuery(_ context.Context, in *pb.AccountDataQueryRequest) (*pb.AccountDataQueryResponse, error) {
log.Info("GRPC GlobalStateService.AccountDataQuery")
s.logger.Info("GRPC GlobalStateService.AccountDataQuery")

if in.Filter == nil {
return nil, status.Errorf(codes.InvalidArgument, "`Filter` must be provided")
Expand Down Expand Up @@ -151,7 +155,7 @@ func (s GlobalStateService) AccountDataQuery(_ context.Context, in *pb.AccountDa
if filterAccount {
acct, err := s.getAccount(addr)
if err != nil {
log.With().Error("unable to fetch projected account state", log.Err(err))
s.logger.With().Error("unable to fetch projected account state", log.Err(err))
return nil, status.Errorf(codes.Internal, "error fetching projected account data")
}
res.AccountItem = append(res.AccountItem, &pb.AccountData{Datum: &pb.AccountData_AccountWrapper{
Expand Down Expand Up @@ -190,15 +194,15 @@ func (s GlobalStateService) AccountDataQuery(_ context.Context, in *pb.AccountDa

// SmesherDataQuery returns historical info on smesher rewards.
func (s GlobalStateService) SmesherDataQuery(_ context.Context, in *pb.SmesherDataQueryRequest) (*pb.SmesherDataQueryResponse, error) {
log.Info("DEPRECATED GRPC GlobalStateService.SmesherDataQuery")
s.logger.Info("DEPRECATED GRPC GlobalStateService.SmesherDataQuery")
return nil, status.Errorf(codes.Unimplemented, "DEPRECATED")
}

// STREAMS

// AccountDataStream exposes a stream of account-related data.
func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, stream pb.GlobalStateService_AccountDataStreamServer) error {
log.Info("GRPC GlobalStateService.AccountDataStream")
s.logger.Info("GRPC GlobalStateService.AccountDataStream")

if in.Filter == nil {
return status.Errorf(codes.InvalidArgument, "`Filter` must be provided")
Expand Down Expand Up @@ -242,10 +246,10 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s
for {
select {
case <-accountBufFull:
log.Info("account buffer is full, shutting down")
s.logger.Info("account buffer is full, shutting down")
return status.Error(codes.Canceled, errAccountBufferFull)
case <-rewardsBufFull:
log.Info("rewards buffer is full, shutting down")
s.logger.Info("rewards buffer is full, shutting down")
return status.Error(codes.Canceled, errRewardsBufferFull)
case updatedAccountEvent := <-accountCh:
// Apply address filter
Expand All @@ -255,7 +259,7 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s
// nonce.
acct, err := s.getAccount(addr)
if err != nil {
log.With().Error("unable to fetch projected account state", log.Err(err))
s.logger.With().Error("unable to fetch projected account state", log.Err(err))
return status.Errorf(codes.Internal, "error fetching projected account data")
}
resp := &pb.AccountDataStreamResponse{Datum: &pb.AccountData{Datum: &pb.AccountData_AccountWrapper{
Expand Down Expand Up @@ -306,7 +310,7 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s
}

case <-stream.Context().Done():
log.Info("AccountDataStream closing stream, client disconnected")
s.logger.Info("AccountDataStream closing stream, client disconnected")
return nil
}
// TODO: do we need an additional case here for a context to indicate
Expand All @@ -317,13 +321,13 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s

// SmesherRewardStream exposes a stream of smesher rewards.
func (s GlobalStateService) SmesherRewardStream(in *pb.SmesherRewardStreamRequest, stream pb.GlobalStateService_SmesherRewardStreamServer) error {
log.Info("DEPRECATED GRPC GlobalStateService.SmesherRewardStream")
s.logger.Info("DEPRECATED GRPC GlobalStateService.SmesherRewardStream")
return status.Errorf(codes.Unimplemented, "DEPRECATED")
}

// AppEventStream exposes a stream of emitted app events.
func (s GlobalStateService) AppEventStream(*pb.AppEventStreamRequest, pb.GlobalStateService_AppEventStreamServer) error {
log.Info("GRPC GlobalStateService.AppEventStream")
s.logger.Info("GRPC GlobalStateService.AppEventStream")

// TODO: implement me! We don't currently have any app events
// See https://github.com/spacemeshos/go-spacemesh/issues/2074
Expand All @@ -333,7 +337,7 @@ func (s GlobalStateService) AppEventStream(*pb.AppEventStreamRequest, pb.GlobalS

// GlobalStateStream exposes a stream of global data data items: rewards, receipts, account info, global state hash.
func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, stream pb.GlobalStateService_GlobalStateStreamServer) error {
log.Info("GRPC GlobalStateService.GlobalStateStream")
s.logger.Info("GRPC GlobalStateService.GlobalStateStream")

if in.GlobalStateDataFlags == uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_UNSPECIFIED) {
return status.Errorf(codes.InvalidArgument, "`GlobalStateDataFlags` must set at least one bitfield")
Expand Down Expand Up @@ -374,21 +378,21 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s
for {
select {
case <-accountBufFull:
log.Info("account buffer is full, shutting down")
s.logger.Info("account buffer is full, shutting down")
return status.Error(codes.Canceled, errAccountBufferFull)
case <-rewardsBufFull:
log.Info("rewards buffer is full, shutting down")
s.logger.Info("rewards buffer is full, shutting down")
return status.Error(codes.Canceled, errRewardsBufferFull)
case <-layersBufFull:
log.Info("layers buffer is full, shutting down")
s.logger.Info("layers buffer is full, shutting down")
return status.Error(codes.Canceled, errLayerBufferFull)
case updatedAccount := <-accountCh:
// The Reporter service just sends us the account address. We are responsible
// for looking up the other required data here. Get the account balance and
// nonce.
acct, err := s.getAccount(updatedAccount.Address)
if err != nil {
log.With().Error("unable to fetch projected account state", log.Err(err))
s.logger.With().Error("unable to fetch projected account state", log.Err(err))
return status.Errorf(codes.Internal, "error fetching projected account data")
}
resp := &pb.GlobalStateStreamResponse{Datum: &pb.GlobalStateData{Datum: &pb.GlobalStateData_AccountWrapper{
Expand Down Expand Up @@ -418,7 +422,7 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s
}
root, err := s.conState.GetLayerStateRoot(layer.LayerID)
if err != nil {
log.With().Warning("error retrieving layer data", log.Err(err))
s.logger.With().Warning("error retrieving layer data", log.Err(err))
root = types.Hash32{}
}
resp := &pb.GlobalStateStreamResponse{Datum: &pb.GlobalStateData{Datum: &pb.GlobalStateData_GlobalState{
Expand All @@ -431,7 +435,7 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s
return fmt.Errorf("send to stream: %w", err)
}
case <-stream.Context().Done():
log.Info("AccountDataStream closing stream, client disconnected")
s.logger.Info("AccountDataStream closing stream, client disconnected")
return nil
}
// TODO: do we need an additional case here for a context to indicate
Expand Down

0 comments on commit 54ad98a

Please sign in to comment.