Skip to content

Commit

Permalink
ballots: request active set if reference ballot was submitted with em…
Browse files Browse the repository at this point in the history
…pty active set (#4956)

closes: #4672

starting from layer X reference ballots will be gossiped with empty active sets.
any updated node will request active set by its hash from the ballot sender, and save it in a separate table. unless  it wasn't saved before, in that case we won't download anything.

reference ballot will be stored with full active set, so that sync protocol remains compatible with nodes that didn't upgrade. for them reference ballot without active set will look syntactically invalid. but they will be able to download it later and not get stuck this way.
  • Loading branch information
dshulyak committed Sep 8, 2023
1 parent 430408c commit f8aea0a
Show file tree
Hide file tree
Showing 20 changed files with 525 additions and 93 deletions.
5 changes: 5 additions & 0 deletions common/types/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,8 @@ func ATXIDsToHashes(ids []ATXID) []Hash32 {
}
return hashes
}

type EpochActiveSet struct {
Epoch EpochID
Set []ATXID `scale:"max=1000000"`
}
38 changes: 38 additions & 0 deletions common/types/activation_scale.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func MainnetConfig() Config {
// 1000 - is assumed minimal number of units
// 5000 - half of the expected poet ticks
MinimalActiveSetWeight: 1000 * 5000,
EmitEmptyActiveSet: 20000,
},
HARE: hareConfig.Config{
N: 200,
Expand Down
4 changes: 4 additions & 0 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/activesets"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
Expand Down Expand Up @@ -320,6 +321,7 @@ const (
TXDB Hint = "TXDB"
POETDB Hint = "POETDB"
Malfeasance Hint = "malfeasance"
ActiveSet Hint = "activeset"
)

// NewBlobStore returns a BlobStore.
Expand Down Expand Up @@ -369,6 +371,8 @@ func (bs *BlobStore) Get(hint Hint, key []byte) ([]byte, error) {
return poets.Get(bs.DB, ref)
case Malfeasance:
return identities.GetMalfeasanceBlob(bs.DB, key)
case ActiveSet:
return activesets.GetBlob(bs.DB, key)
}
return nil, fmt.Errorf("blob store not found %s", hint)
}
Expand Down
3 changes: 3 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ type dataValidators struct {
atx SyncValidator
poet SyncValidator
ballot SyncValidator
activeset SyncValidator
block SyncValidator
proposal SyncValidator
txBlock SyncValidator
Expand All @@ -222,6 +223,7 @@ func (f *Fetch) SetValidators(
atx SyncValidator,
poet SyncValidator,
ballot SyncValidator,
activeset SyncValidator,
block SyncValidator,
prop SyncValidator,
txBlock SyncValidator,
Expand All @@ -232,6 +234,7 @@ func (f *Fetch) SetValidators(
atx: atx,
poet: poet,
ballot: ballot,
activeset: activeset,
block: block,
proposal: prop,
txBlock: txBlock,
Expand Down
8 changes: 5 additions & 3 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type testFetch struct {
mMalH *mocks.MockSyncValidator
mAtxH *mocks.MockSyncValidator
mBallotH *mocks.MockSyncValidator
mActiveSetH *mocks.MockSyncValidator
mBlocksH *mocks.MockSyncValidator
mProposalH *mocks.MockSyncValidator
method int
Expand All @@ -59,6 +60,7 @@ func createFetch(tb testing.TB) *testFetch {
mMalH: mocks.NewMockSyncValidator(ctrl),
mAtxH: mocks.NewMockSyncValidator(ctrl),
mBallotH: mocks.NewMockSyncValidator(ctrl),
mActiveSetH: mocks.NewMockSyncValidator(ctrl),
mBlocksH: mocks.NewMockSyncValidator(ctrl),
mProposalH: mocks.NewMockSyncValidator(ctrl),
mTxBlocksH: mocks.NewMockSyncValidator(ctrl),
Expand Down Expand Up @@ -89,7 +91,7 @@ func createFetch(tb testing.TB) *testFetch {
OpnProtocol: tf.mOpn2S,
}),
withHost(tf.mh))
tf.Fetch.SetValidators(tf.mAtxH, tf.mPoetH, tf.mBallotH, tf.mBlocksH, tf.mProposalH, tf.mTxBlocksH, tf.mTxProposalH, tf.mMalH)
tf.Fetch.SetValidators(tf.mAtxH, tf.mPoetH, tf.mBallotH, tf.mActiveSetH, tf.mBlocksH, tf.mProposalH, tf.mTxBlocksH, tf.mTxProposalH, tf.mMalH)
return tf
}

Expand Down Expand Up @@ -415,7 +417,7 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {

// We set a validatior just for atxs, this validator does not drop connections
vf := ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return pubsub.ErrValidationReject })
fetcher.SetValidators(vf, nil, nil, nil, nil, nil, nil, nil)
fetcher.SetValidators(vf, nil, nil, nil, nil, nil, nil, nil, nil)

// Request an atx by hash
_, err = fetcher.getHash(ctx, types.Hash32{}, datastore.ATXDB, fetcher.validators.atx.HandleMessage)
Expand All @@ -430,7 +432,7 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
}

// Now wrap the atx validator with DropPeerOnValidationReject and set it again
fetcher.SetValidators(ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(vf, h, lg)), nil, nil, nil, nil, nil, nil, nil)
fetcher.SetValidators(ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(vf, h, lg)), nil, nil, nil, nil, nil, nil, nil, nil)

// Request an atx by hash
_, err = fetcher.getHash(ctx, types.Hash32{}, datastore.ATXDB, fetcher.validators.atx.HandleMessage)
Expand Down
6 changes: 6 additions & 0 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func (f *Fetch) getHashes(ctx context.Context, hashes []types.Hash32, hint datas
return errors.Join(errs, err)
}

// GetActiveSet downloads activeset.
func (f *Fetch) GetActiveSet(ctx context.Context, set types.Hash32) error {
f.logger.WithContext(ctx).With().Debug("request active set", log.ShortStringer("id", set))
return f.getHashes(ctx, []types.Hash32{set}, datastore.ActiveSet, f.validators.activeset.HandleMessage)
}

// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.
func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error {
if len(ids) == 0 {
Expand Down
13 changes: 13 additions & 0 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,19 @@ func TestGetATXs(t *testing.T) {
require.NoError(t, eg.Wait())
}

func TestGetActiveSet(t *testing.T) {
f := createFetch(t)
f.mActiveSetH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)

stop := make(chan struct{}, 1)
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

require.NoError(t, f.GetActiveSet(context.Background(), types.Hash32{1, 2, 3}))
close(stop)
require.NoError(t, eg.Wait())
}

func TestGetPoetProof(t *testing.T) {
f := createFetch(t)
h := types.RandomHash()
Expand Down
28 changes: 27 additions & 1 deletion miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/activesets"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/layers"
Expand Down Expand Up @@ -65,13 +66,25 @@ type config struct {
layersPerEpoch uint32
hdist uint32
minActiveSetWeight uint64
emitEmptyActiveSet types.LayerID
nodeID types.NodeID
networkDelay time.Duration

// used to determine whether a node has enough information on the active set this epoch
goodAtxPct int
}

func (c *config) MarshalLogObject(encoder log.ObjectEncoder) error {
encoder.AddUint32("layer size", c.layerSize)
encoder.AddUint32("epoch size", c.layersPerEpoch)
encoder.AddUint32("hdist", c.hdist)
encoder.AddUint64("min active weight", c.minActiveSetWeight)
encoder.AddUint32("emit empty set", c.emitEmptyActiveSet.Uint32())
encoder.AddDuration("network delay", c.networkDelay)
encoder.AddInt("good atx percent", c.goodAtxPct)
return nil
}

type defaultFetcher struct {
cdb *datastore.CachedDB
}
Expand Down Expand Up @@ -139,6 +152,12 @@ func WithMinGoodAtxPct(pct int) Opt {
}
}

func WithEmitEmptyActiveSet(lid types.LayerID) Opt {
return func(pb *ProposalBuilder) {
pb.cfg.emitEmptyActiveSet = lid
}
}

func withOracle(o proposalOracle) Opt {
return func(pb *ProposalBuilder) {
pb.proposalOracle = o
Expand Down Expand Up @@ -257,6 +276,12 @@ func (pb *ProposalBuilder) createProposal(
Beacon: beacon,
EligibilityCount: epochEligibility.Slots,
}
if err := activesets.Add(pb.cdb, ib.EpochData.ActiveSetHash, &types.EpochActiveSet{
Epoch: epoch,
Set: epochEligibility.ActiveSet,
}); err != nil && !errors.Is(err, sql.ErrObjectExists) {
return nil, err
}
} else {
pb.logger.With().Debug("creating ballot with reference ballot (no active set)",
log.Context(ctx),
Expand All @@ -277,7 +302,7 @@ func (pb *ProposalBuilder) createProposal(
MeshHash: pb.decideMeshHash(ctx, layerID),
},
}
if p.EpochData != nil {
if p.EpochData != nil && layerID < pb.cfg.emitEmptyActiveSet {
p.ActiveSet = epochEligibility.ActiveSet
}
p.Ballot.Signature = pb.signer.Sign(signing.BALLOT, p.Ballot.SignedBytes())
Expand Down Expand Up @@ -461,6 +486,7 @@ func (pb *ProposalBuilder) handleLayer(ctx context.Context, layerID types.LayerI

func (pb *ProposalBuilder) createProposalLoop(ctx context.Context) {
next := pb.clock.CurrentLayer().Add(1)
pb.logger.With().Info("started", log.Inline(&pb.cfg), log.Uint32("current", next.Uint32()))
for {
select {
case <-pb.ctx.Done():
Expand Down
12 changes: 9 additions & 3 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
pubsubmocks "github.com/spacemeshos/go-spacemesh/p2p/pubsub/mocks"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/activesets"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/layers"
Expand Down Expand Up @@ -195,7 +196,7 @@ func TestBuilder_HandleLayer_MultipleProposals(t *testing.T) {
require.Equal(t, atxID, p.AtxID)
require.Equal(t, p.Layer, layerID)
require.NotNil(t, p.EpochData)
require.Equal(t, activeSet, types.ATXIDList(p.ActiveSet))
require.Empty(t, p.ActiveSet)
require.Equal(t, beacon, p.EpochData.Beacon)
require.Equal(t, ee.Slots, p.EpochData.EligibilityCount)
require.Equal(t, []types.TransactionID{tx1.ID}, p.TxIDs)
Expand Down Expand Up @@ -268,7 +269,8 @@ func TestBuilder_HandleLayer_OneProposal(t *testing.T) {
require.Equal(t, p.Layer, layerID)
require.NotNil(t, p.EpochData)
require.Equal(t, ee.Slots, p.EpochData.EligibilityCount)
require.Equal(t, activeSet, types.ATXIDList(p.ActiveSet))
require.Empty(t, p.ActiveSet)
require.Equal(t, activeSet.Hash(), p.EpochData.ActiveSetHash)
require.Equal(t, beacon, p.EpochData.Beacon)
require.Equal(t, []types.TransactionID{tx.ID}, p.TxIDs)
require.Equal(t, meshHash, p.MeshHash)
Expand Down Expand Up @@ -398,7 +400,11 @@ func TestBuilder_HandleLayer_NoRefBallot(t *testing.T) {
require.NoError(t, codec.Decode(data, &got))
require.Equal(t, types.EmptyBallotID, got.RefBallot)
require.Equal(t, types.EpochData{ActiveSetHash: activeSet.Hash(), Beacon: beacon, EligibilityCount: ee.Slots}, *got.EpochData)
require.Equal(t, activeSet, types.ATXIDList(got.ActiveSet))
require.Empty(t, got.ActiveSet)
saved, err := activesets.Get(b.cdb, activeSet.Hash())
require.NoError(t, err)
require.Equal(t, activeSet, types.ATXIDList(saved.Set))
require.Equal(t, got.Layer.GetEpoch(), saved.Epoch)
require.Equal(t, ee.Slots, got.EpochData.EligibilityCount)
return nil
})
Expand Down
3 changes: 3 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ func (app *App) initServices(ctx context.Context) error {
MaxExceptions: trtlCfg.MaxExceptions,
Hdist: trtlCfg.Hdist,
MinimalActiveSetWeight: trtlCfg.MinimalActiveSetWeight,
AllowEmptyActiveSet: trtlCfg.EmitEmptyActiveSet,
}),
)

Expand Down Expand Up @@ -808,6 +809,7 @@ func (app *App) initServices(ctx context.Context) error {
miner.WithLayerSize(layerSize),
miner.WithLayerPerEpoch(layersPerEpoch),
miner.WithMinimalActiveSetWeight(app.Config.Tortoise.MinimalActiveSetWeight),
miner.WithEmitEmptyActiveSet(app.Config.Tortoise.EmitEmptyActiveSet),
miner.WithHdist(app.Config.Tortoise.Hdist),
miner.WithNetworkDelay(app.Config.HARE.WakeupDelta),
miner.WithMinGoodAtxPct(minerGoodAtxPct),
Expand Down Expand Up @@ -888,6 +890,7 @@ func (app *App) initServices(ctx context.Context) error {
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(atxHandler.HandleSyncedAtx, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(poetDb.ValidateAndStoreMsg, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleSyncedBallot, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleActiveSet, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(blockHandler.HandleSyncedBlock, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleSyncedProposal, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(app.txHandler.HandleBlockTransaction, app.host, lg)),
Expand Down

0 comments on commit f8aea0a

Please sign in to comment.