[Merged by Bors] - sync, blocks: request blocks when tortoise crossed local threshold #4826

33 changes: 0 additions & 33 deletions common/types/missing.go

This file was deleted.
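
For context, the deleted types.ErrorMissing (still referenced by the processWithRetry code removed from syncer/state_syncer.go below) can be roughly reconstructed from its uses in this diff. This is a sketch only; the exact field set and methods of the original file are not shown in this PR:

```go
package types

import "fmt"

// MissingData lists data the mesh could not find locally
// (reconstructed from usage in this diff).
type MissingData struct {
    Blocks []BlockID
}

// ErrorMissing wraps MissingData so callers can unwrap it with
// errors.As and fetch what is missing, as processWithRetry did.
type ErrorMissing struct {
    MissingData
}

func (e *ErrorMissing) Error() string {
    return fmt.Sprintf("missing data: %d blocks", len(e.Blocks))
}
```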

15 changes: 14 additions & 1 deletion mesh/mesh.go
@@ -39,6 +39,8 @@
    conState conservativeState
    trtl     system.Tortoise

    missingBlocks chan []types.BlockID

    mu sync.Mutex
    // latestLayer is the latest layer this node had seen from blocks
    latestLayer atomic.Value
@@ -64,6 +66,7 @@
        executor:            exec,
        conState:            state,
        nextProcessedLayers: make(map[types.LayerID]struct{}),
        missingBlocks:       make(chan []types.BlockID, 32),
    }
    msh.latestLayer.Store(types.LayerID(0))
    msh.latestLayerInState.Store(types.LayerID(0))
@@ -131,6 +134,12 @@
    return msh.latestLayerInState.Load().(types.LayerID)
}

// MissingBlocks returns a single-consumer channel.
// By contract, the consumer is responsible for downloading the missing blocks.
func (msh *Mesh) MissingBlocks() <-chan []types.BlockID {
    return msh.missingBlocks
}

// LatestLayer - returns the latest layer we saw from the network.
func (msh *Mesh) LatestLayer() types.LayerID {
    return msh.latestLayer.Load().(types.LayerID)
@@ -308,7 +317,11 @@
        )
    }
    if missing := missingBlocks(results); len(missing) > 0 {
        return &types.ErrorMissing{MissingData: types.MissingData{Blocks: missing}}
        select {
        case <-ctx.Done():

[Codecov/patch: added line mesh/mesh.go#L321 was not covered by tests]

        case msh.missingBlocks <- missing:

Contributor commented:

this approach still relies on ProcessLayer being triggered by some component.

currently there are two components (proposal builder and state syncer) that call TallyVotes(), which causes the tortoise to verify a layer. only the state syncer will call ProcessLayer and cause the blocks to be fetched.

is it possible to tie the block fetching to TallyVotes() instead? though that may delay generating block proposals.
or maybe TallyVotes() should be called from OnBallot() (too expensive?) to cause the blocks to be fetched more frequently?

@dshulyak (Contributor, Author) commented on Aug 13, 2023:

it is also called by the block builder (check ProcessPerHareOutput); i would prefer not to add it to the tortoise. maybe at some point when i remove ProcessLayer altogether.

@dshulyak (Contributor, Author) commented:

> is it possible to tie the block fetching to TallyVotes() instead? though that may delay generating block proposals.
> or maybe TallyVotes() should be called from OnBallot() (too expensive?) to cause the blocks to be fetched more frequently?

it makes sense to tie it to the local threshold; that is a bigger change though, and this is needed semi-urgently.

        }
        return fmt.Errorf("request missing blocks %v", missing)
    }
    if err := msh.ensureStateConsistent(ctx, results); err != nil {
        return err
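The MissingBlocks contract above implies a single background consumer that drains the channel and triggers downloads. A minimal sketch of such a consumer, assuming only a fetcher exposing GetBlocks (the actual consumer wired up by this PR is blockssync.Sync, shown below; the package and function names here are illustrative):

```go
package example

import (
    "context"

    "github.com/spacemeshos/go-spacemesh/common/types"
    "github.com/spacemeshos/go-spacemesh/mesh"
)

// blockFetcher mirrors the minimal interface this PR defines in blockssync.
type blockFetcher interface {
    GetBlocks(context.Context, []types.BlockID) error
}

// consumeMissing drains Mesh.MissingBlocks and requests each reported
// batch from the fetcher. Fetch errors are swallowed: the next
// ProcessLayer run will report blocks that are still missing.
func consumeMissing(ctx context.Context, msh *mesh.Mesh, f blockFetcher) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case ids := <-msh.MissingBlocks():
            if err := f.GetBlocks(ctx, ids); err != nil {
                continue
            }
        }
    }
}
```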
7 changes: 6 additions & 1 deletion node/node.go
@@ -66,6 +66,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
timeCfg "github.com/spacemeshos/go-spacemesh/timesync/config"
@@ -704,12 +705,16 @@ func (app *App) initServices(ctx context.Context) error {
        blocks.WithCertifierLogger(app.addLogger(BlockCertLogger, lg)),
    )

    flog := app.addLogger(Fetcher, lg)
    fetcher := fetch.NewFetch(app.cachedDB, msh, beaconProtocol, app.host,
        fetch.WithContext(ctx),
        fetch.WithConfig(app.Config.FETCH),
        fetch.WithLogger(app.addLogger(Fetcher, lg)),
        fetch.WithLogger(flog),
    )
    fetcherWrapped.Fetcher = fetcher
    app.eg.Go(func() error {
        return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
    })

    patrol := layerpatrol.New()
    syncerConf := app.Config.Sync
3 changes: 3 additions & 0 deletions proposals/handler.go
@@ -190,6 +190,9 @@ func collectHashes(a any) []types.Hash32 {
        if b.RefBallot != types.EmptyBallotID {
            hashes = append(hashes, b.RefBallot.AsHash32())
        }
        for _, header := range b.Votes.Support {
            hashes = append(hashes, header.ID.AsHash32())
        }
        return hashes
    }
    log.Fatal("unexpected type")
3 changes: 3 additions & 0 deletions proposals/handler_test.go
@@ -1282,6 +1282,9 @@ func TestCollectHashes(t *testing.T) {
    b := p.Ballot
    expected := []types.Hash32{b.RefBallot.AsHash32()}
    expected = append(expected, b.Votes.Base.AsHash32())
    for _, header := range b.Votes.Support {
        expected = append(expected, header.ID.AsHash32())
    }
    require.ElementsMatch(t, expected, collectHashes(b))

    expected = append(expected, types.TransactionIDsToHashes(p.TxIDs)...)
58 changes: 58 additions & 0 deletions syncer/blockssync/blocks.go
@@ -0,0 +1,58 @@
package blockssync

import (
    "context"

    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"

    "github.com/spacemeshos/go-spacemesh/common/types"
)

//go:generate mockgen -package=blockssync -destination=./mocks.go -source=./blocks.go

type blockFetcher interface {
    GetBlocks(context.Context, []types.BlockID) error
}

// Sync aggregates requested block IDs and downloads them in the background.
func Sync(ctx context.Context, logger *zap.Logger, requests <-chan []types.BlockID, fetcher blockFetcher) error {
    var (
        eg     errgroup.Group
        lastch = make(chan map[types.BlockID]struct{})
    )
    eg.Go(func() error {
        var (
            send chan map[types.BlockID]struct{}
            last map[types.BlockID]struct{}
        )
        for {
            select {
            case <-ctx.Done():
                close(lastch)
                return ctx.Err()
            case req := <-requests:
                if last == nil {
                    last = map[types.BlockID]struct{}{}
                    send = lastch
                }
                for _, id := range req {
                    last[id] = struct{}{}
                }
            case send <- last:
                last = nil
                send = nil
            }
        }
    })
    for batch := range lastch {
        blocks := make([]types.BlockID, 0, len(batch))
        for id := range batch {
            blocks = append(blocks, id)
        }
        if err := fetcher.GetBlocks(ctx, blocks); err != nil {
            logger.Warn("failed to fetch blocks", zap.Error(err))
        }
    }
    return eg.Wait()
}
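The aggregator goroutine above relies on Go's nil-channel idiom: a send on a nil channel blocks forever, so the `send <- last` case is effectively disabled until new IDs arrive and `send` is set to `lastch`; once a batch is handed off, both `send` and `last` are reset to nil, pausing sends again. This is what lets bursts of requests coalesce into a single deduplicated GetBlocks call. A stripped-down sketch of the same pattern, with ints standing in for block IDs (names are illustrative):

```go
package example

// coalesce merges bursts of batches from in into deduplicated sets and
// emits them on out; the send case stays disabled (nil channel) whenever
// nothing is pending, exactly as in Sync above.
func coalesce(in <-chan []int, out chan<- map[int]struct{}, stop <-chan struct{}) {
    var (
        send    chan<- map[int]struct{} // nil while nothing is pending
        pending map[int]struct{}
    )
    for {
        select {
        case <-stop:
            return
        case batch := <-in:
            if pending == nil {
                pending = map[int]struct{}{}
                send = out // enable the send case
            }
            for _, v := range batch {
                pending[v] = struct{}{}
            }
        case send <- pending: // never selected while send == nil
            pending, send = nil, nil // disable until the next batch
        }
    }
}
```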
84 changes: 84 additions & 0 deletions syncer/blockssync/blocks_test.go
@@ -0,0 +1,84 @@
package blockssync

import (
    "context"
    "errors"
    "testing"
    "time"

    "github.com/golang/mock/gomock"
    "github.com/stretchr/testify/require"
    "golang.org/x/sync/errgroup"

    "github.com/spacemeshos/go-spacemesh/common/types"
    "github.com/spacemeshos/go-spacemesh/log/logtest"
)

func TestCanBeAggregated(t *testing.T) {
    var (
        ctrl        = gomock.NewController(t)
        fetch       = NewMockblockFetcher(ctrl)
        req         = make(chan []types.BlockID, 10)
        out         = make(chan []types.BlockID, 10)
        ctx, cancel = context.WithCancel(context.Background())
        eg          errgroup.Group
    )
    t.Cleanup(func() {
        cancel()
        eg.Wait()
    })
    eg.Go(func() error {
        return Sync(ctx, logtest.New(t).Zap(), req, fetch)
    })
    fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error {
        out <- blocks
        return nil
    }).AnyTimes()
    first := []types.BlockID{{1}, {2}, {3}}
    second := []types.BlockID{{2}, {3}, {4}}
    req <- first
    req <- second
    rst1 := timedRead(t, out)
    require.Subset(t, rst1, first)
    if len(rst1) == len(first) {
        require.Subset(t, timedRead(t, out), second)
    }
}

func TestErrorDoesntExit(t *testing.T) {
    var (
        ctrl        = gomock.NewController(t)
        fetch       = NewMockblockFetcher(ctrl)
        req         = make(chan []types.BlockID, 10)
        out         = make(chan []types.BlockID, 10)
        ctx, cancel = context.WithCancel(context.Background())
        eg          errgroup.Group
    )
    t.Cleanup(func() {
        cancel()
        eg.Wait()
    })
    eg.Go(func() error {
        return Sync(ctx, logtest.New(t).Zap(), req, fetch)
    })
    fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error {
        out <- blocks
        return errors.New("test")
    }).AnyTimes()
    first := []types.BlockID{{1}, {2}, {3}}
    req <- first
    require.Subset(t, timedRead(t, out), first)
    req <- first
    require.Subset(t, timedRead(t, out), first)
}

func timedRead(tb testing.TB, blocks chan []types.BlockID) []types.BlockID {
    delay := time.Second
    select {
    case v := <-blocks:
        return v
    case <-time.After(delay):
        require.FailNow(tb, "timed out after", delay)
    }
    return nil
}
50 changes: 50 additions & 0 deletions syncer/blockssync/mocks.go

(generated file; diff not rendered)

7 changes: 0 additions & 7 deletions syncer/metrics.go
@@ -71,13 +71,6 @@ var (
        []string{},
    ).WithLabelValues()

    blockRequested = metrics.NewCounter(
        "block_requested",
        namespace,
        "number of missing block requested",
        []string{},
    ).WithLabelValues()

    syncedLayer = metrics.NewGauge(
        "layer",
        namespace,
24 changes: 1 addition & 23 deletions syncer/state_syncer.go
@@ -102,7 +102,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {
        }
        // even if it fails to fetch opinions, we still go ahead to ProcessLayer so that the tortoise
        // has a chance to count ballots and form its own opinions
        if err := s.processWithRetry(ctx, lid); err != nil {
        if err := s.mesh.ProcessLayer(ctx, lid); err != nil {
            s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err))
        }
    }

@@ -116,28 +116,6 @@
    return nil
}

func (s *Syncer) processWithRetry(ctx context.Context, lid types.LayerID) error {
    for {
        origerr := s.mesh.ProcessLayer(ctx, lid)
        if origerr == nil {
            return nil
        }
        var missing *types.ErrorMissing
        if !errors.As(origerr, &missing) {
            return origerr
        }
        s.logger.With().Debug("requesting missing blocks",
            log.Context(ctx),
            log.Inline(missing),
        )
        err := s.dataFetcher.GetBlocks(ctx, missing.Blocks)
        if err != nil {
            return fmt.Errorf("%w: %s", origerr, err)
        }
        blockRequested.Add(float64(len(missing.Blocks)))
    }
}

func (s *Syncer) needCert(ctx context.Context, lid types.LayerID) (bool, error) {
    cutoff := s.certCutoffLayer()
    if !lid.After(cutoff) {