Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - miner oracle use ref ballot for active set #4876

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
162 changes: 102 additions & 60 deletions miner/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,55 +97,60 @@
return ee, nil
}

func (o *Oracle) activesFromFirstBlock(targetEpoch types.EpochID) (uint64, uint64, []types.ATXID, error) {
func (o *Oracle) activesFromFirstBlock(targetEpoch types.EpochID, ownAtx types.ATXID, ownWeight uint64) (uint64, []types.ATXID, error) {
if ownAtx == types.EmptyATXID || ownWeight == 0 {
o.log.Fatal("invalid miner atx")
}

Check warning on line 103 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L102-L103

Added lines #L102 - L103 were not covered by tests

activeSet, err := ActiveSetFromEpochFirstBlock(o.cdb, targetEpoch)
if err != nil {
return 0, 0, nil, err
return 0, nil, err

Check warning on line 107 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L107

Added line #L107 was not covered by tests
}
own, err := o.cdb.GetEpochAtx(targetEpoch-1, o.cfg.nodeID)
_, totalWeight, own, err := infoFromActiveSet(o.cdb, o.vrfSigner.NodeID(), activeSet)
if err != nil {
return 0, 0, nil, err
return 0, nil, err
}

Check warning on line 112 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L111-L112

Added lines #L111 - L112 were not covered by tests
if own == types.EmptyATXID {
// miner is not included in the active set derived from the epoch's first block
activeSet = append(activeSet, ownAtx)
totalWeight += ownWeight
}
var total uint64
var final []types.ATXID
return totalWeight, activeSet, nil
}

func infoFromActiveSet(cdb *datastore.CachedDB, nodeID types.NodeID, activeSet []types.ATXID) (uint64, uint64, types.ATXID, error) {
var (
total, ownWeight uint64
ownAtx types.ATXID
)
for _, id := range activeSet {
hdr, err := o.cdb.GetAtxHeader(id)
hdr, err := cdb.GetAtxHeader(id)
if err != nil {
return 0, 0, nil, err
return 0, 0, types.EmptyATXID, fmt.Errorf("new ndoe get atx hdr: %w", err)

Check warning on line 129 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L129

Added line #L129 was not covered by tests
}
if id != own.ID {
final = append(final, id)
if hdr.NodeID == nodeID {
ownAtx = id
ownWeight = hdr.GetWeight()
}
total += hdr.GetWeight()
}
// put miner's own ATXID last
final = append(final, own.ID)
o.log.With().Info("active set selected for proposal",
log.Stringer("epoch", targetEpoch),
log.Int("num_atx", len(final)),
)
return own.GetWeight(), total, final, nil
return ownWeight, total, ownAtx, nil
}

func (o *Oracle) activeSet(targetEpoch types.EpochID) (uint64, uint64, []types.ATXID, error) {
if !o.syncState.SyncedBefore(targetEpoch - 1) {
// if the node is not synced prior to `targetEpoch-1`, it doesn't have the correct receipt timestamp
// for all the atx and malfeasance proof, and cannot use atx grading for active set.
o.log.With().Info("node not synced before prior epoch, getting active set from first block",
log.Stringer("epoch", targetEpoch),
)
return o.activesFromFirstBlock(targetEpoch)
}

func (o *Oracle) activeSet(targetEpoch types.EpochID) (uint64, uint64, types.ATXID, []types.ATXID, error) {
var (
minerWeight, totalWeight uint64
minerID types.ATXID
atxids []types.ATXID
ownWeight, totalWeight uint64
ownAtx types.ATXID
atxids []types.ATXID
)

epochStart := o.clock.LayerToTime(targetEpoch.FirstLayer())
numOmitted := 0
if err := o.cdb.IterateEpochATXHeaders(targetEpoch, func(header *types.ActivationTxHeader) error {
if header.NodeID == o.cfg.nodeID {
ownWeight = header.GetWeight()
ownAtx = header.ID
}
grade, err := GradeAtx(o.cdb, header.NodeID, header.Received, epochStart, o.cfg.networkDelay)
if err != nil {
return err
Expand All @@ -155,73 +160,110 @@
header.ID,
log.Int("grade", int(grade)),
log.Stringer("smesher", header.NodeID),
log.Bool("own", header.NodeID == o.cfg.nodeID),
log.Time("received", header.Received),
log.Time("epoch_start", epochStart),
)
numOmitted++
return nil
}
totalWeight += header.GetWeight()
if header.NodeID == o.cfg.nodeID {
minerWeight = header.GetWeight()
minerID = header.ID
} else {
atxids = append(atxids, header.ID)
}
atxids = append(atxids, header.ID)
return nil
}); err != nil {
return 0, 0, nil, err
return 0, 0, types.EmptyATXID, nil, err

Check warning on line 174 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L174

Added line #L174 was not covered by tests
}
// put miner's own ATXID last
if minerID != types.EmptyATXID {
atxids = append(atxids, minerID)

if ownAtx == types.EmptyATXID || ownWeight == 0 {
return 0, 0, types.EmptyATXID, nil, errMinerHasNoATXInPreviousEpoch
}
o.log.With().Info("active set selected for proposal",
log.Int("num_atx", len(atxids)),
log.Int("num_omitted", numOmitted),
)
return minerWeight, totalWeight, atxids, nil

if total := numOmitted + len(atxids); total == 0 {
return 0, 0, types.EmptyATXID, nil, errEmptyActiveSet

Check warning on line 182 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L182

Added line #L182 was not covered by tests
} else if numOmitted*100/total > 100-o.cfg.syncedPct {
// if the node is not synced during `targetEpoch-1`, it doesn't have the correct receipt timestamp
// for all the atx and malfeasance proof. this active set is not usable.
// TODO: change after timing info of ATXs and malfeasance proofs is sync'ed from peers as well
var err error
totalWeight, atxids, err = o.activesFromFirstBlock(targetEpoch, ownAtx, ownWeight)
if err != nil {
return 0, 0, types.EmptyATXID, nil, err
}

Check warning on line 191 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L190-L191

Added lines #L190 - L191 were not covered by tests
o.log.With().Info("miner not synced during prior epoch, active set from first block",
log.Int("num atx", len(atxids)),
log.Int("num omitted", numOmitted),
log.Int("num block atx", len(atxids)),
)
} else {
o.log.With().Info("active set selected for proposal",
log.Int("num atx", len(atxids)),
log.Int("num omitted", numOmitted),
)
}
return ownWeight, totalWeight, ownAtx, atxids, nil
}

func refBallot(db sql.Executor, epoch types.EpochID, nodeID types.NodeID) (*types.Ballot, error) {
ref, err := ballots.GetRefBallot(db, epoch, nodeID)
if errors.Is(err, sql.ErrNotFound) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get refballot: %w", err)
}

Check warning on line 213 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L212-L213

Added lines #L212 - L213 were not covered by tests
ballot, err := ballots.Get(db, ref)
if err != nil {
return nil, fmt.Errorf("failed to get ballot: %w", err)
}

Check warning on line 217 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L216-L217

Added lines #L216 - L217 were not covered by tests
return ballot, nil
}

// calcEligibilityProofs calculates the eligibility proofs of proposals for the miner in the given epoch
// and returns the proofs along with the epoch's active set.
func (o *Oracle) calcEligibilityProofs(lid types.LayerID, epoch types.EpochID, beacon types.Beacon, nonce types.VRFPostIndex) (*EpochEligibility, error) {
// get the previous epoch's total weight
minerWeight, totalWeight, activeSet, err := o.activeSet(epoch)
ref, err := refBallot(o.cdb, epoch, o.vrfSigner.NodeID())
if err != nil {
return nil, fmt.Errorf("failed to get epoch %v weight: %w", epoch, err)
return nil, err
}

Check warning on line 227 in miner/oracle.go

View check run for this annotation

Codecov / codecov/patch

miner/oracle.go#L226-L227

Added lines #L226 - L227 were not covered by tests

var (
minerWeight, totalWeight uint64
ownAtx types.ATXID
activeSet []types.ATXID
)
if ref == nil {
minerWeight, totalWeight, ownAtx, activeSet, err = o.activeSet(epoch)
} else {
activeSet = ref.ActiveSet
minerWeight, totalWeight, ownAtx, err = infoFromActiveSet(o.cdb, o.vrfSigner.NodeID(), activeSet)
o.log.With().Info("use active set from ref ballot",
ref.ID(),
log.Int("num atx", len(activeSet)),
)
}
if minerWeight == 0 {
return nil, errMinerHasNoATXInPreviousEpoch
if err != nil {
return nil, err
}
if totalWeight == 0 {
return nil, errZeroEpochWeight
}
if len(activeSet) == 0 {
return nil, errEmptyActiveSet
}
ownAtx := activeSet[len(activeSet)-1]
o.log.With().Debug("calculating eligibility",
epoch,
beacon,
log.Uint64("weight", minerWeight),
log.Uint64("total weight", totalWeight),
)
var numEligibleSlots uint32
ref, err := ballots.GetRefBallot(o.cdb, epoch, o.vrfSigner.NodeID())
if errors.Is(err, sql.ErrNotFound) {
if ref == nil {
numEligibleSlots, err = proposals.GetLegacyNumEligible(lid, minerWeight, o.cfg.minActiveSetWeight, totalWeight, o.cfg.layerSize, o.cfg.layersPerEpoch)
if err != nil {
return nil, fmt.Errorf("oracle get num slots: %w", err)
}
} else if err != nil {
return nil, fmt.Errorf("failed to get refballot: %w", err)
} else {
ballot, err := ballots.Get(o.cdb, ref)
if err != nil {
return nil, fmt.Errorf("failed to get ballot: %w", err)
}
numEligibleSlots = ballot.EpochData.EligibilityCount
numEligibleSlots = ref.EpochData.EligibilityCount
}

eligibilityProofs := map[types.LayerID][]types.VotingEligibility{}
Expand Down
16 changes: 6 additions & 10 deletions miner/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func testMinerOracleAndProposalValidator(t *testing.T, layerSize, layersPerEpoch
endLayer := types.LayerID(numberOfEpochsToTest * layersPerEpoch).Add(startLayer)
counterValuesSeen := map[uint32]int{}
epochStart := time.Now()
o.mSync.EXPECT().SyncedBefore(gomock.Any()).Return(true).AnyTimes()
o.mClock.EXPECT().LayerToTime(gomock.Any()).Return(epochStart).AnyTimes()
received := epochStart.Add(-5 * networkDelay)
epochInfo := genATXForTargetEpochs(t, o.cdb, types.EpochID(startEpoch), types.EpochID(startEpoch+numberOfEpochsToTest), o.edSigner, layersPerEpoch, received)
Expand Down Expand Up @@ -233,7 +232,6 @@ func TestOracle_OwnATXNotFound(t *testing.T) {
layersPerEpoch := uint32(20)
o := createTestOracle(t, avgLayerSize, layersPerEpoch, 0)
lid := types.LayerID(layersPerEpoch * 3)
o.mSync.EXPECT().SyncedBefore(types.EpochID(2)).Return(true)
o.mClock.EXPECT().LayerToTime(lid).Return(time.Now())
ee, err := o.ProposalEligibility(lid, types.RandomBeacon(), types.VRFPostIndex(1))
require.ErrorIs(t, err, errMinerHasNoATXInPreviousEpoch)
Expand All @@ -250,7 +248,6 @@ func TestOracle_EligibilityCached(t *testing.T) {
info, ok := epochInfo[lid.GetEpoch()]
require.True(t, ok)
o.mClock.EXPECT().LayerToTime(lid).Return(received.Add(time.Hour)).AnyTimes()
o.mSync.EXPECT().SyncedBefore(types.EpochID(2)).Return(true).AnyTimes()
ee1, err := o.ProposalEligibility(lid, info.beacon, types.VRFPostIndex(1))
require.NoError(t, err)
require.NotNil(t, ee1)
Expand All @@ -273,7 +270,6 @@ func TestOracle_MinimalActiveSetWeight(t *testing.T) {
info, ok := epochInfo[lid.GetEpoch()]
require.True(t, ok)

o.mSync.EXPECT().SyncedBefore(types.EpochID(2)).Return(true).AnyTimes()
o.mClock.EXPECT().LayerToTime(lid).Return(received.Add(time.Hour)).AnyTimes()
ee1, err := o.ProposalEligibility(lid, info.beacon, types.VRFPostIndex(1))
require.NoError(t, err)
Expand All @@ -294,7 +290,6 @@ func TestOracle_ATXGrade(t *testing.T) {
o := createTestOracle(t, avgLayerSize, layersPerEpoch, 0)
lid := types.LayerID(layersPerEpoch * 3)
epochStart := time.Now()
o.mSync.EXPECT().SyncedBefore(types.EpochID(2)).Return(true)
o.mClock.EXPECT().LayerToTime(lid).Return(epochStart)

goodTime := epochStart.Add(-4*networkDelay - time.Nanosecond)
Expand Down Expand Up @@ -362,7 +357,7 @@ func createBallots(tb testing.TB, cdb *datastore.CachedDB, lid types.LayerID, nu
return result
}

func TestOracle_NotSyncedBeforeLastEpoch(t *testing.T) {
func TestOracle_NewNode(t *testing.T) {
for _, tc := range []struct {
desc string
ownAtxInBlock bool
Expand All @@ -380,8 +375,9 @@ func TestOracle_NotSyncedBeforeLastEpoch(t *testing.T) {
avgLayerSize := uint32(10)
lyrsPerEpoch := uint32(20)
o := createTestOracle(t, avgLayerSize, lyrsPerEpoch, 0)
o.cfg.syncedPct = 90
lid := types.LayerID(lyrsPerEpoch * 3)

o.mClock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
common := types.RandomActiveSet(100)
blts := createBallots(t, o.cdb, lid, 20, common)
expected := common
Expand Down Expand Up @@ -418,7 +414,6 @@ func TestOracle_NotSyncedBeforeLastEpoch(t *testing.T) {
expected = append(expected, ownAtx)
}

o.mSync.EXPECT().SyncedBefore(types.EpochID(2)).Return(false)
ee, err := o.ProposalEligibility(lid, types.RandomBeacon(), types.VRFPostIndex(1))
require.NoError(t, err)
require.NotNil(t, ee)
Expand All @@ -432,8 +427,6 @@ func TestRefBallot(t *testing.T) {
avgLayerSize := uint32(10)
lyrsPerEpoch := uint32(20)
o := createTestOracle(t, avgLayerSize, lyrsPerEpoch, 0)
o.mSync.EXPECT().SyncedBefore(gomock.Any()).Return(true)
o.mClock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())

layer := types.LayerID(100)

Expand All @@ -457,8 +450,11 @@ func TestRefBallot(t *testing.T) {
ballot.SetID(types.BallotID{1})
require.NoError(t, ballots.Add(o.cdb, &ballot))

genATXForTargetEpochs(t, o.cdb, layer.GetEpoch(), layer.GetEpoch()+1, o.edSigner, layersPerEpoch, time.Now().Add(-1*time.Hour))
ee, err := o.calcEligibilityProofs(layer, layer.GetEpoch(), types.Beacon{}, types.VRFPostIndex(101))
require.NoError(t, err)
require.NotEmpty(t, ee)
require.Equal(t, 1, int(ee.Slots))
require.Equal(t, atx.ID(), ee.Atx)
require.ElementsMatch(t, ballot.ActiveSet, ee.ActiveSet)
}
4 changes: 4 additions & 0 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type config struct {
minActiveSetWeight uint64
nodeID types.NodeID
networkDelay time.Duration

// used to determine whether a node has enough information on the active set this epoch
syncedPct int
}

type defaultFetcher struct {
Expand Down Expand Up @@ -173,6 +176,7 @@ func NewProposalBuilder(
for _, opt := range opts {
opt(pb)
}
pb.cfg.syncedPct = 90
if pb.proposalOracle == nil {
pb.proposalOracle = newMinerOracle(pb.cfg, clock, cdb, vrfSigner, syncer, pb.logger)
}
Expand Down
5 changes: 0 additions & 5 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,6 @@ func (s *Syncer) IsSynced(ctx context.Context) bool {
return s.getSyncState() == synced
}

// SyncedBefore returns true if the node became synced before `epoch` starts.
func (s *Syncer) SyncedBefore(epoch types.EpochID) bool {
return s.getSyncState() == synced && s.getTargetSyncedLayer() < epoch.FirstLayer()
}

func (s *Syncer) IsBeaconSynced(epoch types.EpochID) bool {
_, err := s.beacon.GetBeacon(epoch)
return err == nil
Expand Down
14 changes: 0 additions & 14 deletions system/mocks/sync.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion system/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ import (
type SyncStateProvider interface {
IsSynced(context.Context) bool
IsBeaconSynced(types.EpochID) bool
SyncedBefore(types.EpochID) bool
}