Skip to content

Commit

Permalink
sync, blocks: request blocks when tortoise crossed local threshold (#…
Browse files Browse the repository at this point in the history
…4826)

closes: #4824

- The first improvement is to register the peers that supported blocks; otherwise we would be guessing which peer to request a block from.
- The second is that when the mesh is called by sync or by the block builder, it may send a request to fetch blocks from the registered peers. Previously it would do so only in the sync code path, which is less robust and might be delayed by the fork finder.
  • Loading branch information
dshulyak committed Aug 13, 2023
1 parent 85b1ea9 commit 7668357
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 95 deletions.
33 changes: 0 additions & 33 deletions common/types/missing.go

This file was deleted.

15 changes: 14 additions & 1 deletion mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Mesh struct {
conState conservativeState
trtl system.Tortoise

missingBlocks chan []types.BlockID

mu sync.Mutex
// latestLayer is the latest layer this node had seen from blocks
latestLayer atomic.Value
Expand All @@ -64,6 +66,7 @@ func NewMesh(cdb *datastore.CachedDB, c layerClock, trtl system.Tortoise, exec *
executor: exec,
conState: state,
nextProcessedLayers: make(map[types.LayerID]struct{}),
missingBlocks: make(chan []types.BlockID, 32),
}
msh.latestLayer.Store(types.LayerID(0))
msh.latestLayerInState.Store(types.LayerID(0))
Expand Down Expand Up @@ -131,6 +134,12 @@ func (msh *Mesh) LatestLayerInState() types.LayerID {
return msh.latestLayerInState.Load().(types.LayerID)
}

// MissingBlocks returns a single-consumer, receive-only channel of block IDs
// that the mesh found to be missing.
// ProcessLayer publishes a batch on it when missingBlocks(results) reports any
// IDs, unless the caller's context is done first.
// By contract, the consumer is responsible for downloading the missing blocks.
func (msh *Mesh) MissingBlocks() <-chan []types.BlockID {
return msh.missingBlocks
}

// LatestLayer - returns the latest layer we saw from the network.
func (msh *Mesh) LatestLayer() types.LayerID {
return msh.latestLayer.Load().(types.LayerID)
Expand Down Expand Up @@ -308,7 +317,11 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error {
)
}
if missing := missingBlocks(results); len(missing) > 0 {
return &types.ErrorMissing{MissingData: types.MissingData{Blocks: missing}}
select {
case <-ctx.Done():
case msh.missingBlocks <- missing:
}
return fmt.Errorf("request missing blocks %v", missing)
}
if err := msh.ensureStateConsistent(ctx, results); err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
timeCfg "github.com/spacemeshos/go-spacemesh/timesync/config"
Expand Down Expand Up @@ -703,12 +704,16 @@ func (app *App) initServices(ctx context.Context) error {
blocks.WithCertifierLogger(app.addLogger(BlockCertLogger, lg)),
)

flog := app.addLogger(Fetcher, lg)
fetcher := fetch.NewFetch(app.cachedDB, msh, beaconProtocol, app.host,
fetch.WithContext(ctx),
fetch.WithConfig(app.Config.FETCH),
fetch.WithLogger(app.addLogger(Fetcher, lg)),
fetch.WithLogger(flog),
)
fetcherWrapped.Fetcher = fetcher
app.eg.Go(func() error {
return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
})

patrol := layerpatrol.New()
syncerConf := syncer.Config{
Expand Down
3 changes: 3 additions & 0 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func collectHashes(a any) []types.Hash32 {
if b.RefBallot != types.EmptyBallotID {
hashes = append(hashes, b.RefBallot.AsHash32())
}
for _, header := range b.Votes.Support {
hashes = append(hashes, header.ID.AsHash32())
}
return hashes
}
log.Fatal("unexpected type")
Expand Down
3 changes: 3 additions & 0 deletions proposals/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,9 @@ func TestCollectHashes(t *testing.T) {
b := p.Ballot
expected := []types.Hash32{b.RefBallot.AsHash32()}
expected = append(expected, b.Votes.Base.AsHash32())
for _, header := range b.Votes.Support {
expected = append(expected, header.ID.AsHash32())
}
require.ElementsMatch(t, expected, collectHashes(b))

expected = append(expected, types.TransactionIDsToHashes(p.TxIDs)...)
Expand Down
58 changes: 58 additions & 0 deletions syncer/blockssync/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package blockssync

import (
"context"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
)

//go:generate mockgen -package=blockssync -destination=./mocks.go -source=./blocks.go

// blockFetcher is the part of the fetcher that Sync depends on.
// Sync passes GetBlocks the IDs it has collected; a returned error is only
// logged by Sync and does not stop it.
type blockFetcher interface {
GetBlocks(context.Context, []types.BlockID) error
}

// Sync collects requested block IDs and fetches them in the background.
//
// IDs received on requests while a fetch is still in flight are merged into a
// single de-duplicated batch, which is handed to fetcher.GetBlocks as soon as
// the previous call returns. A failed fetch is logged and otherwise ignored.
//
// Sync blocks until ctx is cancelled and then returns the context's error.
func Sync(ctx context.Context, logger *zap.Logger, requests <-chan []types.BlockID, fetcher blockFetcher) error {
	var (
		eg      errgroup.Group
		batches = make(chan map[types.BlockID]struct{})
	)

	// Collector: accumulates IDs and offers the accumulated set to the fetch loop.
	eg.Go(func() error {
		// Closing batches is what terminates the fetch loop below.
		defer close(batches)

		var (
			pending map[types.BlockID]struct{}
			// offer is nil while nothing is pending; a send on a nil channel
			// never proceeds, so that select case stays disabled until then.
			offer chan map[types.BlockID]struct{}
		)
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case ids := <-requests:
				if pending == nil {
					pending = map[types.BlockID]struct{}{}
					offer = batches
				}
				for i := range ids {
					pending[ids[i]] = struct{}{}
				}
			case offer <- pending:
				// Ownership of the set moved to the fetch loop; start afresh.
				pending, offer = nil, nil
			}
		}
	})

	// Fetch loop: runs on the caller's goroutine, one batch at a time.
	for set := range batches {
		ids := make([]types.BlockID, 0, len(set))
		for id := range set {
			ids = append(ids, id)
		}
		err := fetcher.GetBlocks(ctx, ids)
		if err != nil {
			logger.Warn("failed to fetch blocks", zap.Error(err))
		}
	}
	return eg.Wait()
}
84 changes: 84 additions & 0 deletions syncer/blockssync/blocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package blockssync

import (
"context"
"errors"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
)

// TestCanBeAggregated sends two overlapping requests and accepts either
// outcome: both merged into one batch, or delivered as two consecutive batches.
func TestCanBeAggregated(t *testing.T) {
var (
ctrl = gomock.NewController(t)
fetch = NewMockblockFetcher(ctrl)
req = make(chan []types.BlockID, 10)
out = make(chan []types.BlockID, 10)
ctx, cancel = context.WithCancel(context.Background())
eg errgroup.Group
)
// Stop Sync and wait for it to return before the test finishes.
t.Cleanup(func() {
cancel()
eg.Wait()
})
eg.Go(func() error {
return Sync(ctx, logtest.New(t).Zap(), req, fetch)
})
// Forward every batch handed to the fetcher so the test can inspect it.
fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error {
out <- blocks
return nil
}).AnyTimes()
first := []types.BlockID{{1}, {2}, {3}}
second := []types.BlockID{{2}, {3}, {4}}
req <- first
req <- second
rst1 := timedRead(t, out)
// The first batch must contain every ID of the first request.
require.Subset(t, rst1, first)
// Equal length means the requests were not merged, so the second request
// has to show up in a batch of its own.
if len(rst1) == len(first) {
require.Subset(t, timedRead(t, out), second)
}
}

// TestErrorDoesntExit checks that Sync keeps serving requests after the
// fetcher returns an error.
func TestErrorDoesntExit(t *testing.T) {
var (
ctrl = gomock.NewController(t)
fetch = NewMockblockFetcher(ctrl)
req = make(chan []types.BlockID, 10)
out = make(chan []types.BlockID, 10)
ctx, cancel = context.WithCancel(context.Background())
eg errgroup.Group
)
// Stop Sync and wait for it to return before the test finishes.
t.Cleanup(func() {
cancel()
eg.Wait()
})
eg.Go(func() error {
return Sync(ctx, logtest.New(t).Zap(), req, fetch)
})
// Record each batch, then fail the fetch.
fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error {
out <- blocks
return errors.New("test")
}).AnyTimes()
first := []types.BlockID{{1}, {2}, {3}}
req <- first
require.Subset(t, timedRead(t, out), first)
// The previous fetch failed; a second request must still reach the fetcher.
req <- first
require.Subset(t, timedRead(t, out), first)
}

// timedRead receives one batch from blocks and returns it.
// If nothing arrives within one second the test is failed immediately.
func timedRead(tb testing.TB, blocks chan []types.BlockID) []types.BlockID {
	// Mark this function as a helper so a failure is attributed to the
	// calling test line rather than to this file position.
	tb.Helper()

	delay := time.Second
	select {
	case v := <-blocks:
		return v
	case <-time.After(delay):
		require.FailNow(tb, "timed out after", delay)
	}
	// Not reached after FailNow; required because the select has no
	// terminating statement in the timeout branch.
	return nil
}
50 changes: 50 additions & 0 deletions syncer/blockssync/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions syncer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ var (
[]string{},
).WithLabelValues()

blockRequested = metrics.NewCounter(
"block_requested",
namespace,
"number of missing block requested",
[]string{},
).WithLabelValues()

syncedLayer = metrics.NewGauge(
"layer",
namespace,
Expand Down
24 changes: 1 addition & 23 deletions syncer/state_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {
}
// even if it fails to fetch opinions, we still go ahead to ProcessLayer so that the tortoise
// has a chance to count ballots and form its own opinions
if err := s.processWithRetry(ctx, lid); err != nil {
if err := s.mesh.ProcessLayer(ctx, lid); err != nil {
s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err))
}
}
Expand All @@ -116,28 +116,6 @@ func (s *Syncer) processLayers(ctx context.Context) error {
return nil
}

// processWithRetry calls mesh.ProcessLayer for lid until it succeeds or fails
// with an error that fetching blocks cannot fix.
// When the error is a *types.ErrorMissing, the blocks it lists are requested
// from dataFetcher and processing is attempted again. A failed fetch ends the
// loop with an error that wraps the original processing error.
func (s *Syncer) processWithRetry(ctx context.Context, lid types.LayerID) error {
for {
origerr := s.mesh.ProcessLayer(ctx, lid)
if origerr == nil {
return nil
}
var missing *types.ErrorMissing
// Any failure other than missing data is returned to the caller unchanged.
if !errors.As(origerr, &missing) {
return origerr
}
s.logger.With().Debug("requesting missing blocks",
log.Context(ctx),
log.Inline(missing),
)
err := s.dataFetcher.GetBlocks(ctx, missing.Blocks)
if err != nil {
return fmt.Errorf("%w: %s", origerr, err)
}
// Count the blocks only after the fetch succeeded, then retry the layer.
blockRequested.Add(float64(len(missing.Blocks)))
}
}

func (s *Syncer) needCert(ctx context.Context, lid types.LayerID) (bool, error) {
cutoff := s.certCutoffLayer()
if !lid.After(cutoff) {
Expand Down

0 comments on commit 7668357

Please sign in to comment.