Skip to content

Commit

Permalink
Try #5109:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Oct 4, 2023
2 parents 81f79f5 + 20b8b3b commit 1b2dceb
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 151 deletions.
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (app *App) initServices(ctx context.Context) error {
trtl, err := tortoise.Recover(
ctx,
app.cachedDB,
app.clock.CurrentLayer(), beaconProtocol, trtlopts...,
app.clock.CurrentLayer(), trtlopts...,
)
if err != nil {
return fmt.Errorf("can't recover tortoise state: %w", err)
Expand Down
38 changes: 38 additions & 0 deletions tortoise/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ func New(opts ...Opt) (*Tortoise, error) {
return t, nil
}

func (t *Tortoise) RecoverFrom(lid types.LayerID, opinion, prev types.Hash32) {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("recover from",
zap.Uint32("lid", lid.Uint32()),
log.ZShortStringer("opinion", opinion),
log.ZShortStringer("prev opinion", prev),
)
t.trtl.evicted = lid - 1
t.trtl.pending = lid
t.trtl.verified = lid
t.trtl.processed = lid
t.trtl.last = lid
layer := t.trtl.layer(lid)
layer.opinion = opinion
layer.prevOpinion = &prev
}

// LatestComplete returns the latest verified layer.
func (t *Tortoise) LatestComplete() types.LayerID {
t.mu.Lock()
Expand Down Expand Up @@ -300,6 +318,21 @@ func (t *Tortoise) OnBallot(ballot *types.BallotTortoiseData) {
if t.tracer != nil {
t.tracer.On(&BallotTrace{Ballot: ballot})
}
}

// OnRecoveredBallot is called for ballots recovered from database.
//
// For recovered ballots base ballot is not required to be in state therefore
// opinion is not recomputed, but instead recovered from database state.
func (t *Tortoise) OnRecoveredBallot(ballot *types.BallotTortoiseData) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.trtl.onRecoveredBallot(ballot); err != nil {
errorsCounter.Inc()
t.logger.Error("failed to save state from recovered ballot",
zap.Stringer("ballot", ballot.ID),
zap.Error(err))
}
if t.tracer != nil {
t.tracer.On(&BallotTrace{Ballot: ballot})
}
Expand Down Expand Up @@ -536,6 +569,11 @@ func (t *Tortoise) Mode() Mode {
// pending layer to the layer above equal layer.
// this method is meant to be used only in recovery from disk codepath.
func (t *Tortoise) resetPending(lid types.LayerID, opinion types.Hash32) bool {
t.logger.Debug("reset pending",
zap.Uint32("lid", lid.Uint32()),
log.ZShortStringer("computed", t.trtl.layer(lid).opinion),
log.ZShortStringer("stored", opinion),
)
if t.trtl.layer(lid).opinion == opinion {
t.trtl.pending = lid + 1
return true
Expand Down
46 changes: 12 additions & 34 deletions tortoise/model/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/beacons"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/layers"
Expand All @@ -32,13 +33,12 @@ func newCore(rng *rand.Rand, id string, logger log.Log) *core {
panic(err)
}
c := &core{
id: id,
logger: logger,
rng: rng,
cdb: cdb,
beacons: newBeaconStore(),
units: units,
signer: sig,
id: id,
logger: logger,
rng: rng,
cdb: cdb,
units: units,
signer: sig,
}
cfg := tortoise.DefaultConfig()
cfg.LayerSize = layerSize
Expand All @@ -59,7 +59,6 @@ type core struct {
rng *rand.Rand

cdb *datastore.CachedDB
beacons *beaconStore
tortoise *tortoise.Tortoise

// generated on setup
Expand Down Expand Up @@ -107,11 +106,11 @@ func (c *core) OnMessage(m Messenger, event Message) {
if c.refBallot != nil {
ballot.RefBallot = *c.refBallot
} else {
beacon, err := c.beacons.GetBeacon(ev.LayerID.GetEpoch())
beacon, err := beacons.Get(c.cdb, ev.LayerID.GetEpoch())
if err != nil {
beacon = types.Beacon{}
c.rng.Read(beacon[:])
c.beacons.StoreBeacon(ev.LayerID.GetEpoch(), beacon)
beacons.Set(c.cdb, ev.LayerID.GetEpoch(), beacon)
}
ballot.EpochData = &types.EpochData{
ActiveSetHash: types.Hash32{1, 2, 3},
Expand All @@ -129,7 +128,8 @@ func (c *core) OnMessage(m Messenger, event Message) {
m.Send(MessageBallot{Ballot: ballot})
case MessageLayerEnd:
if ev.LayerID.After(types.GetEffectiveGenesis()) {
tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb, c.beacons, ev.LayerID, ev.LayerID, ev.LayerID, ev.LayerID)
tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb, ev.LayerID, c.tortoise.OnBallot)
c.tortoise.TallyVotes(context.Background(), ev.LayerID)
m.Notify(EventVerified{ID: c.id, Verified: c.tortoise.LatestComplete(), Layer: ev.LayerID})
}

Expand Down Expand Up @@ -172,30 +172,8 @@ func (c *core) OnMessage(m Messenger, event Message) {
}
atxs.Add(c.cdb, vAtx)
case MessageBeacon:
c.beacons.StoreBeacon(ev.EpochID, ev.Beacon)
beacons.Add(c.cdb, ev.EpochID+1, ev.Beacon)
case MessageCoinflip:
layers.SetWeakCoin(c.cdb, ev.LayerID, ev.Coinflip)
}
}

func newBeaconStore() *beaconStore {
return &beaconStore{
beacons: map[types.EpochID]types.Beacon{},
}
}

type beaconStore struct {
beacons map[types.EpochID]types.Beacon
}

func (b *beaconStore) GetBeacon(eid types.EpochID) (types.Beacon, error) {
beacon, exist := b.beacons[eid-1]
if !exist {
return types.Beacon{}, sql.ErrNotFound
}
return beacon, nil
}

func (b *beaconStore) StoreBeacon(eid types.EpochID, beacon types.Beacon) {
b.beacons[eid] = beacon
}
82 changes: 51 additions & 31 deletions tortoise/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/beacons"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/system"
)

// Recover tortoise state from database.
func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID, beacon system.BeaconGetter, opts ...Opt) (*Tortoise, error) {
func Recover(ctx context.Context, db *datastore.CachedDB, current types.LayerID, opts ...Opt) (*Tortoise, error) {
trtl, err := New(opts...)
if err != nil {
return nil, err
Expand All @@ -29,6 +29,23 @@ func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID,
return nil, fmt.Errorf("failed to load latest known layer: %w", err)
}

applied, err := layers.GetLastApplied(db)
if err != nil {
return nil, fmt.Errorf("get last applied: %w", err)
}
start := types.GetEffectiveGenesis() + 1
if applied > types.LayerID(trtl.cfg.WindowSize) {
window := applied - types.LayerID(trtl.cfg.WindowSize)
window = window.GetEpoch().FirstLayer() // windback to the start of the epoch to load ref ballots
prev, err1 := layers.GetAggregatedHash(db, window-1)
opinion, err2 := layers.GetAggregatedHash(db, window)
if err1 == nil && err2 == nil {
// tortoise will need reference to previous layer
trtl.RecoverFrom(window, opinion, prev)
start = window
}
}

malicious, err := identities.GetMalicious(db)
if err != nil {
return nil, fmt.Errorf("recover malicious %w", err)
Expand All @@ -39,7 +56,7 @@ func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID,

if types.GetEffectiveGenesis() != types.FirstEffectiveGenesis() {
// need to load the golden atxs after a checkpoint recovery
if err := recoverEpoch(types.GetEffectiveGenesis().Add(1).GetEpoch(), trtl, db, beacon); err != nil {
if err := recoverEpoch(types.GetEffectiveGenesis().Add(1).GetEpoch(), trtl, db); err != nil {
return nil, err
}
}
Expand All @@ -51,42 +68,62 @@ func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID,
epoch++ // recoverEpoch expects target epoch, rather than publish
if last.GetEpoch() != epoch {
for eid := last.GetEpoch(); eid <= epoch; eid++ {
if err := recoverEpoch(eid, trtl, db, beacon); err != nil {
if err := recoverEpoch(eid, trtl, db); err != nil {
return nil, err
}
}
}
start := types.GetEffectiveGenesis().Add(1)
for lid := start; !lid.After(last); lid = lid.Add(1) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err := RecoverLayer(ctx, trtl, db, beacon, start, lid, last, min(last, latest)); err != nil {
if err := RecoverLayer(ctx, trtl, db, lid, trtl.OnRecoveredBallot); err != nil {
return nil, fmt.Errorf("failed to load tortoise state at layer %d: %w", lid, err)
}
}
if last == 0 {
last = current
} else {
last = min(last, current)
}
trtl.TallyVotes(ctx, last)
// find topmost layer that was already applied and reset pending
// so that result for that layer is not returned
for prev := last - 1; prev >= start; prev-- {
opinion, err := layers.GetAggregatedHash(db, prev)
if err == nil && opinion != types.EmptyLayerHash {
if trtl.resetPending(prev, opinion) {
break
}
}
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, fmt.Errorf("check opinion %w", err)
}
}
return trtl, nil
}

func recoverEpoch(epoch types.EpochID, trtl *Tortoise, db *datastore.CachedDB, beacondb system.BeaconGetter) error {
func recoverEpoch(epoch types.EpochID, trtl *Tortoise, db *datastore.CachedDB) error {
if err := db.IterateEpochATXHeaders(epoch, func(header *types.ActivationTxHeader) error {
trtl.OnAtx(header.ToData())
return nil
}); err != nil {
return err
}
beacon, err := beacondb.GetBeacon(epoch)
if err == nil {
beacon, err := beacons.Get(db, epoch)
if err == nil && beacon != types.EmptyBeacon {
trtl.OnBeacon(epoch, beacon)
}
return nil
}

func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, beacon system.BeaconGetter, start, lid, last, current types.LayerID) error {
type ballotFunc func(*types.BallotTortoiseData)

func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, lid types.LayerID, onBallot ballotFunc) error {
if lid.FirstInEpoch() {
if err := recoverEpoch(lid.GetEpoch(), trtl, db, beacon); err != nil {
if err := recoverEpoch(lid.GetEpoch(), trtl, db); err != nil {
return err
}
}
Expand Down Expand Up @@ -119,36 +156,19 @@ func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, b
}
for _, ballot := range ballotsrst {
if ballot.EpochData != nil {
trtl.OnBallot(ballot.ToTortoiseData())
onBallot(ballot.ToTortoiseData())
}
}
for _, ballot := range ballotsrst {
if ballot.EpochData == nil {
trtl.OnBallot(ballot.ToTortoiseData())
onBallot(ballot.ToTortoiseData())
}
}
coin, err := layers.GetWeakCoin(db, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return err
}
if err == nil {
} else if err == nil {
trtl.OnWeakCoin(lid, coin)
}
if lid <= current && (lid%types.LayerID(trtl.cfg.WindowSize) == 0 || lid == last) {
trtl.TallyVotes(ctx, lid)
// find topmost layer that was already applied and reset pending
// so that result for that layer is not returned
for prev := lid - 1; prev >= start; prev-- {
opinion, err := layers.GetAggregatedHash(db, prev)
if err == nil {
if trtl.resetPending(prev, opinion) {
return nil
}
} else if !errors.Is(err, sql.ErrNotFound) {
return fmt.Errorf("check opinion %w", err)
}
}

}
return nil
}

0 comments on commit 1b2dceb

Please sign in to comment.