Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - serve malfeasance proof over grpc #4851

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 22 additions & 3 deletions api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"errors"
"fmt"

"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -10,7 +11,9 @@
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

type activationService struct {
Expand Down Expand Up @@ -42,12 +45,28 @@
atxId := types.ATXID(types.BytesToHash(request.Id))
atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
s.logger.With().Debug("failed to get the ATX", log.Err(err), log.Stringer("id", atxId))
s.logger.With().Debug("failed to get ATX",
log.Stringer("atx id", atxId),
log.Err(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
return &pb.GetResponse{
proof, err := s.atxProvider.GetMalfeasanceProof(atx.SmesherID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
s.logger.With().Error("failed to get malfeasance proof",
log.Stringer("smesher", atx.SmesherID),
log.Stringer("id", atxId),
log.Err(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}

Check warning on line 62 in api/grpcserver/activation_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/activation_service.go#L56-L62

Added lines #L56 - L62 were not covered by tests
resp := &pb.GetResponse{
Atx: convertActivation(atx),
}, nil
}
if proof != nil {
resp.MalfeasanceProof = events.ToMalfeasancePB(atx.SmesherID, proof, false)
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
return resp, nil
}

func (s *activationService) Highest(ctx context.Context, req *empty.Empty) (*pb.HighestResponse, error) {
Expand Down
43 changes: 43 additions & 0 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
Expand Down Expand Up @@ -130,6 +132,7 @@ func TestGet_HappyPath(t *testing.T) {
}
atx.SetID(id)
atxProvider.EXPECT().GetFullAtx(id).Return(&atx, nil)
atxProvider.EXPECT().GetMalfeasanceProof(gomock.Any()).Return(nil, sql.ErrNotFound)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
require.NoError(t, err)
Expand All @@ -141,4 +144,44 @@ func TestGet_HappyPath(t *testing.T) {
require.Equal(t, atx.PrevATXID.Bytes(), response.Atx.PrevAtx.Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Nil(t, response.MalfeasanceProof)
}

func TestGet_IdentityCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

smesher, proof := grpcserver.BallotMalfeasance(t, sql.InMemory())
id := types.RandomATXID()
atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
Sequence: rand.Uint64(),
PrevATXID: types.RandomATXID(),
PublishEpoch: 0,
PositioningATX: types.RandomATXID(),
},
Coinbase: types.GenerateAddress(types.RandomBytes(32)),
NumUnits: rand.Uint32(),
},
SmesherID: smesher,
},
}
atx.SetID(id)
atxProvider.EXPECT().GetFullAtx(id).Return(&atx, nil)
atxProvider.EXPECT().GetMalfeasanceProof(smesher).Return(proof, nil)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
require.NoError(t, err)

require.Equal(t, atx.ID().Bytes(), response.Atx.Id.Id)
require.Equal(t, atx.PublishEpoch.Uint32(), response.Atx.Layer.Number)
require.Equal(t, atx.SmesherID.Bytes(), response.Atx.SmesherId.Id)
require.Equal(t, atx.Coinbase.String(), response.Atx.Coinbase.Address)
require.Equal(t, atx.PrevATXID.Bytes(), response.Atx.PrevAtx.Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Equal(t, events.ToMalfeasancePB(smesher, proof, false), response.MalfeasanceProof)
}
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type txValidator interface {
type atxProvider interface {
GetFullAtx(id types.ATXID) (*types.VerifiedActivationTx, error)
MaxHeightAtx() (types.ATXID, error)
GetMalfeasanceProof(id types.NodeID) (*types.MalfeasanceProof, error)
}

type postSetupProvider interface {
Expand Down
60 changes: 60 additions & 0 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@

import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

// MeshService exposes mesh data such as accounts, blocks, and transactions.
Expand Down Expand Up @@ -610,3 +614,59 @@
)
return nil
}

func (s MeshService) MalfeasanceQuery(ctx context.Context, req *pb.MalfeasanceRequest) (*pb.MalfeasanceResponse, error) {
parsed, err := hex.DecodeString(req.SmesherHex)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

Check warning on line 622 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L621-L622

Added lines #L621 - L622 were not covered by tests
id := types.BytesToNodeID(parsed)
fasmat marked this conversation as resolved.
Show resolved Hide resolved
proof, err := s.cdb.GetMalfeasanceProof(id)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}

Check warning on line 627 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L626-L627

Added lines #L626 - L627 were not covered by tests
return &pb.MalfeasanceResponse{
Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof),
}, nil
}

func (s MeshService) MalfeasanceStream(req *pb.MalfeasanceStreamRequest, stream pb.MeshService_MalfeasanceStreamServer) error {
sub := events.SubscribeMalfeasance()
if sub == nil {
return status.Errorf(codes.FailedPrecondition, "event reporting is not enabled")
}

Check warning on line 637 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L636-L637

Added lines #L636 - L637 were not covered by tests
eventch, fullch := consumeEvents[events.EventMalfeasance](stream.Context(), sub)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT (and unrelated to this PR):

I don't think it makes sense to have eventch and fullch as return value in consumeEvents if the buffer in eventch runs full, the listener doesn't read from it. Why should the listener read from fullch?

The buffer is also huge -> 32768 events. So if fullch is closed the listener hasn't read events for quite some time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see this more as a safeguard to stopgap a programming error from OOM e.g. doing something blocking when processing an event. not sure about the history here.

Copy link
Member

@fasmat fasmat Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... I am not sure if we are doing it correctly. I see 2 possible approaches:

  • Events can be dropped / there is no guarantee that every event will be sent to every consumer. This is the "safe" approach since busy consumers won't cause the producer to hang because they are not listening
  • Events cannot be dropped / every event is guaranteed to be sent to every consumer. This way a malfunctioning consumer might cause the producer to get stuck because it cannot send more events.

Both approaches can give some leeway to busy consumers by using buffered channels. Here it looks like the first approach was taken (if the consumer isn't listening eventually the producer will stop sending). Just that the consumer will also not handle the events any more that were buffered for it when it becomes ready again. Additionally his happens not immediately: the channels are read in the same select statement, so some events might still be processed before the consumer gets the message that no events will be coming any more and stops reading from eventch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other stream APIs are implemented this way as well. if you feel strongly about changing the behavior, maybe file an issue for what you want to fix?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea is that stream subscriber should never be able to block consensus code and we shouldn't silently drop events either. so once we see that subscriber is not reading fast enough error is returned from api and channel is unregistered

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand. I think there is an issue on the consumer side with this approach where overflows of the eventch are not handled deterministically:

Both eventch and fullch are always read from in the same select statement (not just here). If fullch is closed the cosumer might read anything between 0 to 10 more events (more than 10 become extremely unlikely) from eventch before they actually stop processing events.

if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")
}

Check warning on line 641 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L640-L641

Added lines #L640 - L641 were not covered by tests

// first serve those already existed locally.
if err := s.cdb.IterateMalfeasanceProofs(func(id types.NodeID, mp *types.MalfeasanceProof) error {
select {
case <-stream.Context().Done():
return nil

Check warning on line 647 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L646-L647

Added lines #L646 - L647 were not covered by tests
default:
res := &pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(id, mp, req.IncludeProof),
}
return stream.Send(res)
}
}); err != nil {
return status.Error(codes.Internal, err.Error())
}

Check warning on line 656 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L654-L656

Added lines #L654 - L656 were not covered by tests

for {
select {
case <-stream.Context().Done():
return nil
case <-fullch:
return status.Errorf(codes.Canceled, "buffer is full")

Check warning on line 663 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L662-L663

Added lines #L662 - L663 were not covered by tests
case ev := <-eventch:
fasmat marked this conversation as resolved.
Show resolved Hide resolved
if err := stream.Send(&pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(ev.Smesher, ev.Proof, req.IncludeProof),
}); err != nil {
return status.Error(codes.Internal, fmt.Errorf("send to stream: %w", err).Error())
}

Check warning on line 669 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L668-L669

Added lines #L668 - L669 were not covered by tests
}
}
}