diff --git a/host/metrics/types.go b/host/metrics/types.go
index 1b5c6b1c..3535b0b9 100644
--- a/host/metrics/types.go
+++ b/host/metrics/types.go
@@ -84,6 +84,7 @@ type (
 	Storage struct {
 		TotalSectors    uint64 `json:"totalSectors"`
 		PhysicalSectors uint64 `json:"physicalSectors"`
+		LostSectors     uint64 `json:"lostSectors"`
 		ContractSectors uint64 `json:"contractSectors"`
 		TempSectors     uint64 `json:"tempSectors"`
diff --git a/host/storage/persist.go b/host/storage/persist.go
index cdd285b8..b31342a5 100644
--- a/host/storage/persist.go
+++ b/host/storage/persist.go
@@ -1,12 +1,17 @@
 package storage

 import (
+	"context"
 	"errors"

 	"go.sia.tech/core/types"
 )

 type (
+	// MigrateFunc is a callback function that is called for each sector that
+	// needs to be migrated. If the function returns an error, the sector is
+	// skipped and migration continues with the next sector.
+	MigrateFunc func(location SectorLocation) error
 	// A VolumeStore stores and retrieves information about storage volumes.
 	VolumeStore interface {
@@ -23,7 +28,7 @@ type (
 		// RemoveVolume removes a storage volume from the volume store. If there
 		// are used sectors in the volume, ErrVolumeNotEmpty is returned. If
 		// force is true, the volume is removed even if it is not empty.
-		RemoveVolume(volumeID int64) error
+		RemoveVolume(volumeID int64, force bool) error
 		// GrowVolume grows a storage volume's metadata to maxSectors. If the
 		// number of sectors in the volume is already greater than maxSectors,
 		// nil is returned.
@@ -39,9 +44,9 @@ type (

 		// MigrateSectors returns a new location for each occupied sector of a
 		// volume starting at min. The sector data should be copied to the new
-		// location and synced to disk during migrateFn. Iteration is stopped if
-		// migrateFn returns an error.
-		MigrateSectors(volumeID int64, min uint64, migrateFn func(SectorLocation) error) error
+		// location and synced to disk during migrateFn. If migrateFn returns an
+		// error, migration will continue, but that sector is not migrated.
+		MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn MigrateFunc) (migrated, failed int, err error)
 		// StoreSector calls fn with an empty location in a writable volume. If
 		// the sector root already exists, fn is called with the existing
 		// location and exists is true. Unless exists is true, The sector must
@@ -74,6 +79,9 @@ type (
 )

 var (
+	// ErrMigrationFailed is returned when a volume fails to migrate all
+	// of its sectors.
+	ErrMigrationFailed = errors.New("migration failed")
 	// ErrNotEnoughStorage is returned when there is not enough storage space to
 	// store a sector.
 	ErrNotEnoughStorage = errors.New("not enough storage")
diff --git a/host/storage/storage.go b/host/storage/storage.go
index 3e7505f6..76f25ece 100644
--- a/host/storage/storage.go
+++ b/host/storage/storage.go
@@ -156,7 +156,7 @@ func (vm *VolumeManager) loadVolumes() error {
 // migrateSector migrates a sector to a new location. The sector is read from
 // its current location and written to its new location. The volume is
 // immediately synced after the sector is written.
-func (vm *VolumeManager) migrateSector(loc SectorLocation, log *zap.Logger) error {
+func (vm *VolumeManager) migrateSector(loc SectorLocation) error {
 	// read the sector from the old location
 	sector, err := vm.Read(loc.Root)
 	if err != nil {
@@ -269,14 +269,8 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
 	// migrate any sectors outside of the target range.
 	var migrated int
-	err := vm.vs.MigrateSectors(id, newMaxSectors, func(newLoc SectorLocation) error {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil {
+	migrated, failed, err := vm.vs.MigrateSectors(ctx, id, newMaxSectors, func(newLoc SectorLocation) error {
+		if err := vm.migrateSector(newLoc); err != nil {
 			return err
 		}
 		migrated++
@@ -285,9 +279,11 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
 		vm.a.Register(a)
 		return nil
 	})
-	log.Info("migrated sectors", zap.Int("count", migrated))
+	log.Info("migrated sectors", zap.Int("migrated", migrated), zap.Int("failed", failed))
 	if err != nil {
 		return err
+	} else if failed > 0 {
+		return ErrMigrationFailed
 	}

 	for current := oldMaxSectors; current > newMaxSectors; {
@@ -337,73 +333,6 @@ func (vm *VolumeManager) volumeStats(id int64) VolumeStats {
 	return v.Stats()
 }

-func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int64, localPath string, force bool, log *zap.Logger) (int, error) {
-	ctx, cancel, err := vm.tg.AddContext(ctx)
-	if err != nil {
-		return 0, err
-	}
-	defer cancel()
-
-	// add an alert for the migration
-	a := alerts.Alert{
-		ID:       frand.Entropy256(),
-		Message:  "Migrating sectors",
-		Severity: alerts.SeverityInfo,
-		Data: map[string]interface{}{
-			"volumeID": id,
-			"migrated": 0,
-			"force":    force,
-		},
-		Timestamp: time.Now(),
-	}
-	vm.a.Register(a)
-	// dismiss the alert when the function returns. It is the caller's
-	// responsibility to register a completion alert
-	defer vm.a.Dismiss(a.ID)
-
-	// migrate sectors to other volumes
-	var migrated, failed int
-	err = vm.vs.MigrateSectors(id, 0, func(newLoc SectorLocation) error {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil {
-			log.Error("failed to migrate sector", zap.Stringer("sectorRoot", newLoc.Root), zap.Error(err))
-			if force {
-				failed++
-				a.Data["failed"] = failed
-				return nil
-			}
-			return err
-		}
-		migrated++
-		// update the alert
-		a.Data["migrated"] = migrated
-		vm.a.Register(a)
-		return nil
-	})
-	if err != nil {
-		return migrated, fmt.Errorf("failed to migrate sector data: %w", err)
-	} else if err := vm.vs.RemoveVolume(id); err != nil {
-		return migrated, fmt.Errorf("failed to remove volume: %w", err)
-	}
-
-	vm.mu.Lock()
-	defer vm.mu.Unlock()
-	// close the volume
-	vm.volumes[id].Close()
-	// delete the volume from memory
-	delete(vm.volumes, id)
-	// remove the volume file, ignore error if the file does not exist
-	if err := os.Remove(localPath); err != nil && !errors.Is(err, os.ErrNotExist) {
-		return migrated, fmt.Errorf("failed to remove volume file: %w", err)
-	}
-	return migrated, nil
-}
-
 // Close gracefully shutsdown the volume manager.
 func (vm *VolumeManager) Close() error {
 	// wait for all operations to stop
@@ -592,7 +521,7 @@ func (vm *VolumeManager) SetReadOnly(id int64, readOnly bool) error {

 // RemoveVolume removes a volume from the manager.
 func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool, result chan<- error) error {
-	log := vm.log.Named("remove").With(zap.Int64("volumeID", id))
+	log := vm.log.Named("remove").With(zap.Int64("volumeID", id), zap.Bool("force", force))
 	done, err := vm.tg.Add()
 	if err != nil {
 		return err
@@ -604,7 +533,10 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
 	vm.mu.Unlock()
 	if !ok {
 		return fmt.Errorf("volume %v not found", id)
-	} else if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
+	}
+
+	oldStatus := vol.Status()
+	if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
 		return fmt.Errorf("failed to set volume status: %w", err)
 	}
@@ -618,38 +550,83 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
 		return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
 	}

+	alert := alerts.Alert{
+		ID:       frand.Entropy256(),
+		Message:  "Removing volume",
+		Severity: alerts.SeverityInfo,
+		Data: map[string]interface{}{
+			"volumeID": id,
+			"sectors":  stat.TotalSectors,
+			"used":     stat.UsedSectors,
+			"migrated": 0,
+			"failed":   0,
+		},
+		Timestamp: time.Now(),
+	}
+
 	go func() {
-		start := time.Now()
-		defer vol.SetStatus(VolumeStatusReady)
+		defer vol.SetStatus(oldStatus)

-		migrated, err := vm.migrateForRemoval(ctx, id, stat.LocalPath, force, log)
+		var migrated, failed int
+
+		updateRemovalAlert := func(message string, severity alerts.Severity, err error) {
+			alert.Message = message
+			alert.Severity = severity
+			alert.Data["migrated"] = migrated
+			alert.Data["failed"] = failed
+			if err != nil {
+				alert.Data["error"] = err.Error()
+			}
+			vm.a.Register(alert)
+		}
+
+		migrated, failed, err := vm.vs.MigrateSectors(ctx, id, 0, func(newLoc SectorLocation) error {
+			err := vm.migrateSector(newLoc)
+			if err != nil {
+				failed++
+			} else {
+				migrated++
+			}
+			updateRemovalAlert("Removing volume", alerts.SeverityInfo, nil) // error is ignored during migration
+			return err
+		})
 		if err != nil {
 			log.Error("failed to migrate sectors", zap.Error(err))
+			// update the alert
+			updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
+			result <- err
+			return
+		} else if !force && failed > 0 {
+			updateRemovalAlert("Failed to remove volume", alerts.SeverityError, ErrMigrationFailed)
+			result <- ErrMigrationFailed
+			return
 		}
-		alert := alerts.Alert{
-			ID: frand.Entropy256(),
-			Data: map[string]interface{}{
-				"volumeID":        id,
-				"elapsed":         time.Since(start),
-				"migratedSectors": migrated,
-			},
-			Timestamp: time.Now(),
-		}
-		if err != nil {
-			alert.Message = "Volume removal failed"
-			alert.Severity = alerts.SeverityError
-			alert.Data["error"] = err.Error()
-		} else {
-			alert.Message = "Volume removed"
-			alert.Severity = alerts.SeverityInfo
+
+		// close the volume and remove it from memory
+		if err := vol.Close(); err != nil {
+			log.Error("failed to close volume", zap.Error(err))
+			updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
+			result <- err
+			return
+		} else if err := os.Remove(stat.LocalPath); err != nil && !errors.Is(err, os.ErrNotExist) {
+			log.Error("failed to remove volume file", zap.Error(err))
+			updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
+			result <- err
+			return
 		}
-		vm.a.Register(alert)
+		delete(vm.volumes, id)

-		select {
-		case result <- err:
-		default:
+		// remove the volume from the volume store
+		if err := vm.vs.RemoveVolume(id, force); err != nil {
+			log.Error("failed to remove volume", zap.Error(err))
+			// update the alert
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err) + result <- err + return } + + updateRemovalAlert("Volume removed", alerts.SeverityInfo, nil) + result <- nil }() return nil diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go index f2c2772e..0ce00f98 100644 --- a/host/storage/storage_test.go +++ b/host/storage/storage_test.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "testing" + "time" rhp2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" @@ -284,13 +285,13 @@ func TestRemoveVolume(t *testing.T) { } defer release() - // attempt to remove the volume. Should return ErrNotEnoughStorage since + // attempt to remove the volume. Should return ErrMigrationFailed since // there is only one volume. if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { // blocking error should be nil t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { - // async error should be ErrNotEnoughStorage + } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { + // async error should be ErrMigrationFailed t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } @@ -379,22 +380,52 @@ func TestRemoveCorrupt(t *testing.T) { defer release() } - // attempt to remove the volume. Should return ErrNotEnoughStorage since + // check that the volume metrics did not change + if m, err := db.Metrics(time.Now()); err != nil { + t.Fatal(err) + } else if m.Storage.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) + } else if m.Storage.PhysicalSectors != 10 { + t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) + } + + if vol, err := vm.Volume(volume.ID); err != nil { + t.Fatal(err) + } else if vol.Status != storage.VolumeStatusReady { + t.Fatal("volume should be ready") + } else if vol.UsedSectors != 10 { + t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) + } else if vol.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) + } + + // attempt to remove the volume. Should return ErrMigrationFailed since // there is only one volume. 
 	if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { // blocking error should be nil
 		t.Fatal(err)
-	} else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) {
-		// async error should be ErrNotEnoughStorage
-		t.Fatalf("expected ErrNotEnoughStorage, got %v", err)
+	} else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) {
+		// async error should be ErrMigrationFailed
+		t.Fatalf("expected ErrMigrationFailed, got %v", err)
 	}

-	// add a second volume to the manager
-	_, err = vm.AddVolume(context.Background(), filepath.Join(t.TempDir(), "vol2.dat"), expectedSectors, result)
-	if err != nil {
+	// check that the volume metrics did not change
+	if m, err := db.Metrics(time.Now()); err != nil {
 		t.Fatal(err)
-	} else if err := <-result; err != nil {
+	} else if m.Storage.TotalSectors != expectedSectors {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors)
+	} else if m.Storage.PhysicalSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors)
+	}
+
+	if vol, err := vm.Volume(volume.ID); err != nil {
 		t.Fatal(err)
+	} else if vol.Status != storage.VolumeStatusReady {
+		t.Fatal("volume should be ready")
+	} else if vol.UsedSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors)
+	} else if vol.TotalSectors != expectedSectors {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors)
 	}

 	f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
@@ -417,7 +448,46 @@ func TestRemoveCorrupt(t *testing.T) {
 		t.Fatal(err) // blocking error should be nil
 	} else if err := <-result; err == nil {
 		t.Fatal("expected error when removing corrupt volume", err)
+	} else if !errors.Is(err, storage.ErrMigrationFailed) {
+		t.Fatalf("expected ErrMigrationFailed, got %v", err)
+	}
+
+	// check that the volume metrics did not change
+	if m, err := db.Metrics(time.Now()); err != nil {
+		t.Fatal(err)
+	} else if m.Storage.TotalSectors != expectedSectors {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors)
+	} else if m.Storage.PhysicalSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors)
 	}
+
+	if vol, err := vm.Volume(volume.ID); err != nil {
+		t.Fatal(err)
+	} else if vol.Status != storage.VolumeStatusReady {
+		t.Fatal("volume should be ready")
+	} else if vol.UsedSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors)
+	} else if vol.TotalSectors != expectedSectors {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors)
+	}
+
+	// add a second volume to accept the data
+	volume2, err := vm.AddVolume(context.Background(), filepath.Join(t.TempDir(), "hostdata.dat"), expectedSectors, result)
+	if err != nil {
+		t.Fatal(err)
+	} else if err := <-result; err != nil {
+		t.Fatal(err)
+	}
+
+	// check that the volume metrics doubled
+	if m, err := db.Metrics(time.Now()); err != nil {
+		t.Fatal(err)
+	} else if m.Storage.TotalSectors != expectedSectors*2 {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors*2, m.Storage.TotalSectors)
+	} else if m.Storage.PhysicalSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors)
+	}
+
 	// force remove the volume
 	if err := vm.RemoveVolume(context.Background(), volume.ID, true, result); err != nil {
 		t.Fatal(err)
@@ -426,6 +496,27 @@ func TestRemoveCorrupt(t *testing.T) {
 	} else if _, err := os.Stat(volumePath); !errors.Is(err, os.ErrNotExist) {
t.Fatal("volume file still exists", err) } + + // check that the corrupt sector was removed from the volume metrics + if m, err := db.Metrics(time.Now()); err != nil { + t.Fatal(err) + } else if m.Storage.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors-1, m.Storage.TotalSectors) + } else if m.Storage.PhysicalSectors != 9 { + t.Fatalf("expected 9 used sectors, got %v", m.Storage.PhysicalSectors) + } else if m.Storage.LostSectors != 1 { + t.Fatalf("expected 1 lost sectors, got %v", m.Storage.LostSectors) + } + + if vol, err := vm.Volume(volume2.ID); err != nil { + t.Fatal(err) + } else if vol.Status != storage.VolumeStatusReady { + t.Fatal("volume should be ready") + } else if vol.UsedSectors != 9 { + t.Fatalf("expected 9 used sectors, got %v", vol.UsedSectors) + } else if vol.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) + } } func TestRemoveMissing(t *testing.T) { @@ -498,20 +589,31 @@ func TestRemoveMissing(t *testing.T) { defer release() } - // attempt to remove the volume. Should return ErrNotEnoughStorage since + // attempt to remove the volume. Should return ErrMigrationFailed since // there is only one volume. if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { - t.Fatalf("expected ErrNotEnoughStorage, got %v", err) + } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { + t.Fatalf("expected ErrMigrationFailed, got %v", err) } - // add a second volume to the manager - _, err = vm.AddVolume(context.Background(), filepath.Join(t.TempDir(), "vol2.dat"), expectedSectors, result) - if err != nil { + // check that the volume metrics did not change + if m, err := db.Metrics(time.Now()); err != nil { t.Fatal(err) - } else if err := <-result; err != nil { + } else if m.Storage.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) + } else if m.Storage.PhysicalSectors != 10 { + t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) + } + + if vol, err := vm.Volume(volume.ID); err != nil { t.Fatal(err) + } else if vol.Status != storage.VolumeStatusReady { + t.Fatal("volume should be ready") + } else if vol.UsedSectors != 10 { + t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) + } else if vol.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) } // close the volume manager @@ -531,18 +633,49 @@ func TestRemoveMissing(t *testing.T) { } defer vm.Close() - vol, err := vm.Volume(volume.ID) - if err != nil { + // check that the volume metrics did not change + if m, err := db.Metrics(time.Now()); err != nil { + t.Fatal(err) + } else if m.Storage.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) + } else if m.Storage.PhysicalSectors != 10 { + t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) + } + + if vol, err := vm.Volume(volume.ID); err != nil { t.Fatal(err) } else if vol.Status != storage.VolumeStatusUnavailable { t.Fatal("volume should be unavailable") + } else if vol.UsedSectors != 10 { + t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) + } else if vol.TotalSectors != expectedSectors { + t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) } 
 	// remove the volume
 	if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil {
 		t.Fatal(err)
-	} else if err := <-result; err == nil {
-		t.Fatal("expected error when removing missing volume")
+	} else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) {
+		t.Fatalf("expected ErrMigrationFailed, got %v", err)
+	}
+
+	// check that the volume metrics did not change
+	if m, err := db.Metrics(time.Now()); err != nil {
+		t.Fatal(err)
+	} else if m.Storage.TotalSectors != expectedSectors {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors)
+	} else if m.Storage.PhysicalSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors)
+	}
+
+	if vol, err := vm.Volume(volume.ID); err != nil {
+		t.Fatal(err)
+	} else if vol.Status != storage.VolumeStatusUnavailable {
+		t.Fatal("volume should be unavailable")
+	} else if vol.UsedSectors != 10 {
+		t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors)
+	} else if vol.TotalSectors != expectedSectors {
+		t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors)
 	}

 	if err := vm.RemoveVolume(context.Background(), volume.ID, true, result); err != nil {
@@ -550,6 +683,21 @@ func TestRemoveMissing(t *testing.T) {
 	} else if err := <-result; err != nil {
 		t.Fatal(err)
 	}
+
+	// check that the volume was removed and its unmigrated sectors were marked as lost
+	if m, err := db.Metrics(time.Now()); err != nil {
+		t.Fatal(err)
+	} else if m.Storage.TotalSectors != 0 {
+		t.Fatalf("expected 0 total sectors, got %v", m.Storage.TotalSectors)
+	} else if m.Storage.PhysicalSectors != 0 {
+		t.Fatalf("expected 0 used sectors, got %v", m.Storage.PhysicalSectors)
+	} else if m.Storage.LostSectors != 10 {
+		t.Fatalf("expected 10 lost sectors, got %v", m.Storage.LostSectors)
+	}
+
+	if _, err := vm.Volume(volume.ID); !errors.Is(err, storage.ErrVolumeNotFound) {
+		t.Fatalf("expected ErrVolumeNotFound, got %v", err)
+	}
 }

 func TestVolumeDistribution(t *testing.T) {
@@ -1113,8 +1261,8 @@ func TestVolumeShrink(t *testing.T) {
 	remainingSectors := uint64(sectors - toRemove)
 	if err := vm.ResizeVolume(context.Background(), volume.ID, remainingSectors, result); err != nil {
 		t.Fatal(err)
-	} else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) {
-		t.Fatalf("expected not enough storage error, got %v", err)
+	} else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) {
+		t.Fatalf("expected ErrMigrationFailed, got %v", err)
 	}

 	// remove some sectors from the beginning of the volume
diff --git a/persist/sqlite/metrics.go b/persist/sqlite/metrics.go
index ab9a6fc7..bfb16f4c 100644
--- a/persist/sqlite/metrics.go
+++ b/persist/sqlite/metrics.go
@@ -28,6 +28,7 @@ const (
 	// storage
 	metricTotalSectors    = "totalSectors"
 	metricPhysicalSectors = "physicalSectors"
+	metricLostSectors     = "lostSectors"
 	metricContractSectors = "contractSectors"
 	metricTempSectors     = "tempSectors"
 	metricSectorReads     = "sectorReads"
@@ -362,6 +363,8 @@ func mustParseMetricValue(stat string, buf []byte, m *metrics.Metrics) {
 		m.Storage.TotalSectors = mustScanUint64(buf)
 	case metricPhysicalSectors:
 		m.Storage.PhysicalSectors = mustScanUint64(buf)
+	case metricLostSectors:
+		m.Storage.LostSectors = mustScanUint64(buf)
 	case metricContractSectors:
 		m.Storage.ContractSectors = mustScanUint64(buf)
 	case metricTempSectors:
diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go
index 25e05ca3..666f6376 100644
--- a/persist/sqlite/migrations.go
+++ b/persist/sqlite/migrations.go
@@ -10,8 +10,8 @@ import (
 	"go.uber.org/zap"
 )

-// migrateVersion26 recalculates the contract and physical sectors metrics
-func migrateVersion26(tx txn, log *zap.Logger) error {
+// migrateVersion25 recalculates the contract and physical sectors metrics
+func migrateVersion25(tx txn, log *zap.Logger) error {
 	// recalculate the contract sectors metric
 	var contractSectorCount int64
 	if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil {
@@ -53,11 +53,6 @@ func migrateVersion26(tx txn, log *zap.Logger) error {
 	return nil
 }

-// migrateVersion25 is a no-op migration to trigger foreign key checks
-func migrateVersion25(tx txn, log *zap.Logger) error {
-	return nil
-}
-
 // migrateVersion24 combines the rhp2 and rhp3 data metrics
 func migrateVersion24(tx txn, log *zap.Logger) error {
 	rows, err := tx.Query(`SELECT date_created, stat, stat_value FROM host_stats WHERE stat IN (?, ?, ?, ?) ORDER BY date_created ASC`, metricRHP2Ingress, metricRHP2Egress, metricRHP3Ingress, metricRHP3Egress)
@@ -738,4 +733,5 @@ var migrations = []func(tx txn, log *zap.Logger) error{
 	migrateVersion22,
 	migrateVersion23,
 	migrateVersion24,
+	migrateVersion25,
 }
diff --git a/persist/sqlite/sectors.go b/persist/sqlite/sectors.go
index e620bed8..53937965 100644
--- a/persist/sqlite/sectors.go
+++ b/persist/sqlite/sectors.go
@@ -11,11 +11,6 @@ import (
 	"go.uber.org/zap"
 )

-type tempSectorRef struct {
-	ID       int64
-	SectorID int64
-}
-
 func deleteTempSectors(tx txn, height uint64) (sectorIDs []int64, err error) {
 	const query = `DELETE FROM temp_storage_sector_roots
 WHERE id IN (SELECT id FROM temp_storage_sector_roots WHERE expiration_height <= $1 LIMIT $2)
@@ -78,6 +73,8 @@ func (s *Store) RemoveSector(root types.Hash256) (err error) {
 		// decrement volume usage and metrics
 		if err = incrementVolumeUsage(tx, volumeID, -1); err != nil {
 			return fmt.Errorf("failed to update volume usage: %w", err)
+		} else if err := incrementNumericStat(tx, metricLostSectors, 1, time.Now()); err != nil {
+			return fmt.Errorf("failed to update metric: %w", err)
 		}
 		return nil
 	})
@@ -274,42 +271,42 @@ func pruneSectors(tx txn, ids []int64) (pruned []types.Hash256, err error) {
 	for _, id := range ids {
 		var contractDBID int64
 		err := hasContractRefStmt.QueryRow(id).Scan(&contractDBID)
-		if err == nil {
-			continue // sector has a contract reference
-		} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
 			return nil, fmt.Errorf("failed to check contract references: %w", err)
+		} else if err == nil {
+			continue // sector has a contract reference
 		}

 		var tempDBID int64
 		err = hasTempRefStmt.QueryRow(id).Scan(&tempDBID)
-		if err == nil {
-			continue // sector has a temp storage reference
-		} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
 			return nil, fmt.Errorf("failed to check temp references: %w", err)
+		} else if err == nil {
+			continue // sector has a temp storage reference
 		}

 		var lockDBID int64
 		err = hasLockStmt.QueryRow(id).Scan(&lockDBID)
-		if err == nil {
-			continue // sector is locked
-		} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
 			return nil, fmt.Errorf("failed to check lock references: %w", err)
+		} else if err == nil {
+			continue // sector is locked
 		}

 		var volumeDBID int64
 		err = clearVolumeStmt.QueryRow(id).Scan(&volumeDBID)
-		if err == nil {
-			volumeDelta[volumeDBID]-- // sector was removed from a volume
-		} else if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found
+		if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found
 			return nil, fmt.Errorf("failed to clear volume references: %w", err)
+		} else if err == nil {
+			volumeDelta[volumeDBID]-- // sector was removed from a volume
 		}

 		var root types.Hash256
 		err = deleteSectorStmt.QueryRow(id).Scan((*sqlHash256)(&root))
-		if err == nil {
-			pruned = append(pruned, root)
-		} else if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found
+		if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found
 			return nil, fmt.Errorf("failed to delete sector: %w", err)
+		} else if err == nil {
+			pruned = append(pruned, root)
 		}
 	}
@@ -322,24 +319,6 @@ func pruneSectors(tx txn, ids []int64) (pruned []types.Hash256, err error) {
 	return
 }

-func expiredTempSectors(tx txn, height uint64, limit int) (sectors []tempSectorRef, _ error) {
-	const query = `SELECT ts.id, ts.sector_id FROM temp_storage_sector_roots ts
-WHERE expiration_height <= $1 LIMIT $2;`
-	rows, err := tx.Query(query, height, limit)
-	if err != nil {
-		return nil, fmt.Errorf("failed to select sectors: %w", err)
-	}
-	defer rows.Close()
-	for rows.Next() {
-		var ref tempSectorRef
-		if err := rows.Scan(&ref.ID, &ref.SectorID); err != nil {
-			return nil, fmt.Errorf("failed to scan sector id: %w", err)
-		}
-		sectors = append(sectors, ref)
-	}
-	return
-}
-
 // lockSector locks a sector root. The lock must be released by calling
 // unlockSector. A sector must be locked when it is being read or written
 // to prevent it from being removed by prune sector.
diff --git a/persist/sqlite/volumes.go b/persist/sqlite/volumes.go
index ffe24614..1c6fbeb3 100644
--- a/persist/sqlite/volumes.go
+++ b/persist/sqlite/volumes.go
@@ -1,9 +1,11 @@
 package sqlite

 import (
+	"context"
 	"database/sql"
 	"errors"
 	"fmt"
+	"math"
 	"time"

 	"go.sia.tech/core/types"
@@ -11,24 +13,21 @@ import (
 	"go.uber.org/zap"
 )

-type volumeSectorRef struct {
-	ID    int64
-	Empty bool
-}
-
-var errNoSectorsToMigrate = errors.New("no sectors to migrate")
-
-func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func(location storage.SectorLocation) error, log *zap.Logger) error {
+func (s *Store) migrateSector(volumeID int64, minIndex uint64, marker int64, migrateFn storage.MigrateFunc, log *zap.Logger) (int64, bool, error) {
 	start := time.Now()
 	var locationLocks []int64
 	var sectorLock int64
 	var oldLoc, newLoc storage.SectorLocation
 	err := s.transaction(func(tx txn) (err error) {
-		oldLoc, err = sectorForMigration(tx, volumeID, startIndex)
-		if err != nil {
+		oldLoc, err = sectorForMigration(tx, volumeID, marker)
+		if errors.Is(err, sql.ErrNoRows) {
+			marker = math.MaxInt64
+			return nil
+		} else if err != nil {
 			return fmt.Errorf("failed to get sector for migration: %w", err)
 		}
+		marker = int64(oldLoc.Index)

 		sectorDBID, err := sectorDBID(tx, oldLoc.Root)
 		if err != nil {
@@ -41,10 +40,10 @@ func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func(
 		}

 		newLoc, err = emptyLocationForMigration(tx, volumeID)
-		if errors.Is(err, storage.ErrNotEnoughStorage) && startIndex > 0 {
+		if errors.Is(err, storage.ErrNotEnoughStorage) && minIndex > 0 {
 			// if there is no space in other volumes, try to migrate within the
 			// same volume
-			newLoc, err = locationWithinVolume(tx, volumeID, startIndex)
+			newLoc, err = locationWithinVolume(tx, volumeID, uint64(minIndex))
 			if err != nil {
 				return fmt.Errorf("failed to get empty location in volume: %w", err)
 			}
@@ -59,11 +58,14 @@ func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func(
 		if err != nil {
 			return fmt.Errorf("failed to lock sectors: %w", err)
 		}
-
 		return nil
 	})
-	if err != nil {
-		return fmt.Errorf("failed to migrate sector: %w", err)
+	if errors.Is(err, storage.ErrNotEnoughStorage) {
+		return marker, false, nil
+	} else if err != nil {
+		return 0, false, fmt.Errorf("failed to get new location: %w", err)
+	} else if marker == math.MaxInt64 {
+		return marker, false, nil
 	}
 	// unlock the locations
 	defer unlockLocations(&dbTxn{s}, locationLocks)
@@ -72,7 +74,8 @@ func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func(
 	// call the migrateFn with the new location, data should be copied to the
 	// new location and synced to disk
 	if err := migrateFn(newLoc); err != nil {
-		return fmt.Errorf("failed to migrate data: %w", err)
+		log.Error("failed to migrate sector data", zap.Error(err))
+		return marker, false, nil
 	}

 	// update the sector location in a separate transaction
@@ -109,51 +112,94 @@ func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func(
 		}
 		return nil
 	})
+	if err != nil {
+		return 0, false, fmt.Errorf("failed to update sector metadata: %w", err)
+	}
 	log.Debug("migrated sector", zap.Uint64("oldIndex", oldLoc.Index), zap.Stringer("root", newLoc.Root), zap.Int64("newVolume", newLoc.Volume), zap.Uint64("newIndex", newLoc.Index), zap.Duration("elapsed", time.Since(start)))
-	return err
+	return marker, true, nil
 }

-func (s *Store) batchRemoveVolume(id int64) (bool, error) {
-	var done bool
-	err := s.transaction(func(tx txn) error {
-		var dbID int64
-		err := tx.QueryRow(`SELECT id FROM volume_sectors WHERE volume_id=$1 AND sector_id IS NOT NULL LIMIT 1;`, id).Scan(&dbID)
-		if err == nil {
-			return storage.ErrVolumeNotEmpty
-		} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
-			return fmt.Errorf("failed to check if volume is empty: %w", err)
-		}
+func forceDeleteVolumeSectors(tx txn, volumeID int64) (removed, lost int64, err error) {
+	const query = `DELETE FROM volume_sectors WHERE id IN (SELECT id FROM volume_sectors WHERE volume_id=$1 LIMIT $2) RETURNING sector_id IS NULL AS empty`

-		locations, err := volumeSectorsForDeletion(tx, id, sqlSectorBatchSize)
-		if err != nil {
-			return fmt.Errorf("failed to get volume sectors: %w", err)
-		} else if len(locations) == 0 {
-			done = true
-			return nil // no more sectors to remove
+	rows, err := tx.Query(query, volumeID, sqlSectorBatchSize)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to remove volume sectors: %w", err)
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var empty bool
+		if err := rows.Scan(&empty); err != nil {
+			return 0, 0, fmt.Errorf("failed to scan volume sector: %w", err)
 		}

-		locIDs := make([]int64, 0, len(locations))
-		for _, loc := range locations {
-			locIDs = append(locIDs, loc.ID)
+		removed++
+		if !empty {
+			lost++
 		}
+	}
+	err = rows.Err()
+	return
+}

-		// remove the sectors
-		deleteQuery := `DELETE FROM volume_sectors WHERE id IN (` + queryPlaceHolders(len(locIDs)) + `)`
-		_, err = tx.Exec(deleteQuery, queryArgs(locIDs)...)
-		if err != nil {
-			return fmt.Errorf("failed to remove volume sectors: %w", err)
+func deleteVolumeSectors(tx txn, volumeID int64) (removed int64, err error) {
+	// check that the volume is empty
+	var dummyID int64
+	err = tx.QueryRow(`SELECT id FROM volume_sectors WHERE volume_id=$1 AND sector_id IS NOT NULL LIMIT 1`, volumeID).Scan(&dummyID)
+	if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		return 0, fmt.Errorf("failed to check volume sectors: %w", err)
+	} else if err == nil {
+		return 0, storage.ErrVolumeNotEmpty
+	}
+
+	const query = `DELETE FROM volume_sectors WHERE id IN (SELECT id FROM volume_sectors WHERE volume_id=$1 AND sector_id IS NULL LIMIT $2)`
+	res, err := tx.Exec(query, volumeID, sqlSectorBatchSize)
+	if err != nil {
+		return 0, fmt.Errorf("failed to remove volume sectors: %w", err)
+	}
+	removed, err = res.RowsAffected()
+	return
+}
+
+func (s *Store) batchRemoveVolumeSectors(id int64, force bool) (removed, lost int64, err error) {
+	err = s.transaction(func(tx txn) error {
+		if force {
+			removed, lost, err = forceDeleteVolumeSectors(tx, id)
+			if err != nil {
+				return fmt.Errorf("failed to remove volume sectors: %w", err)
+			}
+
+			if lost > 0 {
+				// special case: if the volume sectors are force deleted, any
+				// unmigrated sectors must be deducted from the physical sector
+				// count.
+				if err := incrementNumericStat(tx, metricPhysicalSectors, -int(lost), time.Now()); err != nil {
+					return fmt.Errorf("failed to update physical sector metric: %w", err)
+				} else if err := incrementNumericStat(tx, metricLostSectors, int(lost), time.Now()); err != nil {
+					return fmt.Errorf("failed to update lost sector metric: %w", err)
+				}
+			}
+		} else {
+			removed, err = deleteVolumeSectors(tx, id)
+			if err != nil {
+				return fmt.Errorf("failed to remove volume sectors: %w", err)
+			}
 		}

 		const updateMetaQuery = `UPDATE storage_volumes SET total_sectors=total_sectors-$1 WHERE id=$2`
-		_, err = tx.Exec(updateMetaQuery, len(locIDs), id)
+		_, err = tx.Exec(updateMetaQuery, removed, id)
 		if err != nil {
 			return fmt.Errorf("failed to update volume metadata: %w", err)
-		} else if err := incrementNumericStat(tx, metricTotalSectors, -len(locIDs), time.Now()); err != nil {
+		} else if err := incrementNumericStat(tx, metricTotalSectors, -int(removed), time.Now()); err != nil {
 			return fmt.Errorf("failed to update total sector metric: %w", err)
 		}
 		return nil
 	})
-	return done, err
+	if lost > 0 && !force {
+		panic("lost sectors without force delete") // dev error
+	}
+	return
 }

 // StorageUsage returns the number of sectors stored and the total number of sectors
@@ -293,18 +339,41 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
 }

 // MigrateSectors migrates each occupied sector of a volume starting at
-// startIndex. The sector data should be copied to the new location and synced
-// to disk during migrateFn.
+// startIndex. migrateFn will be called for each sector that needs to be migrated.
+// The sector data should be copied to the new location and synced
+// to disk immediately. If migrateFn returns an error, that sector will be
+// considered failed and the migration will continue. If the context is
+// canceled, the migration will stop and the error will be returned. The
+// number of sectors migrated and failed will always be returned, even if an
+// error occurs.
+func (s *Store) MigrateSectors(ctx context.Context, volumeID int64, startIndex uint64, migrateFn storage.MigrateFunc) (migrated, failed int, err error) {
 	log := s.log.Named("migrate").With(zap.Int64("oldVolume", volumeID), zap.Uint64("startIndex", startIndex))
+	// the migration function is called in a loop until all sectors are migrated.
+	// marker is used to skip sectors that tried to migrate but failed. When
+	// removing a volume, marker is -1 so the first sector is also migrated.
+	marker := int64(startIndex) - 1
 	for i := 0; ; i++ {
-		if err := s.migrateSector(volumeID, startIndex, migrateFn, log); err != nil {
-			if errors.Is(err, errNoSectorsToMigrate) {
-				return nil
-			}
-			return fmt.Errorf("failed to migrate sector: %w", err)
+		if ctx.Err() != nil {
+			err = ctx.Err()
+			return
+		}
+
+		var successful bool
+		marker, successful, err = s.migrateSector(volumeID, startIndex, marker, migrateFn, log)
+		if err != nil {
+			err = fmt.Errorf("failed to migrate sector: %w", err)
+			return
+		} else if marker == math.MaxInt64 {
+			return
 		}
-		if i%64 == 0 {
+
+		if successful {
+			migrated++
+		} else {
+			failed++
+		}
+
+		if i%256 == 0 {
 			jitterSleep(time.Millisecond) // allow other transactions to run
 		}
 	}
@@ -320,22 +389,44 @@ func (s *Store) AddVolume(localPath string, readOnly bool) (volumeID int64, err
 // RemoveVolume removes a storage volume from the volume store. If there
 // are used sectors in the volume, ErrVolumeNotEmpty is returned. If force is
 // true, the volume is removed regardless of whether it is empty.
-func (s *Store) RemoveVolume(id int64) error {
+func (s *Store) RemoveVolume(id int64, force bool) error {
+	log := s.log.Named("RemoveVolume").With(zap.Int64("volume", id), zap.Bool("force", force))
 	// remove the volume sectors in batches to avoid holding a transaction lock
 	// for too long
-	for {
-		done, err := s.batchRemoveVolume(id)
+	for i := 0; ; i++ {
+		removed, lost, err := s.batchRemoveVolumeSectors(id, force)
+		log.Debug("removed volume sectors", zap.Int("batch", i), zap.Int64("removed", removed), zap.Int64("lost", lost), zap.Error(err))
 		if err != nil {
 			return err
-		} else if done {
+		} else if removed == 0 {
 			break
 		}
 		jitterSleep(time.Millisecond)
 	}
-	if _, err := s.exec(`DELETE FROM storage_volumes WHERE id=?`, id); err != nil {
-		return fmt.Errorf("failed to remove volume: %w", err)
-	}
-	return nil
+
+	return s.transaction(func(tx txn) error {
+		// check that the volume exists
+		var volumeID int64
+		err := tx.QueryRow(`SELECT id FROM storage_volumes WHERE id=$1`, id).Scan(&volumeID)
+		if errors.Is(err, sql.ErrNoRows) {
+			return storage.ErrVolumeNotFound
+		} else if err != nil {
+			return fmt.Errorf("failed to check volume: %w", err)
+		}
+
+		// check that the volume is empty
+		var volumeSectorID int64
+		err = tx.QueryRow(`SELECT id FROM volume_sectors WHERE volume_id=$1 LIMIT 1`, id).Scan(&volumeSectorID)
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
+			return fmt.Errorf("failed to check volume sectors: %w", err)
+		} else if err == nil {
+			return storage.ErrVolumeNotEmpty
+		}
+
+		// delete the volume
+		_, err = tx.Exec(`DELETE FROM storage_volumes WHERE id=$1`, id)
+		return err
+	})
 }

 // GrowVolume grows a storage volume's metadata by n sectors.
@@ -538,19 +629,16 @@ ORDER BY used_sectors ASC LIMIT 1;`
 }

 // sectorForMigration returns the location of the first occupied sector in the
-// volume starting at minIndex. If there are no sectors to migrate,
-// errNoSectorsToMigrate is returned.
-func sectorForMigration(tx txn, volumeID int64, minIndex uint64) (loc storage.SectorLocation, err error) {
+// volume whose index is greater than marker.
+func sectorForMigration(tx txn, volumeID int64, marker int64) (loc storage.SectorLocation, err error) {
 	const query = `SELECT vs.id, vs.volume_id, vs.volume_index, s.sector_root
 	FROM volume_sectors vs
 	INNER JOIN stored_sectors s ON (s.id=vs.sector_id)
-	WHERE vs.sector_id IS NOT NULL AND vs.volume_id=$1 AND vs.volume_index >= $2
+	WHERE vs.sector_id IS NOT NULL AND vs.volume_id=$1 AND vs.volume_index > $2
+	ORDER BY vs.volume_index ASC
 	LIMIT 1`

-	err = tx.QueryRow(query, volumeID, minIndex).Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root))
-	if errors.Is(err, sql.ErrNoRows) {
-		return storage.SectorLocation{}, errNoSectorsToMigrate
-	}
+	err = tx.QueryRow(query, volumeID, marker).Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root))
 	return
 }
@@ -571,23 +659,6 @@ func locationWithinVolume(tx txn, volumeID int64, maxIndex uint64) (loc storage.
 	return
 }

-func volumeSectorsForDeletion(tx txn, volumeID int64, batchSize int) (locs []volumeSectorRef, err error) {
-	const query = `SELECT id, sector_id IS NULL AS empty FROM volume_sectors WHERE volume_id=$1 LIMIT $2`
-	rows, err := tx.Query(query, volumeID, batchSize)
-	if err != nil {
-		return nil, fmt.Errorf("failed to query volume sectors: %w", err)
-	}
-	defer rows.Close()
-	for rows.Next() {
-		var ref volumeSectorRef
-		if err := rows.Scan(&ref.ID, &ref.Empty); err != nil {
-			return nil, fmt.Errorf("failed to scan volume sector: %w", err)
-		}
-		locs = append(locs, ref)
-	}
-	return
-}
-
 func scanVolume(s scanner) (volume storage.Volume, err error) {
 	err = s.Scan(&volume.ID, &volume.LocalPath, &volume.ReadOnly, &volume.Available, &volume.TotalSectors, &volume.UsedSectors)
 	return
diff --git a/persist/sqlite/volumes_test.go b/persist/sqlite/volumes_test.go
index d3c92185..d11ad35a 100644
--- a/persist/sqlite/volumes_test.go
+++ b/persist/sqlite/volumes_test.go
@@ -1,6 +1,7 @@
 package sqlite

 import (
+	"context"
 	"errors"
 	"fmt"
 	"path/filepath"
@@ -390,7 +391,7 @@ func TestRemoveVolume(t *testing.T) {
 	}

 	// check that the empty volume can be removed
-	if err := db.RemoveVolume(int64(volume.ID)); err != nil {
+	if err := db.RemoveVolume(volume.ID, false); err != nil {
 		t.Fatal(err)
 	}

@@ -435,7 +436,7 @@ func TestRemoveVolume(t *testing.T) {
 	}

 	// check that the volume cannot be removed
-	if err := db.RemoveVolume(volume.ID); !errors.Is(err, storage.ErrVolumeNotEmpty) {
+	if err := db.RemoveVolume(volume.ID, false); !errors.Is(err, storage.ErrVolumeNotEmpty) {
 		t.Fatalf("expected ErrVolumeNotEmpty, got %v", err)
 	}

@@ -445,7 +446,7 @@ func TestRemoveVolume(t *testing.T) {
 	}

 	// check that the volume can be removed
-	if err := db.RemoveVolume(volume.ID); err != nil {
+	if err := db.RemoveVolume(volume.ID, false); err != nil {
 		t.Fatal(err)
 	}

@@ -508,24 +509,27 @@ func TestMigrateSectors(t *testing.T) {
 	roots = roots[initialSectors/2:]

-	var i int
 	// migrate the remaining sectors to the first half of the volume
-	err = db.MigrateSectors(volume.ID, initialSectors/2, func(loc storage.SectorLocation) error {
+	var i int
+	migrated, failed, err := db.MigrateSectors(context.Background(), volume.ID, initialSectors/2, func(loc storage.SectorLocation) error {
 		if loc.Volume != volume.ID {
 			t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume)
 		} else if loc.Index != uint64(i) {
 			t.Fatalf("expected sector index %v, got %v", i, loc.Index)
 		} else if loc.Root != roots[i] {
t.Fatalf("expected sector root %v, got %v", roots[i], loc.Root) + t.Fatalf("expected sector root index %d %v, got %v", i, roots[i], loc.Root) } i++ - // note: sync to disk return nil }) if err != nil { t.Fatal(err) } else if i != 32 { t.Fatalf("expected 32 sectors, got %v", i) + } else if migrated != 32 { + t.Fatalf("expected 32 migrated sectors, got %v", migrated) + } else if failed != 0 { + t.Fatalf("expected 0 failed sectors, got %v", failed) } // check that the sector metadata has been updated @@ -548,16 +552,15 @@ func TestMigrateSectors(t *testing.T) { } // migrate the remaining sectors from the first volume; should partially complete - var n int - err = db.MigrateSectors(volume.ID, 0, func(loc storage.SectorLocation) error { - if n > initialSectors/4 { - t.Fatalf("expected only %v migrations, got %v", initialSectors/4, n) - } - n++ + migrated, failed, err = db.MigrateSectors(context.Background(), volume.ID, 0, func(loc storage.SectorLocation) error { return nil }) - if !errors.Is(err, storage.ErrNotEnoughStorage) { - t.Fatalf("expected ErrNotEnoughStorage, got %v", err) + if err != nil { + t.Fatal(err) + } else if migrated != initialSectors/4 { + t.Fatalf("expected %v migrated sectors, got %v", initialSectors/4, migrated) + } else if failed != len(roots)-(initialSectors/4) { + t.Fatalf("expected %v failed sectors, got %v", initialSectors-(initialSectors/4), failed) } // check that volume 2 is now full @@ -880,10 +883,15 @@ func BenchmarkVolumeMigrate(b *testing.B) { b.ReportMetric(float64(b.N), "sectors") // migrate all sectors from the first volume to the second - if err := db.MigrateSectors(volume1.ID, 0, func(loc storage.SectorLocation) error { + migrated, failed, err := db.MigrateSectors(context.Background(), volume1.ID, 0, func(loc storage.SectorLocation) error { return nil - }); err != nil { + }) + if err != nil { b.Fatal(err) + } else if migrated != b.N { + b.Fatalf("expected %v migrated sectors, got %v", b.N, migrated) + } else if failed != 0 { + b.Fatalf("expected 0 failed sectors, got %v", failed) } }