Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sector prune #308

Merged
merged 5 commits into from Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
185 changes: 88 additions & 97 deletions persist/sqlite/contracts.go
Expand Up @@ -137,26 +137,43 @@ func (u *updateContractsTxn) ContractRelevant(id types.FileContractID) (bool, er
return err == nil, err
}

func (s *Store) batchExpireContractSectors(height uint64) (removed []contractSectorRef, pruned int, err error) {
func deleteExpiredContractSectors(tx txn, height uint64) (sectorIDs []int64, err error) {
const query = `DELETE FROM contract_sector_roots
WHERE id IN (SELECT csr.id FROM contract_sector_roots csr
INNER JOIN contracts c ON (csr.contract_id=c.id)
-- past proof window or not confirmed and past the rebroadcast height
WHERE c.window_end < $1 OR c.contract_status=$2 LIMIT $3)
RETURNING sector_id;`
rows, err := tx.Query(query, height, contracts.ContractStatusRejected, sqlSectorBatchSize)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
return nil, err
}
sectorIDs = append(sectorIDs, id)
}
return sectorIDs, nil
}

func (s *Store) batchExpireContractSectors(height uint64) (expired int, removed []types.Hash256, err error) {
err = s.transaction(func(tx txn) (err error) {
removed, err = expiredContractSectors(tx, height, sqlSectorBatchSize)
sectorIDs, err := deleteExpiredContractSectors(tx, height)
if err != nil {
return fmt.Errorf("failed to select sectors: %w", err)
return fmt.Errorf("failed to delete contract sectors: %w", err)
}
expired = len(sectorIDs)

refs := make([]contractSectorRootRef, 0, len(removed))
for _, sector := range removed {
refs = append(refs, contractSectorRootRef{
dbID: sector.ID,
sectorID: sector.SectorID,
})
// decrement the contract metrics
if err := incrementNumericStat(tx, metricContractSectors, -len(sectorIDs), time.Now()); err != nil {
return fmt.Errorf("failed to decrement contract sectors: %w", err)
}

pruned, err = deleteContractSectors(tx, refs)
if err != nil {
return fmt.Errorf("failed to prune sectors: %w", err)
}
return nil
removed, err = pruneSectors(tx, sectorIDs)
return err
})
return
}
Expand Down Expand Up @@ -298,9 +315,8 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, roots []types.
return fmt.Errorf("failed to trim sectors: %w", err)
}
sectors -= change.A
removed := roots[len(roots)-int(change.A):]
for _, root := range removed {
if !trimmed[root] {
for i, root := range roots[len(roots)-int(change.A):] {
if trimmed[i] != root {
return fmt.Errorf("inconsistent sector trim: expected %s to be trimmed", root)
}
}
Expand Down Expand Up @@ -519,28 +535,16 @@ func (s *Store) UpdateContractState(ccID modules.ConsensusChangeID, height uint6
// ExpireContractSectors expires all sectors that are no longer covered by an
// active contract.
func (s *Store) ExpireContractSectors(height uint64) error {
var totalRemoved int
contractExpired := make(map[types.FileContractID]int)
defer func() {
for contractID, removed := range contractExpired {
s.log.Debug("expired contract sectors", zap.Stringer("contractID", contractID), zap.Uint64("height", height), zap.Int("expired", removed))
}
if totalRemoved > 0 {
s.log.Debug("removed contract sectors", zap.Uint64("height", height), zap.Int("removed", totalRemoved))
}
}()
log := s.log.Named("ExpireContractSectors").With(zap.Uint64("height", height))
// delete in batches to avoid holding a lock on the database for too long
for i := 0; ; i++ {
expired, removed, err := s.batchExpireContractSectors(height)
if err != nil {
return fmt.Errorf("failed to prune sectors: %w", err)
} else if len(expired) == 0 {
} else if expired == 0 {
return nil
}
for _, ref := range expired {
contractExpired[ref.ContractID]++
}
totalRemoved += removed
log.Debug("removed sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i))
jitterSleep(time.Millisecond) // allow other transactions to run
}
}
Expand All @@ -561,6 +565,7 @@ func getContract(tx txn, contractID int64) (contracts.Contract, error) {
return contract, err
}

// appendSector appends a new sector root to a contract.
func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) error {
var sectorID int64
err := tx.QueryRow(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, id, $2 FROM stored_sectors WHERE sector_root=$3 RETURNING sector_id`, contractID, index, sqlHash256(root)).Scan(&sectorID)
Expand All @@ -572,6 +577,7 @@ func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) er
return nil
}

// updateSector updates a contract sector root in place and returns the old sector root
func updateSector(tx txn, contractID int64, root types.Hash256, index uint64) (types.Hash256, error) {
row := tx.QueryRow(`SELECT csr.id, csr.sector_id, ss.sector_root
FROM contract_sector_roots csr
Expand All @@ -582,26 +588,28 @@ WHERE contract_id=$1 AND root_index=$2`, contractID, index)
return types.Hash256{}, fmt.Errorf("failed to get old sector id: %w", err)
}

// update the sector ID
var newSectorID int64
err = tx.QueryRow(`WITH sector AS (
SELECT id FROM stored_sectors WHERE sector_root=$1
)
UPDATE contract_sector_roots
SET sector_id=sector.id
FROM sector
WHERE contract_sector_roots.id=$2
RETURNING sector_id;`, sqlHash256(root), ref.dbID).Scan(&newSectorID)
err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&newSectorID)
if err != nil {
return types.Hash256{}, fmt.Errorf("failed to get new sector id: %w", err)
}

// update the sector ID
err = tx.QueryRow(`UPDATE contract_sector_roots
SET sector_id=$1
WHERE id=$2
RETURNING sector_id;`, newSectorID, ref.dbID).Scan(&newSectorID)
if err != nil {
return types.Hash256{}, fmt.Errorf("failed to update sector ID: %w", err)
}
// prune the old sector ID
if _, err := pruneSectorRef(tx, ref.sectorID); err != nil {
if _, err := pruneSectors(tx, []int64{ref.sectorID}); err != nil {
return types.Hash256{}, fmt.Errorf("failed to prune old sector: %w", err)
}
return ref.root, nil
}

// swapSectors swaps two sector roots in a contract and returns the sector roots
func swapSectors(tx txn, contractID int64, i, j uint64) (map[types.Hash256]bool, error) {
if i == j {
return nil, nil
Expand Down Expand Up @@ -656,11 +664,14 @@ ORDER BY root_index ASC;`, contractID, i, j)
}, nil
}

// lastContractSectors returns the last n sector IDs for a contract.
func lastContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
// lastNContractSectors returns the last n sector IDs for a contract.
func lastNContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
INNER JOIN stored_sectors ss ON (csr.sector_id=ss.id)
WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;`
WHERE csr.contract_id=$1
ORDER BY root_index DESC
LIMIT $2;`

rows, err := tx.Query(query, contractID, n)
if err != nil {
return nil, err
Expand All @@ -677,68 +688,48 @@ WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;`
return
}

// deleteContractSectors deletes sector roots from a contract. Sectors that are
// still referenced will not be removed. Returns the number of sectors deleted.
func deleteContractSectors(tx txn, refs []contractSectorRootRef) (int, error) {
var rootIDs []int64
for _, ref := range refs {
rootIDs = append(rootIDs, ref.dbID)
// deleteContractSectorRoots deletes the contract sector roots with the given IDs.
func deleteContractSectorRoots(tx txn, ids []int64) error {
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(ids)) + `);`
res, err := tx.Exec(query, queryArgs(ids)...)
if err != nil {
return fmt.Errorf("failed to delete contract sector roots: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n != int64(len(ids)) {
return fmt.Errorf("expected %v rows affected, got %v", len(ids), n)
}
return nil
}

// delete the sector roots
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(rootIDs)) + `) RETURNING id;`
rows, err := tx.Query(query, queryArgs(rootIDs)...)
// trimSectors deletes the last n sector roots for a contract and returns the
// deleted sector roots in order.
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) {
refs, err := lastNContractSectors(tx, contractID, n)
if err != nil {
return 0, fmt.Errorf("failed to delete sectors: %w", err)
}
deleted := make(map[int64]bool)
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
return 0, fmt.Errorf("failed to scan deleted sector: %w", err)
}
deleted[id] = true
}
if len(deleted) != len(rootIDs) {
return 0, errors.New("failed to delete all sectors")
}
for _, rootID := range rootIDs {
if !deleted[rootID] {
return 0, errors.New("failed to delete all sectors")
}
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}

// decrement the contract metrics
if err := incrementNumericStat(tx, metricContractSectors, -len(refs), time.Now()); err != nil {
return 0, fmt.Errorf("failed to decrement contract sectors: %w", err)
var contractSectorRootIDs []int64
roots := make([]types.Hash256, len(refs))
var sectorIDs []int64
for i, ref := range refs {
contractSectorRootIDs = append(contractSectorRootIDs, ref.dbID)
roots[len(roots)-i-1] = ref.root // reverse the order to match the contract sector roots
sectorIDs = append(sectorIDs, ref.sectorID)
}

// attempt to prune the deleted sectors
var pruned int
for _, ref := range refs {
deleted, err := pruneSectorRef(tx, ref.sectorID)
if err != nil {
return 0, fmt.Errorf("failed to prune sector ref: %w", err)
} else if deleted {
pruned++
}
if err := deleteContractSectorRoots(tx, contractSectorRootIDs); err != nil {
return nil, fmt.Errorf("failed to delete contract sector roots: %w", err)
} else if err := incrementNumericStat(tx, metricContractSectors, -len(contractSectorRootIDs), time.Now()); err != nil {
return nil, fmt.Errorf("failed to decrement contract sectors: %w", err)
}
return pruned, nil
}

// trimSectors deletes the last n sector roots for a contract.
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) (map[types.Hash256]bool, error) {
refs, err := lastContractSectors(tx, contractID, n)
removed, err := pruneSectors(tx, sectorIDs)
if err != nil {
return nil, fmt.Errorf("failed to get sector IDs: %w", err)
} else if _, err = deleteContractSectors(tx, refs); err != nil {
return nil, fmt.Errorf("failed to delete sectors: %w", err)
}

roots := make(map[types.Hash256]bool)
for _, ref := range refs {
roots[ref.root] = true
return nil, fmt.Errorf("failed to prune sectors: %w", err)
}
log.Debug("trimmed sectors", zap.Stringers("trimmed", roots), zap.Stringers("removed", removed))
return roots, nil
}

Expand Down
5 changes: 3 additions & 2 deletions persist/sqlite/contracts_test.go
Expand Up @@ -242,7 +242,8 @@ func TestReviseContract(t *testing.T) {
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
func() {
t.Log("revising contract:", test.name)
oldRoots := append([]types.Hash256(nil), roots...)
// update the expected roots
for i, change := range test.changes {
Expand Down Expand Up @@ -295,7 +296,7 @@ func TestReviseContract(t *testing.T) {
} else if err := checkConsistency(roots, test.sectors); err != nil {
t.Fatal(err)
}
})
}()
}
}

Expand Down
45 changes: 44 additions & 1 deletion persist/sqlite/migrations.go
Expand Up @@ -10,8 +10,51 @@ import (
"go.uber.org/zap"
)

// migrateVersion26 recalculates the contract and physical sectors metrics
func migrateVersion26(tx txn, log *zap.Logger) error {
// recalculate the contract sectors metric
var contractSectorCount int64
if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil {
return fmt.Errorf("failed to query contract sector count: %w", err)
} else if err := setNumericStat(tx, metricContractSectors, uint64(contractSectorCount), time.Now()); err != nil {
return fmt.Errorf("failed to set contract sectors metric: %w", err)
}

// recalculate the physical sectors metric
var physicalSectorsCount int64
volumePhysicalSectorCount := make(map[int64]int64)
rows, err := tx.Query(`SELECT volume_id, COUNT(*) FROM volume_sectors WHERE sector_id IS NOT NULL GROUP BY volume_id`)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to query volume sector count: %w", err)
}
defer rows.Close()

for rows.Next() {
var volumeID, count int64
if err := rows.Scan(&volumeID, &count); err != nil {
return fmt.Errorf("failed to scan volume sector count: %w", err)
}
volumePhysicalSectorCount[volumeID] = count
physicalSectorsCount += count
}

// update the physical sectors metric
if err := setNumericStat(tx, metricPhysicalSectors, uint64(physicalSectorsCount), time.Now()); err != nil {
return fmt.Errorf("failed to set contract sectors metric: %w", err)
}

// update the volume stats
for volumeID, count := range volumePhysicalSectorCount {
err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors = $1 WHERE id = $2 RETURNING id`, count, volumeID).Scan(&volumeID)
if err != nil {
return fmt.Errorf("failed to update volume stats: %w", err)
}
}
return nil
}

// migrateVersion25 is a no-op migration to trigger foreign key checks
func migrateVersion25(tx txn, log *zap.Logger) error {
// no-op migration to trigger foreign key checks
return nil
}

Expand Down