Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Avoid races in postVerifier #4965

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## UNRELEASED

### Upgrade information

### Highlights

### Features

### Improvements

* [#4965](https://github.com/spacemeshos/go-spacemesh/pull/4965) Updates to PoST:
  * Prevent errors when shutting down the node that can result in a crash.
  * `postdata_metadata.json` is now updated atomically to prevent corruption of the file.

## v1.1.4

### Upgrade information
Expand Down
33 changes: 26 additions & 7 deletions activation/post_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type verifyPostJob struct {

type OffloadingPostVerifier struct {
eg errgroup.Group
stop context.CancelFunc
stopped <-chan struct{}
log log.Log
workers []*postVerifierWorker
channel chan<- *verifyPostJob
Expand Down Expand Up @@ -71,23 +73,30 @@ func NewOffloadingPostVerifier(verifiers []PostVerifier, logger log.Log) *Offloa
}
logger.With().Info("created post verifier", log.Int("num_workers", numWorkers))

return &OffloadingPostVerifier{
ctx, cancel := context.WithCancel(context.Background())
stopped := make(chan struct{})
v := &OffloadingPostVerifier{
log: logger,
workers: workers,
channel: channel,
stopped: stopped,
stop: func() {
cancel()
select {
case <-stopped:
default:
close(stopped)
}
},
}
}

func (v *OffloadingPostVerifier) Start(ctx context.Context) {
v.log.Info("starting post verifier")
for _, worker := range v.workers {
worker := worker
v.eg.Go(func() error { return worker.start(ctx) })
}
<-ctx.Done()
v.log.Info("stopping post verifier")
v.eg.Wait()
v.log.Info("stopped post verifier")
v.log.Info("started post verifier")
return v
}

func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m *shared.ProofMetadata, opts ...verifying.OptionFunc) error {
Expand All @@ -97,26 +106,36 @@ func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m
opts: opts,
result: make(chan error, 1),
}

select {
case v.channel <- job:
case <-v.stopped:
return fmt.Errorf("verifier is closed")
case <-ctx.Done():
return fmt.Errorf("submitting verifying job: %w", ctx.Err())
}

select {
case res := <-job.result:
return res
case <-v.stopped:
return fmt.Errorf("verifier is closed")
case <-ctx.Done():
return fmt.Errorf("waiting for verification result: %w", ctx.Err())
}
}

// Close shuts the post verifier down and closes the verifier of every worker.
//
// It calls v.stop, waits for the goroutines tracked by v.eg to return, and
// then closes each worker's verifier. Every verifier is closed even when an
// earlier one fails, so a single failing Close does not leak the remaining
// ones. The first error encountered (if any) is returned.
func (v *OffloadingPostVerifier) Close() error {
	v.log.Info("stopping post verifier")
	v.stop()
	// The result of Wait is deliberately discarded here.
	// NOTE(review): presumably workers only report a cancellation error once
	// stop has been called — confirm before surfacing it to callers.
	_ = v.eg.Wait()

	var firstErr error
	for _, worker := range v.workers {
		// Keep iterating after a failure so the remaining verifiers still get closed.
		if err := worker.verifier.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	v.log.Info("stopped post verifier")
	return firstErr
}

Expand Down
93 changes: 52 additions & 41 deletions activation/post_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/spacemeshos/post/shared"
"github.com/stretchr/testify/require"
Expand All @@ -26,29 +27,19 @@ func TestOffloadingPostVerifier(t *testing.T) {
[]activation.PostVerifier{verifier},
log.NewDefault(t.Name()),
)
defer offloadingVerifier.Close()
verifier.EXPECT().Close().Return(nil)

var eg errgroup.Group
eg.Go(func() error {
offloadingVerifier.Start(ctx)
return nil
})

{
verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)
}
{
verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(errors.New("invalid proof!"))
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.ErrorContains(t, err, "invalid proof!")
}
verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)

cancel()
require.NoError(t, eg.Wait())
verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(errors.New("invalid proof!"))
err = offloadingVerifier.Verify(ctx, &proof, &metadata)
require.ErrorContains(t, err, "invalid proof!")
}

func TestPostVerfierDetectsInvalidProof(t *testing.T) {
func TestPostVerifierDetectsInvalidProof(t *testing.T) {
verifier, err := activation.NewPostVerifier(activation.PostConfig{}, log.NewDefault(t.Name()))
require.NoError(t, err)
defer verifier.Close()
Expand All @@ -67,26 +58,53 @@ func TestPostVerifierVerifyAfterStop(t *testing.T) {
[]activation.PostVerifier{verifier},
log.NewDefault(t.Name()),
)
defer offloadingVerifier.Close()
verifier.EXPECT().Close().Return(nil)

verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)

// Stop the verifier
verifier.EXPECT().Close().Return(nil)
offloadingVerifier.Close()

err = offloadingVerifier.Verify(ctx, &proof, &metadata)
require.EqualError(t, err, "verifier is closed")
}

func TestPostVerifierNoRaceOnClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proof := shared.Proof{}
metadata := shared.ProofMetadata{}

verifier := activation.NewMockPostVerifier(gomock.NewController(t))
offloadingVerifier := activation.NewOffloadingPostVerifier(
[]activation.PostVerifier{verifier},
log.NewDefault(t.Name()),
)
defer offloadingVerifier.Close()
verifier.EXPECT().Close().AnyTimes().Return(nil)
verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).AnyTimes().Return(nil)

// Stop the verifier
var eg errgroup.Group
eg.Go(func() error {
offloadingVerifier.Start(ctx)
return nil
time.Sleep(50 * time.Millisecond)
return offloadingVerifier.Close()
})

{
verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)
for i := 0; i < 10; i++ {
ms := 10 * i
eg.Go(func() error {
time.Sleep(time.Duration(ms) * time.Millisecond)
return offloadingVerifier.Verify(ctx, &proof, &metadata)
})
}
// Stop the verifier
cancel()
require.NoError(t, eg.Wait())

{
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.ErrorIs(t, err, context.Canceled)
}
require.EqualError(t, eg.Wait(), "verifier is closed")
}

func TestPostVerifierReturnsOnCtxCanceledWhenBlockedVerifying(t *testing.T) {
Expand All @@ -98,15 +116,8 @@ func TestPostVerifierReturnsOnCtxCanceledWhenBlockedVerifying(t *testing.T) {
// empty list of verifiers - no one will verify the proof
}, log.NewDefault(t.Name()))

var eg errgroup.Group
eg.Go(func() error {
v.Start(ctx)
return nil
})
require.NoError(t, v.Close())

cancel()
err := v.Verify(ctx, &shared.Proof{}, &shared.ProofMetadata{})
require.ErrorIs(t, err, context.Canceled)

require.NoError(t, eg.Wait())
require.EqualError(t, err, "verifier is closed")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/spacemeshos/go-scale v1.1.10
github.com/spacemeshos/merkle-tree v0.2.3
github.com/spacemeshos/poet v0.9.1
github.com/spacemeshos/post v0.9.3
github.com/spacemeshos/post v0.9.4
github.com/spf13/afero v1.9.5
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ github.com/spacemeshos/merkle-tree v0.2.3 h1:zGEgOR9nxAzJr0EWjD39QFngwFEOxfxMloE
github.com/spacemeshos/merkle-tree v0.2.3/go.mod h1:VomOcQ5pCBXz7goiWMP5hReyqOfDXGSKbrH2GB9Htww=
github.com/spacemeshos/poet v0.9.1 h1:10wiGjuuGCZq9mAuQjRvNCpE5Uv0KU6yW5eynUCIECU=
github.com/spacemeshos/poet v0.9.1/go.mod h1:ccxzHl5IRSenCSqonPI8M+5vPNwN+o3QU2X41bhbcao=
github.com/spacemeshos/post v0.9.3 h1:3khU8uSxjicOYrfg3tFP26YyTkbsmbJt1uLJN93ScH4=
github.com/spacemeshos/post v0.9.3/go.mod h1:sWxWEfxH4wc4D2KCY0kBMu4ezz/v52Km/N023SaCcNE=
github.com/spacemeshos/post v0.9.4 h1:l/KGneUnLH5imy2Uml7G9kBnJuzl4ag0ku3U0OWaTxc=
github.com/spacemeshos/post v0.9.4/go.mod h1:YjYLMcFFSpxrI86TW2rhyWCWRRFG+8LPiONgARNuaP4=
github.com/spacemeshos/sha256-simd v0.1.0 h1:G7Mfu5RYdQiuE+wu4ZyJ7I0TI74uqLhFnKblEnSpjYI=
github.com/spacemeshos/sha256-simd v0.1.0/go.mod h1:O8CClVIilId7RtuCMV2+YzMj6qjVn75JsxOxaE8vcfM=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
Expand Down
11 changes: 4 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,13 +1031,6 @@ func (app *App) startServices(ctx context.Context) error {
if err := app.fetcher.Start(); err != nil {
return fmt.Errorf("failed to start fetcher: %w", err)
}
app.eg.Go(func() error {
app.postVerifier.Start(ctx)
// Start only returns when the context expires (which means the system
// is shutting down). We do the closing of the post verifier here so
// it's ensured that start has returned before we call Close.
return app.postVerifier.Close()
})
app.syncer.Start()
app.beaconProtocol.Start(ctx)

Expand Down Expand Up @@ -1210,6 +1203,10 @@ func (app *App) stopServices(ctx context.Context) {
_ = app.atxBuilder.StopSmeshing(false)
}

if app.postVerifier != nil {
app.postVerifier.Close()
}

if app.hare != nil {
app.hare.Close()
}
Expand Down