Skip to content

Commit

Permalink
storage: check for ErrMigrationFailed in TestRemoveVolume
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 2, 2024
1 parent 1375927 commit e198745
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
8 changes: 7 additions & 1 deletion host/storage/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
)

type (
// MigrateFunc is a callback function that is called for each sector that
// needs to be migrated If the function returns an error, the sector should
// be skipped and migration should continue.
MigrateFunc func(location SectorLocation) error

// A VolumeStore stores and retrieves information about storage volumes.
VolumeStore interface {
Expand Down Expand Up @@ -42,7 +46,7 @@ type (
// volume starting at min. The sector data should be copied to the new
// location and synced to disk during migrateFn. If migrateFn returns an
// error, migration will continue, but that sector is not migrated.
MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn func(SectorLocation) error) (migrated, failed int, err error)
MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn MigrateFunc) (migrated, failed int, err error)
// StoreSector calls fn with an empty location in a writable volume. If
// the sector root already exists, fn is called with the existing
// location and exists is true. Unless exists is true, The sector must
Expand Down Expand Up @@ -75,6 +79,8 @@ type (
)

var (
// ErrMigrationFailed is returned when a volume fails to migrate all
// of its sectors.
ErrMigrationFailed = errors.New("migration failed")
// ErrNotEnoughStorage is returned when there is not enough storage space to
// store a sector.
Expand Down
21 changes: 15 additions & 6 deletions host/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@ func TestRemoveVolume(t *testing.T) {
}
defer release()

// attempt to remove the volume. Should return ErrNotEnoughStorage since
// attempt to remove the volume. Should return ErrMigrationFailed since
// there is only one volume.
if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil {
// blocking error should be nil
t.Fatal(err)
} else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) {
// async error should be ErrNotEnoughStorage
} else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) {
// async error should be ErrMigrationFailed
t.Fatalf("expected ErrNotEnoughStorage, got %v", err)
}

Expand Down Expand Up @@ -479,6 +479,15 @@ func TestRemoveCorrupt(t *testing.T) {
t.Fatal(err)
}

// check that the volume metrics doubled
if m, err := db.Metrics(time.Now()); err != nil {
t.Fatal(err)
} else if m.Storage.TotalSectors != expectedSectors*2 {
t.Fatalf("expected %v total sectors, got %v", expectedSectors*2, m.Storage.TotalSectors)
} else if m.Storage.PhysicalSectors != 10 {
t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors)
}

// force remove the volume
if err := vm.RemoveVolume(context.Background(), volume.ID, true, result); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -678,7 +687,7 @@ func TestRemoveMissing(t *testing.T) {
// check that the volume metrics did not change
if m, err := db.Metrics(time.Now()); err != nil {
t.Fatal(err)
} else if m.Storage.TotalSectors != expectedSectors {
} else if m.Storage.TotalSectors != 0 {
t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors)
} else if m.Storage.PhysicalSectors != 0 {
t.Fatalf("expected 0 used sectors, got %v", m.Storage.PhysicalSectors)
Expand Down Expand Up @@ -1252,8 +1261,8 @@ func TestVolumeShrink(t *testing.T) {
remainingSectors := uint64(sectors - toRemove)
if err := vm.ResizeVolume(context.Background(), volume.ID, remainingSectors, result); err != nil {
t.Fatal(err)
} else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) {
t.Fatalf("expected not enough storage error, got %v", err)
} else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) {
t.Fatalf("expected ErrMigrationFailed, got %v", err)
}

// remove some sectors from the beginning of the volume
Expand Down
7 changes: 4 additions & 3 deletions persist/sqlite/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.uber.org/zap"
)

func (s *Store) migrateSector(volumeID int64, minIndex uint64, marker int64, migrateFn func(location storage.SectorLocation) error, log *zap.Logger) (int64, bool, error) {
func (s *Store) migrateSector(volumeID int64, minIndex uint64, marker int64, migrateFn storage.MigrateFunc, log *zap.Logger) (int64, bool, error) {
start := time.Now()

var locationLocks []int64
Expand Down Expand Up @@ -74,6 +74,7 @@ func (s *Store) migrateSector(volumeID int64, minIndex uint64, marker int64, mig
// call the migrateFn with the new location, data should be copied to the
// new location and synced to disk
if err := migrateFn(newLoc); err != nil {
log.Error("failed to migrate sector data", zap.Error(err))
return marker, false, nil
}

Expand Down Expand Up @@ -342,10 +343,10 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
// The sector data should be copied to the new location and synced
// to disk immediately. If migrateFn returns an error, that sector will be
// considered failed and the migration will continue. If the context is
// canceled, the migration will stop and the error will be returned along. The
// canceled, the migration will stop and the error will be returned. The
// number of sectors migrated and failed will always be returned, even if an
// error occurs.
func (s *Store) MigrateSectors(ctx context.Context, volumeID int64, startIndex uint64, migrateFn func(location storage.SectorLocation) error) (migrated, failed int, err error) {
func (s *Store) MigrateSectors(ctx context.Context, volumeID int64, startIndex uint64, migrateFn storage.MigrateFunc) (migrated, failed int, err error) {
log := s.log.Named("migrate").With(zap.Int64("oldVolume", volumeID), zap.Uint64("startIndex", startIndex))
// the migration function is called in a loop until all sectors are migrated
// marker is used to skip sectors that tried to migrate but failed.
Expand Down

0 comments on commit e198745

Please sign in to comment.