diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 8599b132..e4882f91 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -137,26 +137,43 @@ func (u *updateContractsTxn) ContractRelevant(id types.FileContractID) (bool, er return err == nil, err } -func (s *Store) batchExpireContractSectors(height uint64) (removed []contractSectorRef, pruned int, err error) { +func deleteExpiredContractSectors(tx txn, height uint64) (sectorIDs []int64, err error) { + const query = `DELETE FROM contract_sector_roots +WHERE id IN (SELECT csr.id FROM contract_sector_roots csr +INNER JOIN contracts c ON (csr.contract_id=c.id) +-- past proof window or not confirmed and past the rebroadcast height +WHERE c.window_end < $1 OR c.contract_status=$2 LIMIT $3) +RETURNING sector_id;` + rows, err := tx.Query(query, height, contracts.ContractStatusRejected, sqlSectorBatchSize) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + return nil, err + } + sectorIDs = append(sectorIDs, id) + } + return sectorIDs, nil +} + +func (s *Store) batchExpireContractSectors(height uint64) (expired int, removed []types.Hash256, err error) { err = s.transaction(func(tx txn) (err error) { - removed, err = expiredContractSectors(tx, height, sqlSectorBatchSize) + sectorIDs, err := deleteExpiredContractSectors(tx, height) if err != nil { - return fmt.Errorf("failed to select sectors: %w", err) + return fmt.Errorf("failed to delete contract sectors: %w", err) } + expired = len(sectorIDs) - refs := make([]contractSectorRootRef, 0, len(removed)) - for _, sector := range removed { - refs = append(refs, contractSectorRootRef{ - dbID: sector.ID, - sectorID: sector.SectorID, - }) + // decrement the contract metrics + if err := incrementNumericStat(tx, metricContractSectors, -len(sectorIDs), time.Now()); err != nil { + return fmt.Errorf("failed to decrement contract sectors: %w", err) } - pruned, err = deleteContractSectors(tx, refs) - if err != nil { - return fmt.Errorf("failed to prune sectors: %w", err) - } - return nil + removed, err = pruneSectors(tx, sectorIDs) + return err }) return } @@ -298,9 +315,8 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, roots []types. return fmt.Errorf("failed to trim sectors: %w", err) } sectors -= change.A - removed := roots[len(roots)-int(change.A):] - for _, root := range removed { - if !trimmed[root] { + for i, root := range roots[len(roots)-int(change.A):] { + if trimmed[i] != root { return fmt.Errorf("inconsistent sector trim: expected %s to be trimmed", root) } } @@ -519,28 +535,16 @@ func (s *Store) UpdateContractState(ccID modules.ConsensusChangeID, height uint6 // ExpireContractSectors expires all sectors that are no longer covered by an // active contract. func (s *Store) ExpireContractSectors(height uint64) error { - var totalRemoved int - contractExpired := make(map[types.FileContractID]int) - defer func() { - for contractID, removed := range contractExpired { - s.log.Debug("expired contract sectors", zap.Stringer("contractID", contractID), zap.Uint64("height", height), zap.Int("expired", removed)) - } - if totalRemoved > 0 { - s.log.Debug("removed contract sectors", zap.Uint64("height", height), zap.Int("removed", totalRemoved)) - } - }() + log := s.log.Named("ExpireContractSectors").With(zap.Uint64("height", height)) // delete in batches to avoid holding a lock on the database for too long for i := 0; ; i++ { expired, removed, err := s.batchExpireContractSectors(height) if err != nil { return fmt.Errorf("failed to prune sectors: %w", err) - } else if len(expired) == 0 { + } else if expired == 0 { return nil } - for _, ref := range expired { - contractExpired[ref.ContractID]++ - } - totalRemoved += removed + log.Debug("removed sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i)) jitterSleep(time.Millisecond) // allow other transactions to run } } @@ -561,6 +565,7 @@ func getContract(tx txn, contractID int64) (contracts.Contract, error) { return contract, err } +// appendSector appends a new sector root to a contract. func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) error { var sectorID int64 err := tx.QueryRow(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, id, $2 FROM stored_sectors WHERE sector_root=$3 RETURNING sector_id`, contractID, index, sqlHash256(root)).Scan(§orID) @@ -572,6 +577,7 @@ func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) er return nil } +// updateSector updates a contract sector root in place and returns the old sector root func updateSector(tx txn, contractID int64, root types.Hash256, index uint64) (types.Hash256, error) { row := tx.QueryRow(`SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr @@ -582,26 +588,28 @@ WHERE contract_id=$1 AND root_index=$2`, contractID, index) return types.Hash256{}, fmt.Errorf("failed to get old sector id: %w", err) } - // update the sector ID var newSectorID int64 - err = tx.QueryRow(`WITH sector AS ( - SELECT id FROM stored_sectors WHERE sector_root=$1 - ) - UPDATE contract_sector_roots - SET sector_id=sector.id - FROM sector - WHERE contract_sector_roots.id=$2 - RETURNING sector_id;`, sqlHash256(root), ref.dbID).Scan(&newSectorID) + err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&newSectorID) + if err != nil { + return types.Hash256{}, fmt.Errorf("failed to get new sector id: %w", err) + } + + // update the sector ID + err = tx.QueryRow(`UPDATE contract_sector_roots + SET sector_id=$1 + WHERE id=$2 + RETURNING sector_id;`, newSectorID, ref.dbID).Scan(&newSectorID) if err != nil { return types.Hash256{}, fmt.Errorf("failed to update sector ID: %w", err) } // prune the old sector ID - if _, err := pruneSectorRef(tx, ref.sectorID); err != nil { + if _, err := pruneSectors(tx, []int64{ref.sectorID}); err != nil { return types.Hash256{}, fmt.Errorf("failed to prune old sector: %w", err) } return ref.root, nil } +// swapSectors swaps two sector roots in a contract and returns the sector roots func swapSectors(tx txn, contractID int64, i, j uint64) (map[types.Hash256]bool, error) { if i == j { return nil, nil @@ -656,11 +664,14 @@ ORDER BY root_index ASC;`, contractID, i, j) }, nil } -// lastContractSectors returns the last n sector IDs for a contract. -func lastContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) { - const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr +// lastNContractSectors returns the last n sector IDs for a contract. +func lastNContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) { + const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr INNER JOIN stored_sectors ss ON (csr.sector_id=ss.id) -WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;` +WHERE csr.contract_id=$1 +ORDER BY root_index DESC +LIMIT $2;` + rows, err := tx.Query(query, contractID, n) if err != nil { return nil, err @@ -677,68 +688,48 @@ WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;` return } -// deleteContractSectors deletes sector roots from a contract. Sectors that are -// still referenced will not be removed. Returns the number of sectors deleted. -func deleteContractSectors(tx txn, refs []contractSectorRootRef) (int, error) { - var rootIDs []int64 - for _, ref := range refs { - rootIDs = append(rootIDs, ref.dbID) +// deleteContractSectorRoots deletes the contract sector roots with the given IDs. +func deleteContractSectorRoots(tx txn, ids []int64) error { + query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(ids)) + `);` + res, err := tx.Exec(query, queryArgs(ids)...) + if err != nil { + return fmt.Errorf("failed to delete contract sector roots: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != int64(len(ids)) { + return fmt.Errorf("expected %v rows affected, got %v", len(ids), n) } + return nil +} - // delete the sector roots - query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(rootIDs)) + `) RETURNING id;` - rows, err := tx.Query(query, queryArgs(rootIDs)...) +// trimSectors deletes the last n sector roots for a contract and returns the +// deleted sector roots in order. +func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) { + refs, err := lastNContractSectors(tx, contractID, n) if err != nil { - return 0, fmt.Errorf("failed to delete sectors: %w", err) - } - deleted := make(map[int64]bool) - for rows.Next() { - var id int64 - if err := rows.Scan(&id); err != nil { - return 0, fmt.Errorf("failed to scan deleted sector: %w", err) - } - deleted[id] = true - } - if len(deleted) != len(rootIDs) { - return 0, errors.New("failed to delete all sectors") - } - for _, rootID := range rootIDs { - if !deleted[rootID] { - return 0, errors.New("failed to delete all sectors") - } + return nil, fmt.Errorf("failed to get sector roots: %w", err) } - // decrement the contract metrics - if err := incrementNumericStat(tx, metricContractSectors, -len(refs), time.Now()); err != nil { - return 0, fmt.Errorf("failed to decrement contract sectors: %w", err) + var contractSectorRootIDs []int64 + roots := make([]types.Hash256, len(refs)) + var sectorIDs []int64 + for i, ref := range refs { + contractSectorRootIDs = append(contractSectorRootIDs, ref.dbID) + roots[len(roots)-i-1] = ref.root // reverse the order to match the contract sector roots + sectorIDs = append(sectorIDs, ref.sectorID) } - // attempt to prune the deleted sectors - var pruned int - for _, ref := range refs { - deleted, err := pruneSectorRef(tx, ref.sectorID) - if err != nil { - return 0, fmt.Errorf("failed to prune sector ref: %w", err) - } else if deleted { - pruned++ - } + if err := deleteContractSectorRoots(tx, contractSectorRootIDs); err != nil { + return nil, fmt.Errorf("failed to delete contract sector roots: %w", err) + } else if err := incrementNumericStat(tx, metricContractSectors, -len(contractSectorRootIDs), time.Now()); err != nil { + return nil, fmt.Errorf("failed to decrement contract sectors: %w", err) } - return pruned, nil -} -// trimSectors deletes the last n sector roots for a contract. -func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) (map[types.Hash256]bool, error) { - refs, err := lastContractSectors(tx, contractID, n) + removed, err := pruneSectors(tx, sectorIDs) if err != nil { - return nil, fmt.Errorf("failed to get sector IDs: %w", err) - } else if _, err = deleteContractSectors(tx, refs); err != nil { - return nil, fmt.Errorf("failed to delete sectors: %w", err) - } - - roots := make(map[types.Hash256]bool) - for _, ref := range refs { - roots[ref.root] = true + return nil, fmt.Errorf("failed to prune sectors: %w", err) } + log.Debug("trimmed sectors", zap.Stringers("trimmed", roots), zap.Stringers("removed", removed)) return roots, nil } diff --git a/persist/sqlite/contracts_test.go b/persist/sqlite/contracts_test.go index ee35222d..32af0192 100644 --- a/persist/sqlite/contracts_test.go +++ b/persist/sqlite/contracts_test.go @@ -242,7 +242,8 @@ func TestReviseContract(t *testing.T) { }, } for _, test := range tests { - t.Run(test.name, func(t *testing.T) { + func() { + t.Log("revising contract:", test.name) oldRoots := append([]types.Hash256(nil), roots...) // update the expected roots for i, change := range test.changes { @@ -295,7 +296,7 @@ func TestReviseContract(t *testing.T) { } else if err := checkConsistency(roots, test.sectors); err != nil { t.Fatal(err) } - }) + }() } } diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 84729416..25e05ca3 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -10,8 +10,51 @@ import ( "go.uber.org/zap" ) +// migrateVersion26 recalculates the contract and physical sectors metrics +func migrateVersion26(tx txn, log *zap.Logger) error { + // recalculate the contract sectors metric + var contractSectorCount int64 + if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil { + return fmt.Errorf("failed to query contract sector count: %w", err) + } else if err := setNumericStat(tx, metricContractSectors, uint64(contractSectorCount), time.Now()); err != nil { + return fmt.Errorf("failed to set contract sectors metric: %w", err) + } + + // recalculate the physical sectors metric + var physicalSectorsCount int64 + volumePhysicalSectorCount := make(map[int64]int64) + rows, err := tx.Query(`SELECT volume_id, COUNT(*) FROM volume_sectors WHERE sector_id IS NOT NULL GROUP BY volume_id`) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to query volume sector count: %w", err) + } + defer rows.Close() + + for rows.Next() { + var volumeID, count int64 + if err := rows.Scan(&volumeID, &count); err != nil { + return fmt.Errorf("failed to scan volume sector count: %w", err) + } + volumePhysicalSectorCount[volumeID] = count + physicalSectorsCount += count + } + + // update the physical sectors metric + if err := setNumericStat(tx, metricPhysicalSectors, uint64(physicalSectorsCount), time.Now()); err != nil { + return fmt.Errorf("failed to set contract sectors metric: %w", err) + } + + // update the volume stats + for volumeID, count := range volumePhysicalSectorCount { + err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors = $1 WHERE id = $2 RETURNING id`, count, volumeID).Scan(&volumeID) + if err != nil { + return fmt.Errorf("failed to update volume stats: %w", err) + } + } + return nil +} + +// migrateVersion25 is a no-op migration to trigger foreign key checks func migrateVersion25(tx txn, log *zap.Logger) error { - // no-op migration to trigger foreign key checks return nil } diff --git a/persist/sqlite/sectors.go b/persist/sqlite/sectors.go index ee6f14e0..e620bed8 100644 --- a/persist/sqlite/sectors.go +++ b/persist/sqlite/sectors.go @@ -16,44 +16,44 @@ type tempSectorRef struct { SectorID int64 } -func (s *Store) batchExpireTempSectors(height uint64) (refs []tempSectorRef, reclaimed int, err error) { - err = s.transaction(func(tx txn) error { - refs, err = expiredTempSectors(tx, height, sqlSectorBatchSize) - if err != nil { - return fmt.Errorf("failed to select sectors: %w", err) - } else if len(refs) == 0 { - return nil - } +func deleteTempSectors(tx txn, height uint64) (sectorIDs []int64, err error) { + const query = `DELETE FROM temp_storage_sector_roots +WHERE id IN (SELECT id FROM temp_storage_sector_roots WHERE expiration_height <= $1 LIMIT $2) +RETURNING sector_id;` + + rows, err := tx.Query(query, height, sqlSectorBatchSize) + if err != nil { + return nil, fmt.Errorf("failed to select sectors: %w", err) + } + defer rows.Close() - var tempIDs []int64 - for _, ref := range refs { - tempIDs = append(tempIDs, ref.ID) + for rows.Next() { + var sectorID int64 + if err := rows.Scan(§orID); err != nil { + return nil, fmt.Errorf("failed to scan sector id: %w", err) } + sectorIDs = append(sectorIDs, sectorID) + } + return +} - // delete the sectors - query := `DELETE FROM temp_storage_sector_roots WHERE id IN (` + queryPlaceHolders(len(tempIDs)) + `);` - res, err := tx.Exec(query, queryArgs(tempIDs)...) +func (s *Store) batchExpireTempSectors(height uint64) (expired int, pruned []types.Hash256, err error) { + err = s.transaction(func(tx txn) error { + sectorIDs, err := deleteTempSectors(tx, height) if err != nil { return fmt.Errorf("failed to delete sectors: %w", err) - } else if rows, err := res.RowsAffected(); err != nil { - return fmt.Errorf("failed to get rows affected: %w", err) - } else if rows != int64(len(tempIDs)) { - return fmt.Errorf("failed to delete all sectors: %w", err) + } else if len(sectorIDs) == 0 { + return nil } + expired = len(sectorIDs) + // decrement the temp sectors metric - if err := incrementNumericStat(tx, metricTempSectors, -len(refs), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricTempSectors, -len(sectorIDs), time.Now()); err != nil { return fmt.Errorf("failed to update metric: %w", err) } - for _, ref := range refs { - deleted, err := pruneSectorRef(tx, ref.SectorID) - if err != nil { - return fmt.Errorf("failed to prune sector: %w", err) - } else if deleted { - reclaimed++ - } - } - return nil + pruned, err = pruneSectors(tx, sectorIDs) + return err }) return } @@ -111,7 +111,7 @@ func (s *Store) SectorLocation(root types.Hash256) (storage.SectorLocation, func } unlock := func() error { return s.transaction(func(tx txn) error { - return unlockSector(tx, lockID) + return unlockSector(tx, s.log.Named("SectorLocation"), lockID) }) } return location, unlock, nil @@ -143,22 +143,16 @@ func (s *Store) AddTemporarySectors(sectors []storage.TempSector) error { // ExpireTempSectors deletes the roots of sectors that are no longer // temporarily stored on the host. func (s *Store) ExpireTempSectors(height uint64) error { - var totalExpired, totalRemoved int - defer func() { - if totalExpired > 0 || totalRemoved > 0 { - s.log.Debug("expired temp sectors", zap.Uint64("height", height), zap.Int("expired", totalExpired), zap.Int("removed", totalRemoved)) - } - }() + log := s.log.Named("ExpireTempSectors").With(zap.Uint64("height", height)) // delete in batches to avoid holding a lock on the table for too long - for { + for i := 0; ; i++ { expired, removed, err := s.batchExpireTempSectors(height) if err != nil { return fmt.Errorf("failed to expire sectors: %w", err) - } else if len(expired) == 0 { + } else if expired == 0 { return nil } - totalExpired += len(expired) - totalRemoved += removed + log.Debug("expired temp sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i)) jitterSleep(time.Millisecond) // allow other transactions to run } } @@ -245,56 +239,87 @@ func incrementVolumeUsage(tx txn, volumeID int64, delta int) error { return nil } -func clearVolumeSector(tx txn, id int64) error { - var volumeDBID int64 - err := tx.QueryRow(`UPDATE volume_sectors SET sector_id=NULL WHERE sector_id=$1 RETURNING volume_id`, id).Scan(&volumeDBID) - if errors.Is(err, sql.ErrNoRows) { - return nil - } else if err != nil { - return err +func pruneSectors(tx txn, ids []int64) (pruned []types.Hash256, err error) { + hasContractRefStmt, err := tx.Prepare(`SELECT id FROM contract_sector_roots WHERE sector_id=$1 LIMIT 1`) + if err != nil { + return nil, fmt.Errorf("failed to prepare contract reference query: %w", err) } + defer hasContractRefStmt.Close() - // decrement the volume usage - if err = incrementVolumeUsage(tx, volumeDBID, -1); err != nil { - return fmt.Errorf("failed to update volume usage: %w", err) + hasTempRefStmt, err := tx.Prepare(`SELECT id FROM temp_storage_sector_roots WHERE sector_id=$1 LIMIT 1`) + if err != nil { + return nil, fmt.Errorf("failed to prepare temp reference query: %w", err) } - return nil -} + defer hasTempRefStmt.Close() -func pruneSectorRef(tx txn, id int64) (bool, error) { - var hasReference bool - // check if the sector is referenced by a contract - err := tx.QueryRow(`SELECT EXISTS(SELECT 1 FROM contract_sector_roots WHERE sector_id=$1)`, id).Scan(&hasReference) + hasLockStmt, err := tx.Prepare(`SELECT id FROM locked_sectors WHERE sector_id=$1 LIMIT 1`) if err != nil { - return false, fmt.Errorf("failed to check contract references: %w", err) - } else if hasReference { - return false, nil + return nil, fmt.Errorf("failed to prepare lock reference query: %w", err) } - // check if the sector is referenced by temp storage - err = tx.QueryRow(`SELECT EXISTS(SELECT 1 FROM temp_storage_sector_roots WHERE sector_id=$1)`, id).Scan(&hasReference) + defer hasLockStmt.Close() + + clearVolumeStmt, err := tx.Prepare(`UPDATE volume_sectors SET sector_id=NULL WHERE sector_id=$1 RETURNING volume_id`) if err != nil { - return false, fmt.Errorf("failed to check temp references: %w", err) - } else if hasReference { - return false, nil + return nil, fmt.Errorf("failed to prepare volume reference query: %w", err) } - // check if the sector is locked - err = tx.QueryRow(`SELECT EXISTS(SELECT 1 FROM locked_sectors WHERE sector_id=$1)`, id).Scan(&hasReference) + defer clearVolumeStmt.Close() + + deleteSectorStmt, err := tx.Prepare(`DELETE FROM stored_sectors WHERE id=$1 RETURNING sector_root`) if err != nil { - return false, fmt.Errorf("failed to check lock references: %w", err) - } else if hasReference { - return false, nil + return nil, fmt.Errorf("failed to prepare delete sector query: %w", err) } + defer deleteSectorStmt.Close() + + volumeDelta := make(map[int64]int) + for _, id := range ids { + var contractDBID int64 + err := hasContractRefStmt.QueryRow(id).Scan(&contractDBID) + if err == nil { + continue // sector has a contract reference + } else if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to check contract references: %w", err) + } - // clear the volume sector reference - if err = clearVolumeSector(tx, id); err != nil { - return false, fmt.Errorf("failed to clear volume sector: %w", err) + var tempDBID int64 + err = hasTempRefStmt.QueryRow(id).Scan(&tempDBID) + if err == nil { + continue // sector has a temp storage reference + } else if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to check temp references: %w", err) + } + + var lockDBID int64 + err = hasLockStmt.QueryRow(id).Scan(&lockDBID) + if err == nil { + continue // sector is locked + } else if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to check lock references: %w", err) + } + + var volumeDBID int64 + err = clearVolumeStmt.QueryRow(id).Scan(&volumeDBID) + if err == nil { + volumeDelta[volumeDBID]-- // sector was removed from a volume + } else if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found + return nil, fmt.Errorf("failed to clear volume references: %w", err) + } + + var root types.Hash256 + err = deleteSectorStmt.QueryRow(id).Scan((*sqlHash256)(&root)) + if err == nil { + pruned = append(pruned, root) + } else if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found + return nil, fmt.Errorf("failed to delete sector: %w", err) + } } - // delete the sector - if _, err = tx.Exec(`DELETE FROM stored_sectors WHERE id=$1`, id); err != nil { - return false, fmt.Errorf("failed to delete sector: %w", err) + // decrement the usage of all changed volumes + for volumeDBID, delta := range volumeDelta { + if err := incrementVolumeUsage(tx, volumeDBID, delta); err != nil { + return nil, fmt.Errorf("failed to update volume usage: %w", err) + } } - return true, nil + return } func expiredTempSectors(tx txn, height uint64, limit int) (sectors []tempSectorRef, _ error) { @@ -348,7 +373,7 @@ func deleteLocks(tx txn, ids []int64) (sectorIDs []int64, err error) { } // unlockSector unlocks a sector root. -func unlockSector(txn txn, lockIDs ...int64) error { +func unlockSector(txn txn, log *zap.Logger, lockIDs ...int64) error { if len(lockIDs) == 0 { return nil } @@ -358,12 +383,11 @@ func unlockSector(txn txn, lockIDs ...int64) error { return fmt.Errorf("failed to delete locks: %w", err) } - for _, sectorID := range sectorIDs { - _, err := pruneSectorRef(txn, sectorID) - if err != nil { - return fmt.Errorf("failed to prune sector: %w", err) - } + pruned, err := pruneSectors(txn, sectorIDs) + if err != nil { + return fmt.Errorf("failed to prune sectors: %w", err) } + log.Debug("unlocked sectors", zap.Int("unlocked", len(lockIDs)), zap.Stringers("removed", pruned)) return nil } diff --git a/persist/sqlite/store.go b/persist/sqlite/store.go index 6cf051d4..2011c88e 100644 --- a/persist/sqlite/store.go +++ b/persist/sqlite/store.go @@ -155,7 +155,7 @@ func doTransaction(db *sql.DB, log *zap.Logger, fn func(tx txn) error) error { return nil } -func clearLockedSectors(tx txn) error { +func clearLockedSectors(tx txn, log *zap.Logger) error { rows, err := tx.Query(`DELETE FROM locked_sectors RETURNING sector_id`) if err != nil { return err @@ -169,11 +169,11 @@ func clearLockedSectors(tx txn) error { } } - for _, sectorID := range sectorIDs { - if _, err := pruneSectorRef(tx, sectorID); err != nil { - return fmt.Errorf("failed to prune sector %d: %w", sectorID, err) - } + removed, err := pruneSectors(tx, sectorIDs) + if err != nil { + return fmt.Errorf("failed to prune sectors: %w", err) } + log.Debug("cleared locked sectors", zap.Int("locked", len(sectorIDs)), zap.Stringers("removed", removed)) return nil } @@ -186,7 +186,7 @@ func (s *Store) clearLocks() error { return s.transaction(func(tx txn) error { if err := clearLockedLocations(tx); err != nil { return fmt.Errorf("failed to clear locked locations: %w", err) - } else if err = clearLockedSectors(tx); err != nil { + } else if err = clearLockedSectors(tx, s.log.Named("clearLockedSectors")); err != nil { return fmt.Errorf("failed to clear locked sectors: %w", err) } return nil diff --git a/persist/sqlite/volumes.go b/persist/sqlite/volumes.go index 0f375462..ffe24614 100644 --- a/persist/sqlite/volumes.go +++ b/persist/sqlite/volumes.go @@ -67,7 +67,7 @@ func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func( } // unlock the locations defer unlockLocations(&dbTxn{s}, locationLocks) - defer unlockSector(&dbTxn{s}, sectorLock) + defer unlockSector(&dbTxn{s}, log.Named("unlockSector"), sectorLock) // call the migrateFn with the new location, data should be copied to the // new location and synced to disk @@ -218,6 +218,7 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati var location storage.SectorLocation var exists bool + log := s.log.Named("StoreSector").With(zap.Stringer("root", root)) err := s.transaction(func(tx txn) error { sectorID, err := insertSectorDBID(tx, root) if err != nil { @@ -270,12 +271,13 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati if err != nil { return nil, err } - + log = log.With(zap.Int64("volume", location.Volume), zap.Uint64("index", location.Index)) + log.Debug("stored sector") unlock := func() error { return s.transaction(func(tx txn) error { if err := unlockLocations(tx, locationLocks); err != nil { return fmt.Errorf("failed to unlock sector location: %w", err) - } else if err := unlockSector(tx, sectorLockID); err != nil { + } else if err := unlockSector(tx, log.Named("unlock"), sectorLockID); err != nil { return fmt.Errorf("failed to unlock sector: %w", err) } return nil diff --git a/persist/sqlite/volumes_test.go b/persist/sqlite/volumes_test.go index 7cd22f3e..d3c92185 100644 --- a/persist/sqlite/volumes_test.go +++ b/persist/sqlite/volumes_test.go @@ -755,7 +755,7 @@ func TestPrune(t *testing.T) { } // unlock locked sectors - if err := unlockSector(&dbTxn{db}, locks...); err != nil { + if err := unlockSector(&dbTxn{db}, log.Named("unlockSector"), locks...); err != nil { t.Fatal(err) }