Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - fetch: requests got lost when there are no peers #4849

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 20 additions & 1 deletion fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,25 @@
}

peer2batches := f.organizeRequests(requests)
if len(peer2batches) == 0 {
f.mu.Lock()
defer f.mu.Unlock()
for _, msg := range requests {
if req, ok := f.ongoing[msg.Hash]; ok {
f.logger.WithContext(req.ctx).With().Info("delaying hash request",
log.Stringer("hash", msg.Hash),
log.String("hint", string(msg.Hint)),
)
f.unprocessed[msg.Hash] = req
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So GetBlocks will not return immediately with an error if there are no peers, but instead we will send the request once peers are available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Essentially, this puts those requests back in the queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you prefer to fail all of them in that case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach is fine with me too; it is hard to tell which is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I prefer returning an error right away. It's easier for debugging (and for avoiding future bugs).

} else {
f.logger.With().Error("ongoing request missing",
log.Stringer("hash", msg.Hash),
log.String("hint", string(msg.Hint)),
)
}

Check warning on line 455 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L441-L455

Added lines #L441 - L455 were not covered by tests
}
return

Check warning on line 457 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L457

Added line #L457 was not covered by tests
}

for peer, peerBatches := range peer2batches {
for _, reqs := range peerBatches {
Expand All @@ -457,7 +476,7 @@
peer2requests := make(map[p2p.Peer][]RequestMessage)
peers := f.host.GetPeers()
if len(peers) == 0 {
f.logger.Info("cannot send fetch: no peers found")
f.logger.Info("cannot send batch: no peers found")

Check warning on line 479 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L479

Added line #L479 was not covered by tests
// in loop() we will try again after the batchTimeout
return nil
}
Expand Down
46 changes: 34 additions & 12 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error {
msh.pendingUpdates.min = types.MinLayer(msh.pendingUpdates.min, results[0].Layer)
msh.pendingUpdates.max = types.MaxLayer(msh.pendingUpdates.max, results[len(results)-1].Layer)
}
if next := msh.LatestLayerInState() + 1; next < msh.pendingUpdates.min {
next := msh.LatestLayerInState() + 1
if next < msh.pendingUpdates.min {
msh.pendingUpdates.min = next
pending = true
}
Expand All @@ -318,34 +319,55 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error {
})),
)
}
if missing := missingBlocks(results); len(missing) > 0 {
applicable, missing := filterMissing(results, next)
if len(missing) > 0 {
select {
case <-ctx.Done():
case msh.missingBlocks <- missing:
}
return fmt.Errorf("%w: request missing blocks %v", ErrMissingBlock, missing)
if len(applicable) == 0 {
return fmt.Errorf("%w: request missing blocks %v", ErrMissingBlock, missing)
}
}
if err := msh.ensureStateConsistent(ctx, results); err != nil {

if err := msh.ensureStateConsistent(ctx, applicable); err != nil {
return err
}
if err := msh.applyResults(ctx, results); err != nil {
if err := msh.applyResults(ctx, applicable); err != nil {
return err
}
msh.pendingUpdates.min = 0
msh.pendingUpdates.max = 0
if len(missing) == 0 {
msh.pendingUpdates.min = 0
msh.pendingUpdates.max = 0
} else if len(applicable) > 0 {
msh.pendingUpdates.min = applicable[len(applicable)-1].Layer + 1
}
return nil
}

func missingBlocks(results []result.Layer) []types.BlockID {
var response []types.BlockID
for _, layer := range results {
// filterMissing walks results in order and collects the ID of every block
// that is flagged Valid, Hare or Local but does not have its Data flag set.
//
// It returns two values:
//   - the layers that precede the first layer containing such a block
//     (all of results when nothing is missing, nil when the first layer with
//     a missing block is at or before next), and
//   - the IDs of all missing blocks found across results (nil when none).
func filterMissing(results []result.Layer, next types.LayerID) ([]result.Layer, []types.BlockID) {
var (
missing []types.BlockID
// index is the position in results of the first layer with a missing
// block; it stays -1 while no such layer has been seen.
index = -1
)
for i, layer := range results {
for _, block := range layer.Blocks {
if (block.Valid || block.Hare || block.Local) && !block.Data {
// NOTE(review): the next line appears to be the removed side of the diff
// (carried over from missingBlocks); response is not declared in this
// function — confirm against the merged file.
response = append(response, block.Header.ID)
missing = append(missing, block.Header.ID)
// Remember only the first layer that has a missing block; later layers
// still contribute their IDs to missing.
if index == -1 {
index = i
}
}
}
}
// NOTE(review): this also looks like a removed diff line from missingBlocks;
// it returns a single value from a two-result function — confirm.
return response
if index >= 0 {
firstMissing := results[index].Layer
// When the first layer with a missing block is at or before next, no
// layers are handed back. Presumably everything before it is already in
// state, so there is nothing new to apply — confirm with the caller.
if firstMissing <= next {
return nil, missing
}
// Otherwise hand back only the prefix of layers before the first missing one.
return results[:index], missing
}
// No missing blocks: every layer is returned.
return results, nil
}

func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error {
Expand Down
14 changes: 14 additions & 0 deletions mesh/mesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,20 @@ func TestProcessLayer(t *testing.T) {
},
},
},
{
"missing but can make progress",
[]call{
{
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())),
rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())),
rlayer(start+2, rblock(idg("3"), fixture.Valid())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe extend it with one more block with data? And add one more call in which both get applied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

),
executed: []types.BlockID{idg("1"), idg("2")},
applied: []types.BlockID{idg("1"), idg("2")},
},
},
},
{
"missing valid",
[]call{
Expand Down
2 changes: 1 addition & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func NewSyncer(
}

s.syncTimer = time.NewTicker(s.cfg.Interval)
s.validateTimer = time.NewTicker(s.cfg.Interval * 2)
s.validateTimer = time.NewTicker(s.cfg.Interval)
if s.dataFetcher == nil {
s.dataFetcher = NewDataFetch(mesh, fetcher, cdb, cache, s.logger)
}
Expand Down