Skip to content

Commit

Permalink
serve malfeasance proof over grpc (#4851)
Browse files Browse the repository at this point in the history
## Motivation
as title
depends on spacemeshos/api#259

## Changes
- add MeshService.MalfeasanceStream and MeshService:MalfeasanceQuery
- add malfeasance proof to ActivationService.Get
- add own malfeasance proof to node event update

## output
```
{
  "proof": {
    "smesherId": {
      "id": "8JhffQsAWuCBQ8YRWrLpOj8tesNx/SKV611TDbPnXCA="
    },
    "layer": {
      "number": 8112
    },
    "kind": "MALFEASANCE_HARE",
    "debugInfo": "generate layer: 8112\nsmesher id: f0985f7d0b005ae08143c6115ab2e93a3f2d7ac371fd2295eb5d530db3e75c20\ncause: smesher published multiple hare messages in layer 8112 round 3\n1st message hash: 9c40b8789bad681d25081f85af7f9a4af87c5d70ac0637934b4e6fc935b9566c\n1st message signature: efdb895917b2303442a3b0c8f455c5cea79f8a2969af2c7f5b3bbbbfa09a77441e3d886e049f39dbf4dc0d49e78537fe3035de2d25d9d295f02c4928677ea107\n2nd message hash: d005c8ff6ece20cb87acc7ea97d5473559a3e197f9a66621505383132515a2ae\n2nd message signature: f3c014fd0f1353896222f22ee71b7fb2a10623394d4905b8ea447ac2e45350e487de04df71448cb19012b897728bb8580499b3e16a5edb546333b432a927d301\n"
  }
}
```
  • Loading branch information
countvonzero committed Aug 17, 2023
1 parent 6518d2b commit 04b9597
Show file tree
Hide file tree
Showing 16 changed files with 561 additions and 35 deletions.
25 changes: 22 additions & 3 deletions api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpcserver

import (
"context"
"errors"
"fmt"

"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -10,7 +11,9 @@ import (
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

type activationService struct {
Expand Down Expand Up @@ -42,12 +45,28 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p
atxId := types.ATXID(types.BytesToHash(request.Id))
atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
s.logger.With().Debug("failed to get the ATX", log.Err(err), log.Stringer("id", atxId))
s.logger.With().Debug("failed to get ATX",
log.Stringer("atx id", atxId),
log.Err(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
return &pb.GetResponse{
proof, err := s.atxProvider.GetMalfeasanceProof(atx.SmesherID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
s.logger.With().Error("failed to get malfeasance proof",
log.Stringer("smesher", atx.SmesherID),
log.Stringer("id", atxId),
log.Err(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
resp := &pb.GetResponse{
Atx: convertActivation(atx),
}, nil
}
if proof != nil {
resp.MalfeasanceProof = events.ToMalfeasancePB(atx.SmesherID, proof, false)
}
return resp, nil
}

func (s *activationService) Highest(ctx context.Context, req *empty.Empty) (*pb.HighestResponse, error) {
Expand Down
43 changes: 43 additions & 0 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
Expand Down Expand Up @@ -130,6 +132,7 @@ func TestGet_HappyPath(t *testing.T) {
}
atx.SetID(id)
atxProvider.EXPECT().GetFullAtx(id).Return(&atx, nil)
atxProvider.EXPECT().GetMalfeasanceProof(gomock.Any()).Return(nil, sql.ErrNotFound)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
require.NoError(t, err)
Expand All @@ -141,4 +144,44 @@ func TestGet_HappyPath(t *testing.T) {
require.Equal(t, atx.PrevATXID.Bytes(), response.Atx.PrevAtx.Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Nil(t, response.MalfeasanceProof)
}

func TestGet_IdentityCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

smesher, proof := grpcserver.BallotMalfeasance(t, sql.InMemory())
id := types.RandomATXID()
atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
Sequence: rand.Uint64(),
PrevATXID: types.RandomATXID(),
PublishEpoch: 0,
PositioningATX: types.RandomATXID(),
},
Coinbase: types.GenerateAddress(types.RandomBytes(32)),
NumUnits: rand.Uint32(),
},
SmesherID: smesher,
},
}
atx.SetID(id)
atxProvider.EXPECT().GetFullAtx(id).Return(&atx, nil)
atxProvider.EXPECT().GetMalfeasanceProof(smesher).Return(proof, nil)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
require.NoError(t, err)

require.Equal(t, atx.ID().Bytes(), response.Atx.Id.Id)
require.Equal(t, atx.PublishEpoch.Uint32(), response.Atx.Layer.Number)
require.Equal(t, atx.SmesherID.Bytes(), response.Atx.SmesherId.Id)
require.Equal(t, atx.Coinbase.String(), response.Atx.Coinbase.Address)
require.Equal(t, atx.PrevATXID.Bytes(), response.Atx.PrevAtx.Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Equal(t, events.ToMalfeasancePB(smesher, proof, false), response.MalfeasanceProof)
}
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type txValidator interface {
type atxProvider interface {
GetFullAtx(id types.ATXID) (*types.VerifiedActivationTx, error)
MaxHeightAtx() (types.ATXID, error)
GetMalfeasanceProof(id types.NodeID) (*types.MalfeasanceProof, error)
}

type postSetupProvider interface {
Expand Down
64 changes: 64 additions & 0 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package grpcserver

import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

// MeshService exposes mesh data such as accounts, blocks, and transactions.
Expand Down Expand Up @@ -610,3 +614,63 @@ func (s MeshService) EpochStream(req *pb.EpochStreamRequest, stream pb.MeshServi
)
return nil
}

func (s MeshService) MalfeasanceQuery(ctx context.Context, req *pb.MalfeasanceRequest) (*pb.MalfeasanceResponse, error) {
parsed, err := hex.DecodeString(req.SmesherHex)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if l := len(parsed); l != types.NodeIDSize {
return nil, status.Error(codes.InvalidArgument,
fmt.Sprintf("invalid smesher id length (%d), expected (%d)", l, types.NodeIDSize))
}
id := types.BytesToNodeID(parsed)
proof, err := s.cdb.GetMalfeasanceProof(id)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.MalfeasanceResponse{
Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof),
}, nil
}

func (s MeshService) MalfeasanceStream(req *pb.MalfeasanceStreamRequest, stream pb.MeshService_MalfeasanceStreamServer) error {
sub := events.SubscribeMalfeasance()
if sub == nil {
return status.Errorf(codes.FailedPrecondition, "event reporting is not enabled")
}
eventch, fullch := consumeEvents[events.EventMalfeasance](stream.Context(), sub)
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")
}

// first serve those already existed locally.
if err := s.cdb.IterateMalfeasanceProofs(func(id types.NodeID, mp *types.MalfeasanceProof) error {
select {
case <-stream.Context().Done():
return nil
default:
res := &pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(id, mp, req.IncludeProof),
}
return stream.Send(res)
}
}); err != nil {
return status.Error(codes.Internal, err.Error())
}

for {
select {
case <-stream.Context().Done():
return nil
case <-fullch:
return status.Errorf(codes.Canceled, "buffer is full")
case ev := <-eventch:
if err := stream.Send(&pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(ev.Smesher, ev.Proof, req.IncludeProof),
}); err != nil {
return status.Error(codes.Internal, fmt.Errorf("send to stream: %w", err).Error())
}
}
}
}

0 comments on commit 04b9597

Please sign in to comment.