sync: require state synced for node to be synced (#4934)
## Motivation
Closes #4931

## Changes
- make sure state is synced before declaring the node synced (see the sketch after this list)
- reduce the gossip sync window from 10 min to 2 min
- do not print the confusing "failed to build proposal" log when the node is not synced
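
For reference, a minimal, self-contained sketch of the rule this change moves to: the node only reports itself synced once mesh data and state are both caught up and it has stayed in gossip sync for the configured `GossipDuration`. The names below are illustrative stand-ins, not the actual `Syncer` fields (the real implementation tracks these with atomics).

```go
package main

import (
	"fmt"
	"time"
)

// syncStatus is a simplified stand-in for the syncer's internal flags.
type syncStatus struct {
	dataSynced      bool          // last synced layer caught up to the current layer
	stateSynced     bool          // processed (applied) layer caught up, no state errors
	gossipStartedAt time.Time     // when the node entered gossip sync
	gossipDuration  time.Duration // how long to stay in gossip sync before flipping to synced
}

// isSynced mirrors the new rule: data synced AND state synced AND the node
// has been gossip-syncing for at least gossipDuration.
func (s syncStatus) isSynced(now time.Time) bool {
	return s.dataSynced && s.stateSynced &&
		now.Sub(s.gossipStartedAt) >= s.gossipDuration
}

func main() {
	st := syncStatus{
		dataSynced:      true,
		stateSynced:     true,
		gossipStartedAt: time.Now().Add(-3 * time.Minute),
		gossipDuration:  2 * time.Minute,
	}
	fmt.Println("synced:", st.isSynced(time.Now())) // true: gossiped longer than the duration
}
```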

## Test
- manually synced with mainnet

## TODO
- [ ] Update [changelog](../CHANGELOG.md)
countvonzero committed Aug 31, 2023
1 parent b024b41 commit 67e3efc
Showing 9 changed files with 145 additions and 141 deletions.
1 change: 1 addition & 0 deletions config/mainnet.go
@@ -137,6 +137,7 @@ func MainnetConfig() Config {
MaxStaleDuration: time.Hour,
UseNewProtocol: true,
Standalone: false,
GossipDuration: 50 * time.Second,
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
1 change: 1 addition & 0 deletions config/presets/fastnet.go
@@ -51,6 +51,7 @@ func fastnet() config.Config {
conf.LayerAvgSize = 50
conf.LayerDuration = 15 * time.Second
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.LayersPerEpoch = 4

conf.Tortoise.Hdist = 4
27 changes: 17 additions & 10 deletions miner/proposal_builder.go
@@ -373,6 +373,17 @@ func (pb *ProposalBuilder) handleLayer(ctx context.Context, layerID types.LayerI
if !pb.syncer.IsSynced(ctx) {
return errNotSynced
}

// make sure the miner is eligible first
nonce, err := pb.nonceFetcher.VRFNonce(pb.signer.NodeID(), layerID.GetEpoch())
if err != nil {
if errors.Is(err, sql.ErrNotFound) {
pb.logger.WithContext(ctx).With().Info("miner has no valid vrf nonce, not building proposal", layerID)
return nil
}
return err
}

if beacon, err = pb.beaconProvider.GetBeacon(epoch); err != nil {
return errNoBeacon
}
@@ -386,14 +397,6 @@ func (pb *ProposalBuilder) handleLayer(ctx context.Context, layerID types.LayerI
return errDuplicateLayer
}

nonce, err := pb.nonceFetcher.VRFNonce(pb.signer.NodeID(), layerID.GetEpoch())
if err != nil {
if errors.Is(err, sql.ErrNotFound) {
pb.logger.WithContext(ctx).With().Info("miner has no valid vrf nonce, not building proposal", layerID)
return nil
}
return err
}
epochEligibility, err := pb.proposalOracle.ProposalEligibility(layerID, beacon, nonce)
if err != nil {
if errors.Is(err, errMinerHasNoATXInPreviousEpoch) {
@@ -470,8 +473,12 @@ func (pb *ProposalBuilder) createProposalLoop(ctx context.Context) {
}
next = current.Add(1)
lyrCtx := log.WithNewSessionID(ctx)
if err := pb.handleLayer(lyrCtx, current); err != nil && !errors.Is(err, errGenesis) {
pb.logger.WithContext(lyrCtx).With().Warning("failed to build proposal", current, log.Err(err))
if err := pb.handleLayer(lyrCtx, current); err != nil {
switch {
case errors.Is(err, errGenesis), errors.Is(err, errNotSynced):
default:
pb.logger.WithContext(lyrCtx).With().Warning("failed to build proposal", current, log.Err(err))
}
}
}
}
2 changes: 2 additions & 0 deletions miner/proposal_builder_test.go
@@ -303,6 +303,7 @@ func TestBuilder_HandleLayer_NoBeacon(t *testing.T) {

layerID := types.LayerID(layersPerEpoch * 3)
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true)
b.mNonce.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(22), nil)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(types.EmptyBeacon, errors.New("unknown"))

require.ErrorIs(t, b.handleLayer(context.Background(), layerID), errNoBeacon)
@@ -661,6 +662,7 @@ func TestBuilder_HandleLayer_Duplicate(t *testing.T) {
ballot := types.NewExistingBallot(types.BallotID{1}, types.EmptyEdSignature, b.signer.NodeID(), layerID)
require.NoError(t, ballots.Add(b.cdb, &ballot))
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true)
b.mNonce.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(22), nil)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil)
require.ErrorIs(t, b.handleLayer(context.Background(), layerID), errDuplicateLayer)
}
4 changes: 2 additions & 2 deletions syncer/data_fetch.go
@@ -195,8 +195,8 @@ func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers
}

func (d *DataFetch) receiveMaliciousIDs(ctx context.Context, req *maliciousIDRequest, peer p2p.Peer, data []byte, peerErr error) {
logger := d.logger.WithContext(ctx).WithFields(req.lid, log.Stringer("peer", peer))
logger.Debug("received layer data from peer")
logger := d.logger.WithContext(ctx).WithFields(log.Stringer("peer", peer))
logger.Debug("received malicious id from peer")
var (
result = peerResult[fetch.MaliciousIDs]{peer: peer, err: peerErr}
malIDs fetch.MaliciousIDs
10 changes: 7 additions & 3 deletions syncer/state_syncer.go
@@ -35,7 +35,8 @@ func minLayer(a, b types.LayerID) types.LayerID {

func (s *Syncer) stateSynced() bool {
current := s.ticker.CurrentLayer()
return current.Uint32() <= 1 || !s.mesh.ProcessedLayer().Before(current.Sub(1))
return current <= types.GetEffectiveGenesis() ||
(s.mesh.ProcessedLayer() >= current-1 && !s.stateErr.Load())
}

func (s *Syncer) processLayers(ctx context.Context) error {
@@ -62,7 +63,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {

// used to make sure we only resync from the same peer once during each run.
resyncPeers := make(map[p2p.Peer]struct{})
for lid := start; !lid.After(s.getLastSyncedLayer()); lid = lid.Add(1) {
for lid := start; lid <= s.getLastSyncedLayer(); lid++ {
select {
case <-ctx.Done():
return ctx.Err()
@@ -90,7 +91,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {
s.logger.WithContext(ctx).With().Warning("failed to adopt peer opinions", lid, log.Err(err))
}
}
if s.stateSynced() {
if s.IsSynced(ctx) {
if err = s.checkMeshAgreement(ctx, lid, opinions); err != nil && errors.Is(err, errMeshHashDiverged) {
s.logger.WithContext(ctx).With().Debug("mesh hash diverged, trying to reach agreement",
lid,
@@ -114,6 +115,9 @@ func (s *Syncer) processLayers(ctx context.Context) error {
if !errors.Is(err, mesh.ErrMissingBlock) {
s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err))
}
s.stateErr.Store(true)
} else {
s.stateErr.Store(false)
}
}
s.logger.WithContext(ctx).With().Debug("end of state sync",
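
A note on the state_syncer.go change above: the new `stateErr` flag acts as a latch that keeps `stateSynced()` false while the most recent layer application failed. A reduced, hypothetical illustration of that pattern (not the real syncer types), assuming the genesis guard is omitted for brevity:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stateTracker is an illustrative reduction of the stateErr latch: each
// processed layer records success or failure, and the node only counts its
// state as synced while the latest apply attempt succeeded.
type stateTracker struct {
	processed atomic.Uint32 // last layer applied to state
	stateErr  atomic.Bool   // true if the most recent apply failed
}

func (t *stateTracker) processLayer(lid uint32, apply func(uint32) error) {
	if err := apply(lid); err != nil {
		t.stateErr.Store(true) // remember the failure so stateSynced() stays false
		return
	}
	t.stateErr.Store(false)
	t.processed.Store(lid)
}

// stateSynced reports whether processing caught up to the layer before
// current and the last apply did not error.
func (t *stateTracker) stateSynced(current uint32) bool {
	return t.processed.Load() >= current-1 && !t.stateErr.Load()
}

func main() {
	var t stateTracker
	t.processLayer(9, func(uint32) error { return nil })
	fmt.Println(t.stateSynced(10)) // true: layer 9 applied, no error recorded
}
```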
6 changes: 1 addition & 5 deletions syncer/state_syncer_test.go
@@ -574,6 +574,7 @@ func TestProcessLayers_OpinionsOptional(t *testing.T) {
func TestProcessLayers_MeshHashDiverged(t *testing.T) {
ts := newTestSyncerForState(t)
ts.syncer.setATXSynced()
ts.syncer.setSyncState(context.Background(), synced)
current := types.GetEffectiveGenesis().Add(131)
ts.mTicker.advanceToLayer(current)
for lid := types.GetEffectiveGenesis().Add(1); lid.Before(current); lid = lid.Add(1) {
@@ -724,10 +725,5 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) {
ts.mVm.EXPECT().GetStateRoot()
}
}
// only the last layer will trigger hash resolution
for i := range opns {
ts.mForkFinder.EXPECT().NeedResync(current.Sub(1), opns[i].PrevAggHash).Return(false)
}
ts.mForkFinder.EXPECT().Purge(true)
require.NoError(t, ts.syncer.processLayers(context.Background()))
}
102 changes: 44 additions & 58 deletions syncer/syncer.go
@@ -28,6 +28,7 @@ type Config struct {
MaxStaleDuration time.Duration
Standalone bool
UseNewProtocol bool `mapstructure:"use-new-opn"`
GossipDuration time.Duration
}

// DefaultConfig for the syncer.
@@ -39,12 +40,12 @@ func DefaultConfig() Config {
SyncCertDistance: 10,
MaxStaleDuration: time.Second,
UseNewProtocol: true,
GossipDuration: 15 * time.Second,
}
}

const (
outOfSyncThreshold uint32 = 3 // see notSynced
numGossipSyncLayers uint32 = 2 // see gossipSync
outOfSyncThreshold uint32 = 3 // see notSynced
)

type syncState uint32
@@ -113,25 +114,24 @@ func withForkFinder(f forkFinder) Option {
type Syncer struct {
logger log.Log

cfg Config
cdb *datastore.CachedDB
ticker layerTicker
beacon system.BeaconGetter
mesh *mesh.Mesh
certHandler certHandler
dataFetcher fetchLogic
patrol layerPatrol
forkFinder forkFinder
syncOnce sync.Once
syncState atomic.Value
atxSyncState atomic.Value
isBusy atomic.Value
syncTimer *time.Ticker
validateTimer *time.Ticker
// targetSyncedLayer is used to signal at which layer we can set this node to synced state
targetSyncedLayer atomic.Value
lastLayerSynced atomic.Value
lastEpochSynced atomic.Value
cfg Config
cdb *datastore.CachedDB
ticker layerTicker
beacon system.BeaconGetter
mesh *mesh.Mesh
certHandler certHandler
dataFetcher fetchLogic
patrol layerPatrol
forkFinder forkFinder
syncOnce sync.Once
syncState atomic.Value
atxSyncState atomic.Value
isBusy atomic.Bool
// syncedTargetTime is used to signal at which time we can set this node to synced state
syncedTargetTime atomic.Time
lastLayerSynced atomic.Uint32
lastEpochSynced atomic.Uint32
stateErr atomic.Bool

// awaitATXSyncedCh is the list of subscribers' channels to notify when this node enters ATX synced state
awaitATXSyncedCh chan struct{}
@@ -167,8 +167,6 @@ func NewSyncer(
opt(s)
}

s.syncTimer = time.NewTicker(s.cfg.Interval)
s.validateTimer = time.NewTicker(s.cfg.Interval)
if s.dataFetcher == nil {
s.dataFetcher = NewDataFetch(mesh, fetcher, cdb, cache, s.logger)
}
@@ -177,17 +175,15 @@ func NewSyncer(
}
s.syncState.Store(notSynced)
s.atxSyncState.Store(notSynced)
s.isBusy.Store(0)
s.targetSyncedLayer.Store(types.LayerID(0))
s.lastLayerSynced.Store(s.mesh.ProcessedLayer())
s.lastEpochSynced.Store(types.GetEffectiveGenesis().GetEpoch() - 1)
s.isBusy.Store(false)
s.syncedTargetTime.Store(time.Time{})
s.lastLayerSynced.Store(s.mesh.LatestLayer().Uint32())
s.lastEpochSynced.Store(types.GetEffectiveGenesis().GetEpoch().Uint32() - 1)
return s
}

// Close stops the syncing process and the goroutines syncer spawns.
func (s *Syncer) Close() {
s.syncTimer.Stop()
s.validateTimer.Stop()
s.stop()
s.logger.With().Info("waiting for syncer goroutines to finish")
err := s.eg.Wait()
@@ -234,7 +230,7 @@ func (s *Syncer) Start() {
case <-ctx.Done():
s.logger.WithContext(ctx).Info("stopping sync to shutdown")
return fmt.Errorf("shutdown context done: %w", ctx.Err())
case <-s.syncTimer.C:
case <-time.After(s.cfg.Interval):
ok := s.synchronize(ctx)
if ok {
runSuccess.Inc()
@@ -250,7 +246,7 @@ func (s *Syncer) Start() {
select {
case <-ctx.Done():
return nil
case <-s.validateTimer.C:
case <-time.After(s.cfg.Interval):
if err := s.processLayers(ctx); err != nil {
sRunFail.Inc()
} else {
@@ -312,43 +308,28 @@ func (s *Syncer) setSyncState(ctx context.Context, newState syncState) {
// setSyncerBusy returns false if the syncer is already running a sync process.
// otherwise it sets syncer to be busy and returns true.
func (s *Syncer) setSyncerBusy() bool {
return s.isBusy.CompareAndSwap(0, 1)
return s.isBusy.CompareAndSwap(false, true)
}

func (s *Syncer) setSyncerIdle() {
s.isBusy.Store(0)
}

// targetSyncedLayer is used to signal at which layer we can set this node to synced state.
func (s *Syncer) setTargetSyncedLayer(ctx context.Context, layerID types.LayerID) {
oldSyncLayer := s.targetSyncedLayer.Swap(layerID).(types.LayerID)
s.logger.WithContext(ctx).With().Debug("target synced layer changed",
log.Uint32("from_layer", oldSyncLayer.Uint32()),
log.Uint32("to_layer", layerID.Uint32()),
log.Stringer("current", s.ticker.CurrentLayer()),
log.Stringer("latest", s.mesh.LatestLayer()),
log.Stringer("processed", s.mesh.ProcessedLayer()))
}

func (s *Syncer) getTargetSyncedLayer() types.LayerID {
return s.targetSyncedLayer.Load().(types.LayerID)
s.isBusy.Store(false)
}

func (s *Syncer) setLastSyncedLayer(lid types.LayerID) {
s.lastLayerSynced.Store(lid)
s.lastLayerSynced.Store(lid.Uint32())
syncedLayer.Set(float64(lid))
}

func (s *Syncer) getLastSyncedLayer() types.LayerID {
return s.lastLayerSynced.Load().(types.LayerID)
return types.LayerID(s.lastLayerSynced.Load())
}

func (s *Syncer) setLastAtxEpoch(epoch types.EpochID) {
s.lastEpochSynced.Store(epoch)
s.lastEpochSynced.Store(epoch.Uint32())
}

func (s *Syncer) lastAtxEpoch() types.EpochID {
return s.lastEpochSynced.Load().(types.EpochID)
return types.EpochID(s.lastEpochSynced.Load())
}

// synchronize sync data up to the currentLayer-1 and wait for the layers to be validated.
@@ -517,20 +498,25 @@ func (s *Syncer) setStateAfterSync(ctx context.Context, success bool) {
s.setSyncState(ctx, notSynced)
}
case gossipSync:
if !success || !s.dataSynced() {
if !success || !s.dataSynced() || !s.stateSynced() {
// push out the target synced layer
s.setTargetSyncedLayer(ctx, current.Add(numGossipSyncLayers))
s.syncedTargetTime.Store(time.Now().Add(s.cfg.GossipDuration))
s.logger.With().Info("extending gossip sync",
log.Bool("success", success),
log.Bool("data", s.dataSynced()),
log.Bool("state", s.stateSynced()),
)
break
}
// if we have gossip-synced to the target synced layer, we are ready to participate in consensus
if !s.getTargetSyncedLayer().After(current) {
// if we have gossip-synced long enough, we are ready to participate in consensus
if !time.Now().Before(s.syncedTargetTime.Load()) {
s.setSyncState(ctx, synced)
}
case notSynced:
if success && s.dataSynced() {
if success && s.dataSynced() && s.stateSynced() {
// wait till s.ticker.GetCurrentLayer() + numGossipSyncLayers to participate in consensus
s.setSyncState(ctx, gossipSync)
s.setTargetSyncedLayer(ctx, current.Add(numGossipSyncLayers))
s.syncedTargetTime.Store(time.Now().Add(s.cfg.GossipDuration))
}
}
}
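
A closing note on syncer.go: the layer-based `targetSyncedLayer` is replaced by a wall-clock deadline (`syncedTargetTime`) that is pushed out by `GossipDuration` whenever data or state sync falls behind. A small, hypothetical sketch of that gating logic (illustrative names, not the syncer's API):

```go
package main

import (
	"fmt"
	"time"
)

// gossipGate is an illustrative version of the switch from a target layer to
// a wall-clock deadline: instead of waiting numGossipSyncLayers layers, the
// node waits GossipDuration after the last time sync fell behind.
type gossipGate struct {
	deadline time.Time
}

// extend pushes the synced deadline out whenever data or state sync regresses.
func (g *gossipGate) extend(d time.Duration) {
	g.deadline = time.Now().Add(d)
}

// ready reports whether the node has gossip-synced long enough to flip to synced.
func (g *gossipGate) ready() bool {
	return !time.Now().Before(g.deadline)
}

func main() {
	g := &gossipGate{}
	g.extend(50 * time.Second) // mainnet GossipDuration in this diff
	fmt.Println("ready now:", g.ready()) // false until the deadline passes
}
```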
