Skip to content

Commit

Permalink
extract active set and save
Browse files Browse the repository at this point in the history
  • Loading branch information
countvonzero committed Sep 15, 2023
1 parent 3b9d9c3 commit 5031a73
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 31 deletions.
51 changes: 51 additions & 0 deletions mesh/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package mesh

import (
"context"
"errors"
"fmt"
"time"

"go.uber.org/zap"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/mesh/metrics"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/activesets"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/proposals"
"github.com/spacemeshos/go-spacemesh/sql/transactions"
Expand Down Expand Up @@ -59,3 +65,48 @@ func Prune(
}
}
}

func ExtractActiveSet(db sql.Executor) error {
latest, err := ballots.LatestLayer(db)
if err != nil {
return fmt.Errorf("extract get latest: %w", err)
}

Check warning on line 73 in mesh/janitor.go

View check run for this annotation

Codecov / codecov/patch

mesh/janitor.go#L72-L73

Added lines #L72 - L73 were not covered by tests
extracted := 0
unique := 0
log.With().Info("extracting ballots active sets",
log.Uint32("from", types.EpochID(2).FirstLayer().Uint32()),
log.Uint32("to", latest.Uint32()),
)
for lid := types.EpochID(2).FirstLayer(); lid <= latest; lid++ {
blts, err := ballots.Layer(db, lid)
if err != nil {
return fmt.Errorf("extract layer %d: %w", lid, err)
}

Check warning on line 84 in mesh/janitor.go

View check run for this annotation

Codecov / codecov/patch

mesh/janitor.go#L83-L84

Added lines #L83 - L84 were not covered by tests
for _, b := range blts {
if b.EpochData == nil {
continue
}
if len(b.ActiveSet) == 0 {
continue

Check warning on line 90 in mesh/janitor.go

View check run for this annotation

Codecov / codecov/patch

mesh/janitor.go#L90

Added line #L90 was not covered by tests
}
if err := activesets.Add(db, b.EpochData.ActiveSetHash, &types.EpochActiveSet{
Epoch: b.Layer.GetEpoch(),
Set: b.ActiveSet,
}); err != nil && !errors.Is(err, sql.ErrObjectExists) {
return fmt.Errorf("add active set %s (%s): %w", b.ID().String(), b.EpochData.ActiveSetHash.ShortString(), err)

Check warning on line 96 in mesh/janitor.go

View check run for this annotation

Codecov / codecov/patch

mesh/janitor.go#L96

Added line #L96 was not covered by tests
} else if err == nil {
unique++
}
b.ActiveSet = nil
if err := ballots.UpdateBlob(db, b.ID(), codec.MustEncode(b)); err != nil {
return fmt.Errorf("update ballot %s: %w", b.ID().String(), err)
}

Check warning on line 103 in mesh/janitor.go

View check run for this annotation

Codecov / codecov/patch

mesh/janitor.go#L102-L103

Added lines #L102 - L103 were not covered by tests
extracted++
}
}
log.With().Info("extracted active sets from ballots",
log.Int("num", extracted),
log.Int("unique", unique),
)
return nil
}
37 changes: 37 additions & 0 deletions mesh/janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/mesh/mocks"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/activesets"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/proposals"
Expand Down Expand Up @@ -94,3 +95,39 @@ func TestPrune(t *testing.T) {
cancel()
eg.Wait()
}

func TestExtractActiveSet(t *testing.T) {
db := sql.InMemory()
current := types.LayerID(20)
blts := make([]*types.Ballot, 0, current)
hashes := []types.Hash32{types.RandomHash(), types.RandomHash()}
actives := [][]types.ATXID{types.RandomActiveSet(11), types.RandomActiveSet(19)}
for lid := types.EpochID(2).FirstLayer(); lid < current; lid++ {
blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid)
if lid%3 == 0 {
blt.EpochData = &types.EpochData{
ActiveSetHash: hashes[0],
}
blt.ActiveSet = actives[0]
}
if lid%3 == 1 {
blt.EpochData = &types.EpochData{
ActiveSetHash: hashes[1],
}
blt.ActiveSet = actives[1]
}
require.NoError(t, ballots.Add(db, &blt))
blts = append(blts, &blt)
}
require.NoError(t, ExtractActiveSet(db))
for _, b := range blts {
got, err := ballots.Get(db, b.ID())
require.NoError(t, err)
require.Empty(t, got.ActiveSet)
}
for i, h := range hashes {
got, err := activesets.Get(db, h)
require.NoError(t, err)
require.Equal(t, actives[i], got.Set)
}
}
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,7 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering),
sql.WithV4PreMigration(mesh.ExtractActiveSet),
)
if err != nil {
return fmt.Errorf("open sqlite db %w", err)
Expand Down
11 changes: 11 additions & 0 deletions sql/ballots/ballots.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func Has(db sql.Executor, id types.BallotID) (bool, error) {
return rows > 0, nil
}

func UpdateBlob(db sql.Executor, bid types.BallotID, blob []byte) error {
if _, err := db.Exec(`update ballots set ballot = ?2 where id = ?1;`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, bid.Bytes())
stmt.BindBytes(2, blob[:])
}, nil); err != nil {
return fmt.Errorf("update blob %s: %w", bid.String(), err)
}

Check warning on line 80 in sql/ballots/ballots.go

View check run for this annotation

Codecov / codecov/patch

sql/ballots/ballots.go#L79-L80

Added lines #L79 - L80 were not covered by tests
return nil
}

// Get ballot with id from database.
func Get(db sql.Executor, id types.BallotID) (rst *types.Ballot, err error) {
if rows, err := db.Exec(`select pubkey, ballot, length(identities.proof)
Expand Down
21 changes: 21 additions & 0 deletions sql/ballots/ballots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
Expand Down Expand Up @@ -75,6 +76,26 @@ func TestAdd(t *testing.T) {
require.True(t, stored.IsMalicious())
}

func TestUpdateBlob(t *testing.T) {
db := sql.InMemory()
nodeID := types.RandomNodeID()
ballot := types.NewExistingBallot(types.BallotID{1}, types.RandomEdSignature(), nodeID, types.LayerID(0))
ballot.EpochData = &types.EpochData{
ActiveSetHash: types.RandomHash(),
}
ballot.ActiveSet = types.RandomActiveSet(199)
require.NoError(t, Add(db, &ballot))
got, err := Get(db, types.BallotID{1})
require.NoError(t, err)
require.Equal(t, ballot, *got)

ballot.ActiveSet = nil
require.NoError(t, UpdateBlob(db, types.BallotID{1}, codec.MustEncode(&ballot)))
got, err = Get(db, types.BallotID{1})
require.NoError(t, err)
require.Empty(t, got.ActiveSet)
}

func TestHas(t *testing.T) {
db := sql.InMemory()
ballot := types.NewExistingBallot(types.BallotID{1}, types.EmptyEdSignature, types.EmptyNodeID, types.LayerID(0))
Expand Down
28 changes: 21 additions & 7 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type conf struct {
connections int
migrations Migrations
enableLatency bool

// TODO: remove after state is pruned for majority
v4preMigration func(Executor) error
}

// WithConnections overwrites number of pooled connections.
Expand All @@ -75,6 +78,12 @@ func WithMigrations(migrations Migrations) Opt {
}
}

func WithV4PreMigration(cb func(Executor) error) Opt {
return func(c *conf) {
c.v4preMigration = cb
}
}

// WithLatencyMetering enables metric that track latency for every database query.
// Note that it will be a significant amount of data, and should not be enabled on
// multiple nodes by default.
Expand Down Expand Up @@ -115,18 +124,16 @@ func Open(uri string, opts ...Opt) (*Database, error) {
if config.enableLatency {
db.latency = newQueryLatency()
}
for i := 0; i < config.connections; i++ {
conn := pool.Get(context.Background())
if err := registerFunctions(conn); err != nil {
return nil, err
}
pool.Put(conn)
}
if config.migrations != nil {
before, err := version(db)
if err != nil {
return nil, err
}

Check warning on line 131 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L130-L131

Added lines #L130 - L131 were not covered by tests
if before == 3 && config.v4preMigration != nil {
if err := config.v4preMigration(db); err != nil {
return nil, err
}

Check warning on line 135 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L133-L135

Added lines #L133 - L135 were not covered by tests
}
tx, err := db.Tx(context.Background())
if err != nil {
return nil, err
Expand All @@ -145,6 +152,13 @@ func Open(uri string, opts ...Opt) (*Database, error) {
}

Check warning on line 152 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L150-L152

Added lines #L150 - L152 were not covered by tests
}
}
for i := 0; i < config.connections; i++ {
conn := pool.Get(context.Background())
if err := registerFunctions(conn); err != nil {
return nil, err
}
pool.Put(conn)
}
return db, nil
}

Expand Down
19 changes: 0 additions & 19 deletions sql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"fmt"

sqlite "github.com/go-llsqlite/crawshaw"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
)

func registerFunctions(conn *sqlite.Conn) error {
Expand All @@ -19,21 +16,5 @@ func registerFunctions(conn *sqlite.Conn) error {
}, nil, nil); err != nil {
return fmt.Errorf("registering add_uint64: %w", err)
}
// function to prune active set from old ballots
if err := conn.CreateFunction("prune_actives", true, 1, func(ctx sqlite.Context, values ...sqlite.Value) {
var ballot types.Ballot
if err := codec.Decode(values[0].Blob(), &ballot); err != nil {
ctx.ResultError(err)
} else {
ballot.ActiveSet = nil
if blob, err := codec.Encode(&ballot); err != nil {
ctx.ResultError(err)
} else {
ctx.ResultBlob(blob)
}
}
}, nil, nil); err != nil {
return fmt.Errorf("registering prune_actives: %w", err)
}
return nil
}
8 changes: 3 additions & 5 deletions sql/migrations/0004_next.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
DELETE FROM proposals WHERE layer < 19000;
DELETE FROM proposal_transactions WHERE layer < 19000;
UPDATE certificates SET cert = NULL WHERE layer < 19000;
UPDATE ballots SET ballot = prune_actives(ballot);

DELETE FROM proposals;
DELETE FROM proposal_transactions;
UPDATE certificates SET cert = NULL WHERE layer < 19000;

0 comments on commit 5031a73

Please sign in to comment.