Skip to content

Commit

Permalink
Avoid races in postVerifier (#4965)
Browse files Browse the repository at this point in the history
## Motivation
Closes #4910, merge after spacemeshos/post#232.

## Changes
- Removed the `Start` method; a verifier returned from `NewOffloadingPostVerifier` is ready to be used immediately
- `Close` cancels ongoing verifications and prevents new ones from being started
- Updated `node.go` to use the new API

## Test Plan
- existing tests pass

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Update documentation as needed
- [x] Update [changelog](../CHANGELOG.md) as needed
  • Loading branch information
fasmat committed Sep 7, 2023
1 parent 6433610 commit ea87bbe
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 58 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## UNRELEASED

### Upgrade information

### Highlights

### Features

### Improvements

* [#4965](https://github.com/spacemeshos/go-spacemesh/pull/4965) Updates to PoST:
* Prevent errors when shutting down the node that can result in a crash
* `postdata_metadata.json` is now updated atomically to prevent corruption of the file.

## v1.1.4

### Upgrade information
Expand Down
33 changes: 26 additions & 7 deletions activation/post_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type verifyPostJob struct {

type OffloadingPostVerifier struct {
eg errgroup.Group
stop context.CancelFunc
stopped <-chan struct{}
log log.Log
workers []*postVerifierWorker
channel chan<- *verifyPostJob
Expand Down Expand Up @@ -71,23 +73,30 @@ func NewOffloadingPostVerifier(verifiers []PostVerifier, logger log.Log) *Offloa
}
logger.With().Info("created post verifier", log.Int("num_workers", numWorkers))

return &OffloadingPostVerifier{
ctx, cancel := context.WithCancel(context.Background())
stopped := make(chan struct{})
v := &OffloadingPostVerifier{
log: logger,
workers: workers,
channel: channel,
stopped: stopped,
stop: func() {
cancel()
select {
case <-stopped:
default:
close(stopped)
}
},
}
}

func (v *OffloadingPostVerifier) Start(ctx context.Context) {
v.log.Info("starting post verifier")
for _, worker := range v.workers {
worker := worker
v.eg.Go(func() error { return worker.start(ctx) })
}
<-ctx.Done()
v.log.Info("stopping post verifier")
v.eg.Wait()
v.log.Info("stopped post verifier")
v.log.Info("started post verifier")
return v
}

func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m *shared.ProofMetadata, opts ...verifying.OptionFunc) error {
Expand All @@ -97,26 +106,36 @@ func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m
opts: opts,
result: make(chan error, 1),
}

select {
case v.channel <- job:
case <-v.stopped:
return fmt.Errorf("verifier is closed")
case <-ctx.Done():
return fmt.Errorf("submitting verifying job: %w", ctx.Err())
}

select {
case res := <-job.result:
return res
case <-v.stopped:
return fmt.Errorf("verifier is closed")
case <-ctx.Done():
return fmt.Errorf("waiting for verification result: %w", ctx.Err())
}
}

// Close shuts the verifier down and closes the PostVerifier held by every
// worker.
//
// It invokes v.stop, waits on v.eg, and then closes each worker's verifier.
// All verifiers are closed even if one of them fails, so a single failing
// Close does not leave the remaining verifiers open. The first error
// encountered is returned; nil is returned when every verifier closed
// cleanly.
func (v *OffloadingPostVerifier) Close() error {
	v.log.Info("stopping post verifier")
	v.stop()
	// The result of Wait is discarded here, as it was before.
	// NOTE(review): presumably workers only return a cancellation error
	// once stop has been called - confirm against postVerifierWorker.start.
	v.eg.Wait()

	var firstErr error
	for _, worker := range v.workers {
		// Keep going after a failure so that every verifier gets closed.
		if err := worker.verifier.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if firstErr != nil {
		return firstErr
	}

	v.log.Info("stopped post verifier")
	return nil
}

Expand Down
93 changes: 52 additions & 41 deletions activation/post_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/spacemeshos/post/shared"
"github.com/stretchr/testify/require"
Expand All @@ -26,29 +27,19 @@ func TestOffloadingPostVerifier(t *testing.T) {
[]activation.PostVerifier{verifier},
log.NewDefault(t.Name()),
)
defer offloadingVerifier.Close()
verifier.EXPECT().Close().Return(nil)

var eg errgroup.Group
eg.Go(func() error {
offloadingVerifier.Start(ctx)
return nil
})

{
verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)
}
{
verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(errors.New("invalid proof!"))
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.ErrorContains(t, err, "invalid proof!")
}
verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)

cancel()
require.NoError(t, eg.Wait())
verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(errors.New("invalid proof!"))
err = offloadingVerifier.Verify(ctx, &proof, &metadata)
require.ErrorContains(t, err, "invalid proof!")
}

func TestPostVerfierDetectsInvalidProof(t *testing.T) {
func TestPostVerifierDetectsInvalidProof(t *testing.T) {
verifier, err := activation.NewPostVerifier(activation.PostConfig{}, log.NewDefault(t.Name()))
require.NoError(t, err)
defer verifier.Close()
Expand All @@ -67,26 +58,53 @@ func TestPostVerifierVerifyAfterStop(t *testing.T) {
[]activation.PostVerifier{verifier},
log.NewDefault(t.Name()),
)
defer offloadingVerifier.Close()
verifier.EXPECT().Close().Return(nil)

verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)

// Stop the verifier
verifier.EXPECT().Close().Return(nil)
offloadingVerifier.Close()

err = offloadingVerifier.Verify(ctx, &proof, &metadata)
require.EqualError(t, err, "verifier is closed")
}

func TestPostVerifierNoRaceOnClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proof := shared.Proof{}
metadata := shared.ProofMetadata{}

verifier := activation.NewMockPostVerifier(gomock.NewController(t))
offloadingVerifier := activation.NewOffloadingPostVerifier(
[]activation.PostVerifier{verifier},
log.NewDefault(t.Name()),
)
defer offloadingVerifier.Close()
verifier.EXPECT().Close().AnyTimes().Return(nil)
verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).AnyTimes().Return(nil)

// Stop the verifier
var eg errgroup.Group
eg.Go(func() error {
offloadingVerifier.Start(ctx)
return nil
time.Sleep(50 * time.Millisecond)
return offloadingVerifier.Close()
})

{
verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(nil)
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.NoError(t, err)
for i := 0; i < 10; i++ {
ms := 10 * i
eg.Go(func() error {
time.Sleep(time.Duration(ms) * time.Millisecond)
return offloadingVerifier.Verify(ctx, &proof, &metadata)
})
}
// Stop the verifier
cancel()
require.NoError(t, eg.Wait())

{
err := offloadingVerifier.Verify(ctx, &proof, &metadata)
require.ErrorIs(t, err, context.Canceled)
}
require.EqualError(t, eg.Wait(), "verifier is closed")
}

func TestPostVerifierReturnsOnCtxCanceledWhenBlockedVerifying(t *testing.T) {
Expand All @@ -98,15 +116,8 @@ func TestPostVerifierReturnsOnCtxCanceledWhenBlockedVerifying(t *testing.T) {
// empty list of verifiers - no one will verify the proof
}, log.NewDefault(t.Name()))

var eg errgroup.Group
eg.Go(func() error {
v.Start(ctx)
return nil
})
require.NoError(t, v.Close())

cancel()
err := v.Verify(ctx, &shared.Proof{}, &shared.ProofMetadata{})
require.ErrorIs(t, err, context.Canceled)

require.NoError(t, eg.Wait())
require.EqualError(t, err, "verifier is closed")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/spacemeshos/go-scale v1.1.10
github.com/spacemeshos/merkle-tree v0.2.3
github.com/spacemeshos/poet v0.9.1
github.com/spacemeshos/post v0.9.3
github.com/spacemeshos/post v0.9.4
github.com/spf13/afero v1.9.5
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ github.com/spacemeshos/merkle-tree v0.2.3 h1:zGEgOR9nxAzJr0EWjD39QFngwFEOxfxMloE
github.com/spacemeshos/merkle-tree v0.2.3/go.mod h1:VomOcQ5pCBXz7goiWMP5hReyqOfDXGSKbrH2GB9Htww=
github.com/spacemeshos/poet v0.9.1 h1:10wiGjuuGCZq9mAuQjRvNCpE5Uv0KU6yW5eynUCIECU=
github.com/spacemeshos/poet v0.9.1/go.mod h1:ccxzHl5IRSenCSqonPI8M+5vPNwN+o3QU2X41bhbcao=
github.com/spacemeshos/post v0.9.3 h1:3khU8uSxjicOYrfg3tFP26YyTkbsmbJt1uLJN93ScH4=
github.com/spacemeshos/post v0.9.3/go.mod h1:sWxWEfxH4wc4D2KCY0kBMu4ezz/v52Km/N023SaCcNE=
github.com/spacemeshos/post v0.9.4 h1:l/KGneUnLH5imy2Uml7G9kBnJuzl4ag0ku3U0OWaTxc=
github.com/spacemeshos/post v0.9.4/go.mod h1:YjYLMcFFSpxrI86TW2rhyWCWRRFG+8LPiONgARNuaP4=
github.com/spacemeshos/sha256-simd v0.1.0 h1:G7Mfu5RYdQiuE+wu4ZyJ7I0TI74uqLhFnKblEnSpjYI=
github.com/spacemeshos/sha256-simd v0.1.0/go.mod h1:O8CClVIilId7RtuCMV2+YzMj6qjVn75JsxOxaE8vcfM=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
Expand Down
11 changes: 4 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,13 +1031,6 @@ func (app *App) startServices(ctx context.Context) error {
if err := app.fetcher.Start(); err != nil {
return fmt.Errorf("failed to start fetcher: %w", err)
}
app.eg.Go(func() error {
app.postVerifier.Start(ctx)
// Start only returns when the context expires (which means the system
// is shutting down). We do the closing of the post verifier here so
// it's ensured that start has returned before we call Close.
return app.postVerifier.Close()
})
app.syncer.Start()
app.beaconProtocol.Start(ctx)

Expand Down Expand Up @@ -1210,6 +1203,10 @@ func (app *App) stopServices(ctx context.Context) {
_ = app.atxBuilder.StopSmeshing(false)
}

if app.postVerifier != nil {
app.postVerifier.Close()
}

if app.hare != nil {
app.hare.Close()
}
Expand Down

0 comments on commit ea87bbe

Please sign in to comment.