Skip to content

Commit

Permalink
Try #4826:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Aug 12, 2023
2 parents fd9046c + df45cd7 commit c347ad4
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 95 deletions.
33 changes: 0 additions & 33 deletions common/types/missing.go

This file was deleted.

15 changes: 14 additions & 1 deletion mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Mesh struct {
conState conservativeState
trtl system.Tortoise

missingBlocks chan []types.BlockID

mu sync.Mutex
// latestLayer is the latest layer this node had seen from blocks
latestLayer atomic.Value
Expand All @@ -64,6 +66,7 @@ func NewMesh(cdb *datastore.CachedDB, c layerClock, trtl system.Tortoise, exec *
executor: exec,
conState: state,
nextProcessedLayers: make(map[types.LayerID]struct{}),
missingBlocks: make(chan []types.BlockID, 32),
}
msh.latestLayer.Store(types.LayerID(0))
msh.latestLayerInState.Store(types.LayerID(0))
Expand Down Expand Up @@ -131,6 +134,12 @@ func (msh *Mesh) LatestLayerInState() types.LayerID {
return msh.latestLayerInState.Load().(types.LayerID)
}

// MissingBlocks returns single consumer channel.
// Consumer by contract is responsible for downloading missing blocks.
func (msh *Mesh) MissingBlocks() <-chan []types.BlockID {
return msh.missingBlocks
}

// LatestLayer - returns the latest layer we saw from the network.
func (msh *Mesh) LatestLayer() types.LayerID {
return msh.latestLayer.Load().(types.LayerID)
Expand Down Expand Up @@ -308,7 +317,11 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error {
)
}
if missing := missingBlocks(results); len(missing) > 0 {
return &types.ErrorMissing{MissingData: types.MissingData{Blocks: missing}}
select {
case <-ctx.Done():
case msh.missingBlocks <- missing:
}
return fmt.Errorf("request missing blocks %v", missing)
}
if err := msh.ensureStateConsistent(ctx, results); err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
timeCfg "github.com/spacemeshos/go-spacemesh/timesync/config"
Expand Down Expand Up @@ -704,12 +705,16 @@ func (app *App) initServices(ctx context.Context) error {
blocks.WithCertifierLogger(app.addLogger(BlockCertLogger, lg)),
)

flog := app.addLogger(Fetcher, lg)
fetcher := fetch.NewFetch(app.cachedDB, msh, beaconProtocol, app.host,
fetch.WithContext(ctx),
fetch.WithConfig(app.Config.FETCH),
fetch.WithLogger(app.addLogger(Fetcher, lg)),
fetch.WithLogger(flog),
)
fetcherWrapped.Fetcher = fetcher
app.eg.Go(func() error {
return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
})

patrol := layerpatrol.New()
syncerConf := app.Config.Sync
Expand Down
3 changes: 3 additions & 0 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func collectHashes(a any) []types.Hash32 {
if b.RefBallot != types.EmptyBallotID {
hashes = append(hashes, b.RefBallot.AsHash32())
}
for _, header := range b.Votes.Support {
hashes = append(hashes, header.ID.AsHash32())
}
return hashes
}
log.Fatal("unexpected type")
Expand Down
3 changes: 3 additions & 0 deletions proposals/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,9 @@ func TestCollectHashes(t *testing.T) {
b := p.Ballot
expected := []types.Hash32{b.RefBallot.AsHash32()}
expected = append(expected, b.Votes.Base.AsHash32())
for _, header := range b.Votes.Support {
expected = append(expected, header.ID.AsHash32())
}
require.ElementsMatch(t, expected, collectHashes(b))

expected = append(expected, types.TransactionIDsToHashes(p.TxIDs)...)
Expand Down
49 changes: 49 additions & 0 deletions syncer/blockssync/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package blockssync

import (
"context"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type blockFetcher interface {
GetBlocks(context.Context, []types.BlockID) error
}

// Sync requests last specified blocks in background.
func Sync(ctx context.Context, logger *zap.Logger, requests <-chan []types.BlockID, fetcher blockFetcher) error {
var (
eg errgroup.Group
lastch = make(chan []types.BlockID)
)
eg.Go(func() error {
var (
send chan []types.BlockID
last []types.BlockID
)
for {
select {
case <-ctx.Done():
close(lastch)
return ctx.Err()
case req := <-requests:
if last == nil {
send = lastch
}
last = req
case send <- last:
last = nil
send = nil
}
}
})
for batch := range lastch {
if err := fetcher.GetBlocks(ctx, batch); err != nil {
logger.Warn("failed to fetch blocks", zap.Error(err))
}
}
return eg.Wait()
}
7 changes: 0 additions & 7 deletions syncer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ var (
[]string{},
).WithLabelValues()

blockRequested = metrics.NewCounter(
"block_requested",
namespace,
"number of missing block requested",
[]string{},
).WithLabelValues()

syncedLayer = metrics.NewGauge(
"layer",
namespace,
Expand Down
24 changes: 1 addition & 23 deletions syncer/state_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {
}
// even if it fails to fetch opinions, we still go ahead to ProcessLayer so that the tortoise
// has a chance to count ballots and form its own opinions
if err := s.processWithRetry(ctx, lid); err != nil {
if err := s.mesh.ProcessLayer(ctx, lid); err != nil {
s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err))
}
}
Expand All @@ -116,28 +116,6 @@ func (s *Syncer) processLayers(ctx context.Context) error {
return nil
}

func (s *Syncer) processWithRetry(ctx context.Context, lid types.LayerID) error {
for {
origerr := s.mesh.ProcessLayer(ctx, lid)
if origerr == nil {
return nil
}
var missing *types.ErrorMissing
if !errors.As(origerr, &missing) {
return origerr
}
s.logger.With().Debug("requesting missing blocks",
log.Context(ctx),
log.Inline(missing),
)
err := s.dataFetcher.GetBlocks(ctx, missing.Blocks)
if err != nil {
return fmt.Errorf("%w: %s", origerr, err)
}
blockRequested.Add(float64(len(missing.Blocks)))
}
}

func (s *Syncer) needCert(ctx context.Context, lid types.LayerID) (bool, error) {
cutoff := s.certCutoffLayer()
if !lid.After(cutoff) {
Expand Down
30 changes: 0 additions & 30 deletions syncer/state_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,33 +429,3 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) {
ts.mForkFinder.EXPECT().Purge(true)
require.NoError(t, ts.syncer.processLayers(context.Background()))
}

func TestProcessLayers_SucceedOnRetry(t *testing.T) {
ts := newSyncerWithoutSyncTimer(t)
ts.syncer.setATXSynced()
current := types.GetEffectiveGenesis().Add(1)
ts.mTicker.advanceToLayer(current)
ts.syncer.setLastSyncedLayer(current)

ts.mLyrPatrol.EXPECT().IsHareInCharge(gomock.Any()).Return(false).AnyTimes()
ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), gomock.Any()).AnyTimes()
ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), gomock.Any()).AnyTimes()
ts.mVm.EXPECT().Apply(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
ts.mVm.EXPECT().GetStateRoot().AnyTimes()
ts.mConState.EXPECT().UpdateCache(gomock.Any(), gomock.Any(), gomock.Any(), nil, nil).AnyTimes()

missing := fixture.RLayers(fixture.RLayer(current,
fixture.RBlock(types.BlockID{1}, fixture.Hare()),
fixture.RBlock(types.BlockID{2}, fixture.Valid()),
))
ts.mTortoise.EXPECT().Updates().Return(missing)
ts.mTortoise.EXPECT().Updates().Return(nil)
ts.mTortoise.EXPECT().Results(gomock.Any(), gomock.Any()).Return(missing, nil)
ts.mDataFetcher.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, got []types.BlockID) error {
missing[0].Blocks[0].Data = true
missing[0].Blocks[1].Data = true
return nil
})
require.NoError(t, ts.syncer.processLayers(context.Background()))
}

0 comments on commit c347ad4

Please sign in to comment.