Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - api: exclude malicious atxs from epoch atx stream #4776

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 46 additions & 15 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/spacemeshos/go-spacemesh/activation"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
vm "github.com/spacemeshos/go-spacemesh/genvm"
"github.com/spacemeshos/go-spacemesh/genvm/sdk"
Expand All @@ -49,6 +50,8 @@ import (
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/txs"
)
Expand Down Expand Up @@ -305,10 +308,6 @@ func (m *MeshAPIMock) GetLayer(tid types.LayerID) (*types.Layer, error) {
return types.NewExistingLayer(tid, ballots, blocks), nil
}

func (m *MeshAPIMock) EpochAtxs(types.EpochID) ([]types.ATXID, error) {
return types.RandomActiveSet(activesetSize), nil
}

func (m *MeshAPIMock) GetATXs(context.Context, []types.ATXID) (map[types.ATXID]*types.VerifiedActivationTx, []types.ATXID) {
atxs := map[types.ATXID]*types.VerifiedActivationTx{
globalAtx.ID(): globalAtx,
Expand Down Expand Up @@ -1051,7 +1050,7 @@ func TestMeshService(t *testing.T) {
genesis := time.Unix(genTimeUnix, 0)
genTime.EXPECT().GenesisTime().Return(genesis)
genTime.EXPECT().CurrentLayer().Return(layerCurrent).AnyTimes()
grpcService := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -1997,7 +1996,7 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) {

ctrl := gomock.NewController(t)
genTime := NewMockgenesisTimeAPI(ctrl)
grpcService := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -2171,7 +2170,7 @@ func TestLayerStream_comprehensive(t *testing.T) {

ctrl := gomock.NewController(t)
genTime := NewMockgenesisTimeAPI(ctrl)
grpcService := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -2313,7 +2312,7 @@ func TestMultiService(t *testing.T) {
genesis := time.Unix(genTimeUnix, 0)
genTime.EXPECT().GenesisTime().Return(genesis)
svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe", logtest.New(t).WithName("grpc.Node"))
svc2 := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
shutDown := launchServer(t, cfg, svc1, svc2)
t.Cleanup(shutDown)

Expand Down Expand Up @@ -2368,7 +2367,7 @@ func TestJsonApi(t *testing.T) {
genesis := time.Unix(genTimeUnix, 0)
genTime.EXPECT().GenesisTime().Return(genesis)
svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe", logtest.New(t).WithName("grpc.Node"))
svc2 := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, svc1, svc2))
time.Sleep(time.Second)

Expand Down Expand Up @@ -2712,26 +2711,58 @@ func TestVMAccountUpdates(t *testing.T) {
require.Equal(t, len(accounts), i)
}

func createAtxs(tb testing.TB, epoch types.EpochID, atxids []types.ATXID) []*types.VerifiedActivationTx {
all := make([]*types.VerifiedActivationTx, 0, len(atxids))
for _, id := range atxids {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: epoch,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(atx.NumUnits)
atx.SetReceived(time.Now())
atx.SmesherID = types.RandomNodeID()
vAtx, err := atx.Verify(0, 1)
require.NoError(tb, err)
all = append(all, vAtx)
}
return all
}

func TestMeshService_EpochStream(t *testing.T) {
ctrl := gomock.NewController(t)
genTime := NewMockgenesisTimeAPI(ctrl)
srv := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
db := sql.InMemory()
srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, srv))

epoch := types.EpochID(3)
atxids := types.RandomActiveSet(100)
all := createAtxs(t, epoch, atxids)
var expected, got []types.ATXID
for i, vatx := range all {
require.NoError(t, atxs.Add(db, vatx))
if i%2 == 0 {
require.NoError(t, identities.SetMalicious(db, vatx.SmesherID, []byte("bad"), time.Now()))
} else {
expected = append(expected, vatx.ID())
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg.PublicListener)
client := pb.NewMeshServiceClient(conn)

stream, err := client.EpochStream(ctx, &pb.EpochStreamRequest{Epoch: 3})
stream, err := client.EpochStream(ctx, &pb.EpochStreamRequest{Epoch: epoch.Uint32()})
require.NoError(t, err)
var total int
for {
_, err = stream.Recv()
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
total++
got = append(got, types.ATXID(types.BytesToHash(resp.GetId().GetId())))
}
require.Equal(t, activesetSize, total)
require.ElementsMatch(t, expected, got)
}
1 change: 0 additions & 1 deletion api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type genesisTimeAPI interface {

// meshAPI is an api for getting mesh status about layers/blocks/rewards.
type meshAPI interface {
EpochAtxs(types.EpochID) ([]types.ATXID, error)
GetATXs(context.Context, []types.ATXID) (map[types.ATXID]*types.VerifiedActivationTx, []types.ATXID)
GetLayer(types.LayerID) (*types.Layer, error)
GetRewards(types.Address) ([]*types.Reward, error)
Expand Down
39 changes: 26 additions & 13 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
)

// MeshService exposes mesh data such as accounts, blocks, and transactions.
type MeshService struct {
logger log.Logger
cdb *datastore.CachedDB
mesh meshAPI // Mesh
conState conservativeState
genTime genesisTimeAPI
Expand All @@ -34,12 +36,20 @@

// NewMeshService creates a new service using config data.
func NewMeshService(
msh meshAPI, cstate conservativeState, genTime genesisTimeAPI,
layersPerEpoch uint32, genesisID types.Hash20, layerDuration time.Duration,
layerAvgSize, txsPerProposal uint32, lg log.Logger,
cdb *datastore.CachedDB,
msh meshAPI,
cstate conservativeState,
genTime genesisTimeAPI,
layersPerEpoch uint32,
genesisID types.Hash20,
layerDuration time.Duration,
layerAvgSize,
txsPerProposal uint32,
lg log.Logger,
) *MeshService {
return &MeshService{
logger: lg,
cdb: cdb,
mesh: msh,
conState: cstate,
genTime: genTime,
Expand All @@ -63,7 +73,7 @@
func (s MeshService) CurrentLayer(context.Context, *pb.CurrentLayerRequest) (*pb.CurrentLayerResponse, error) {
s.logger.Info("GRPC MeshService.CurrentLayer")
return &pb.CurrentLayerResponse{Layernum: &pb.LayerNumber{
Number: uint32(s.genTime.CurrentLayer().Uint32()),
Number: s.genTime.CurrentLayer().Uint32(),
}}, nil
}

Expand Down Expand Up @@ -571,21 +581,24 @@

func (s MeshService) EpochStream(req *pb.EpochStreamRequest, stream pb.MeshService_EpochStreamServer) error {
epoch := types.EpochID(req.Epoch)
atxids, err := s.mesh.EpochAtxs(epoch)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
for _, id := range atxids {
if err := s.cdb.IterateEpochATXHeaders(epoch+1, func(header *types.ActivationTxHeader) error {
select {
case <-stream.Context().Done():
return nil
default:
var res pb.EpochStreamResponse
res.Id = &pb.ActivationId{Id: id.Bytes()}
if err = stream.Send(&res); err != nil {
return status.Error(codes.Internal, err.Error())
malicious, err := s.cdb.IsMalicious(header.NodeID)
if err != nil {
return err

Check warning on line 591 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L591

Added line #L591 was not covered by tests
}
if malicious {
return nil
}
var res pb.EpochStreamResponse
res.Id = &pb.ActivationId{Id: header.ID.Bytes()}
return stream.Send(&res)
}
}); err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 601 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L600-L601

Added lines #L600 - L601 were not covered by tests
}
return nil
}
15 changes: 0 additions & 15 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 26 additions & 6 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/bootstrap"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -110,14 +113,29 @@ func (m *MeshAPIMock) GetATXs(context.Context, []types.ATXID) (map[types.ATXID]*
panic("not implemented")
}
func (m *MeshAPIMock) MeshHash(types.LayerID) (types.Hash32, error) { panic("not implemented") }
func (m *MeshAPIMock) EpochAtxs(types.EpochID) ([]types.ATXID, error) {
return types.RandomActiveSet(activeSetSize), nil

func createAtxs(tb testing.TB, db sql.Executor, epoch types.EpochID, atxids []types.ATXID) {
for _, id := range atxids {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: epoch,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(atx.NumUnits)
atx.SetReceived(time.Now())
atx.SmesherID = types.RandomNodeID()
vAtx, err := atx.Verify(0, 1)
require.NoError(tb, err)
require.NoError(tb, atxs.Add(db, vAtx))
}
}

func launchServer(tb testing.TB) func() {
func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() {
grpcService := grpcserver.New(fmt.Sprintf("127.0.0.1:%d", grpcPort), logtest.New(tb).Named("grpc"))
jsonService := grpcserver.NewJSONHTTPServer(fmt.Sprintf("127.0.0.1:%d", jsonport), logtest.New(tb).WithName("grpc.JSON"))
s := grpcserver.NewMeshService(&MeshAPIMock{}, nil, nil, 0, types.Hash20{}, 0, 0, 0, logtest.New(tb).WithName("grpc.Mesh"))
s := grpcserver.NewMeshService(cdb, &MeshAPIMock{}, nil, nil, 0, types.Hash20{}, 0, 0, 0, logtest.New(tb).WithName("grpc.Mesh"))

pb.RegisterMeshServiceServer(grpcService.GrpcServer, s)
// start gRPC and json servers
Expand Down Expand Up @@ -152,7 +170,10 @@ func verifyUpdate(t *testing.T, data []byte, epoch types.EpochID, expBeacon stri
}

func TestGenerator_Generate(t *testing.T) {
t.Cleanup(launchServer(t))
targetEpoch := types.EpochID(3)
db := sql.InMemory()
createAtxs(t, db, targetEpoch-1, types.RandomActiveSet(activeSetSize))
t.Cleanup(launchServer(t, datastore.NewCachedDB(db, logtest.New(t))))

for _, tc := range []struct {
desc string
Expand Down Expand Up @@ -198,7 +219,6 @@ func TestGenerator_Generate(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
targetEpoch := types.EpochID(3)
persisted, err := g.Generate(ctx, targetEpoch, tc.beacon, tc.actives)
require.NoError(t, err)

Expand Down
6 changes: 5 additions & 1 deletion cmd/bootstrapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (

"github.com/spacemeshos/go-spacemesh/bootstrap"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

const checkpointdata = `{
Expand Down Expand Up @@ -95,7 +97,8 @@ func updateCheckpoint(t *testing.T, ctx context.Context, data string) {
}

func TestServer(t *testing.T) {
t.Cleanup(launchServer(t))
db := sql.InMemory()
t.Cleanup(launchServer(t, datastore.NewCachedDB(db, logtest.New(t))))

fs := afero.NewMemMapFs()
g := NewGenerator(
Expand Down Expand Up @@ -123,6 +126,7 @@ func TestServer(t *testing.T) {
srv.Start(ctx, ch, np)

for _, epoch := range epochs {
createAtxs(t, db, epoch-1, types.RandomActiveSet(activeSetSize))
fname := PersistedFilename(epoch, bootstrap.SuffixBoostrap)
require.Eventually(t, func() bool {
_, err := fs.Stat(fname)
Expand Down
5 changes: 0 additions & 5 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/spacemeshos/go-spacemesh/hash"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
Expand Down Expand Up @@ -616,10 +615,6 @@ func (msh *Mesh) GetATXs(ctx context.Context, atxIds []types.ATXID) (map[types.A
return atxs, mIds
}

func (msh *Mesh) EpochAtxs(epoch types.EpochID) ([]types.ATXID, error) {
return atxs.GetIDsByEpoch(msh.cdb, epoch)
}

// GetRewards retrieves account's rewards by the coinbase address.
func (msh *Mesh) GetRewards(coinbase types.Address) ([]*types.Reward, error) {
return rewards.List(msh.cdb, coinbase)
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ func (app *App) initService(ctx context.Context, svc grpcserver.Service) (grpcse
case grpcserver.GlobalState:
return grpcserver.NewGlobalStateService(app.mesh, app.conState, app.log.WithName("grpc.GlobalState")), nil
case grpcserver.Mesh:
return grpcserver.NewMeshService(app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal), app.log.WithName("grpc.Mesh")), nil
return grpcserver.NewMeshService(app.cachedDB, app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal), app.log.WithName("grpc.Mesh")), nil
case grpcserver.Node:
return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit, app.log.WithName("grpc.Node")), nil
case grpcserver.Admin:
Expand Down