Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix force remove #318

Merged
merged 4 commits into from Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions host/metrics/types.go
Expand Up @@ -84,6 +84,7 @@ type (
Storage struct {
TotalSectors uint64 `json:"totalSectors"`
PhysicalSectors uint64 `json:"physicalSectors"`
LostSectors uint64 `json:"lostSectors"`
ContractSectors uint64 `json:"contractSectors"`
TempSectors uint64 `json:"tempSectors"`

Expand Down
16 changes: 12 additions & 4 deletions host/storage/persist.go
@@ -1,12 +1,17 @@
package storage

import (
"context"
"errors"

"go.sia.tech/core/types"
)

type (
// MigrateFunc is a callback function that is called for each sector that
// needs to be migrated If the function returns an error, the sector should
// be skipped and migration should continue.
MigrateFunc func(location SectorLocation) error

// A VolumeStore stores and retrieves information about storage volumes.
VolumeStore interface {
Expand All @@ -23,7 +28,7 @@ type (
// RemoveVolume removes a storage volume from the volume store. If there
// are used sectors in the volume, ErrVolumeNotEmpty is returned. If
// force is true, the volume is removed even if it is not empty.
RemoveVolume(volumeID int64) error
RemoveVolume(volumeID int64, force bool) error
// GrowVolume grows a storage volume's metadata to maxSectors. If the
// number of sectors in the volume is already greater than maxSectors,
// nil is returned.
Expand All @@ -39,9 +44,9 @@ type (

// MigrateSectors returns a new location for each occupied sector of a
// volume starting at min. The sector data should be copied to the new
// location and synced to disk during migrateFn. Iteration is stopped if
// migrateFn returns an error.
MigrateSectors(volumeID int64, min uint64, migrateFn func(SectorLocation) error) error
// location and synced to disk during migrateFn. If migrateFn returns an
// error, migration will continue, but that sector is not migrated.
MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn MigrateFunc) (migrated, failed int, err error)
// StoreSector calls fn with an empty location in a writable volume. If
// the sector root already exists, fn is called with the existing
// location and exists is true. Unless exists is true, The sector must
Expand Down Expand Up @@ -74,6 +79,9 @@ type (
)

var (
// ErrMigrationFailed is returned when a volume fails to migrate all
// of its sectors.
ErrMigrationFailed = errors.New("migration failed")
// ErrNotEnoughStorage is returned when there is not enough storage space to
// store a sector.
ErrNotEnoughStorage = errors.New("not enough storage")
Expand Down
181 changes: 79 additions & 102 deletions host/storage/storage.go
Expand Up @@ -156,7 +156,7 @@ func (vm *VolumeManager) loadVolumes() error {
// migrateSector migrates a sector to a new location. The sector is read from
// its current location and written to its new location. The volume is
// immediately synced after the sector is written.
func (vm *VolumeManager) migrateSector(loc SectorLocation, log *zap.Logger) error {
func (vm *VolumeManager) migrateSector(loc SectorLocation) error {
// read the sector from the old location
sector, err := vm.Read(loc.Root)
if err != nil {
Expand Down Expand Up @@ -269,14 +269,8 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol

// migrate any sectors outside of the target range.
var migrated int
err := vm.vs.MigrateSectors(id, newMaxSectors, func(newLoc SectorLocation) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil {
migrated, failed, err := vm.vs.MigrateSectors(ctx, id, newMaxSectors, func(newLoc SectorLocation) error {
if err := vm.migrateSector(newLoc); err != nil {
return err
}
migrated++
Expand All @@ -285,9 +279,11 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
vm.a.Register(a)
return nil
})
log.Info("migrated sectors", zap.Int("count", migrated))
log.Info("migrated sectors", zap.Int("migrated", migrated), zap.Int("failed", failed))
if err != nil {
return err
} else if failed > 0 {
return ErrMigrationFailed
}

for current := oldMaxSectors; current > newMaxSectors; {
Expand Down Expand Up @@ -337,73 +333,6 @@ func (vm *VolumeManager) volumeStats(id int64) VolumeStats {
return v.Stats()
}

func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int64, localPath string, force bool, log *zap.Logger) (int, error) {
ctx, cancel, err := vm.tg.AddContext(ctx)
if err != nil {
return 0, err
}
defer cancel()

// add an alert for the migration
a := alerts.Alert{
ID: frand.Entropy256(),
Message: "Migrating sectors",
Severity: alerts.SeverityInfo,
Data: map[string]interface{}{
"volumeID": id,
"migrated": 0,
"force": force,
},
Timestamp: time.Now(),
}
vm.a.Register(a)
// dismiss the alert when the function returns. It is the caller's
// responsibility to register a completion alert
defer vm.a.Dismiss(a.ID)

// migrate sectors to other volumes
var migrated, failed int
err = vm.vs.MigrateSectors(id, 0, func(newLoc SectorLocation) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil {
log.Error("failed to migrate sector", zap.Stringer("sectorRoot", newLoc.Root), zap.Error(err))
if force {
failed++
a.Data["failed"] = failed
return nil
}
return err
}
migrated++
// update the alert
a.Data["migrated"] = migrated
vm.a.Register(a)
return nil
})
if err != nil {
return migrated, fmt.Errorf("failed to migrate sector data: %w", err)
} else if err := vm.vs.RemoveVolume(id); err != nil {
return migrated, fmt.Errorf("failed to remove volume: %w", err)
}

vm.mu.Lock()
defer vm.mu.Unlock()
// close the volume
vm.volumes[id].Close()
// delete the volume from memory
delete(vm.volumes, id)
// remove the volume file, ignore error if the file does not exist
if err := os.Remove(localPath); err != nil && !errors.Is(err, os.ErrNotExist) {
return migrated, fmt.Errorf("failed to remove volume file: %w", err)
}
return migrated, nil
}

// Close gracefully shutsdown the volume manager.
func (vm *VolumeManager) Close() error {
// wait for all operations to stop
Expand Down Expand Up @@ -592,7 +521,7 @@ func (vm *VolumeManager) SetReadOnly(id int64, readOnly bool) error {

// RemoveVolume removes a volume from the manager.
func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool, result chan<- error) error {
log := vm.log.Named("remove").With(zap.Int64("volumeID", id))
log := vm.log.Named("remove").With(zap.Int64("volumeID", id), zap.Bool("force", force))
done, err := vm.tg.Add()
if err != nil {
return err
Expand All @@ -604,7 +533,10 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
vm.mu.Unlock()
if !ok {
return fmt.Errorf("volume %v not found", id)
} else if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
}

oldStatus := vol.Status()
if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
return fmt.Errorf("failed to set volume status: %w", err)
}

Expand All @@ -618,38 +550,83 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
}

alert := alerts.Alert{
ID: frand.Entropy256(),
Message: "Removing volume",
Severity: alerts.SeverityInfo,
Data: map[string]interface{}{
"volumeID": id,
"sectors": stat.TotalSectors,
"used": stat.UsedSectors,
"migrated": 0,
"failed": 0,
},
Timestamp: time.Now(),
}

go func() {
start := time.Now()
defer vol.SetStatus(VolumeStatusReady)
defer vol.SetStatus(oldStatus)

migrated, err := vm.migrateForRemoval(ctx, id, stat.LocalPath, force, log)
var migrated, failed int

updateRemovalAlert := func(message string, severity alerts.Severity, err error) {
alert.Message = message
alert.Severity = severity
alert.Data["migrated"] = migrated
alert.Data["failed"] = failed
if err != nil {
alert.Data["error"] = err.Error()
}
vm.a.Register(alert)
}

migrated, failed, err := vm.vs.MigrateSectors(ctx, id, 0, func(newLoc SectorLocation) error {
err := vm.migrateSector(newLoc)
if err != nil {
failed++
} else {
migrated++
}
updateRemovalAlert("Removing volume", alerts.SeverityInfo, nil) // error is ignored during migration
return err
})
if err != nil {
log.Error("failed to migrate sectors", zap.Error(err))
// update the alert
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
} else if !force && failed > 0 {
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, ErrMigrationFailed)
result <- ErrMigrationFailed
return
}

alert := alerts.Alert{
ID: frand.Entropy256(),
Data: map[string]interface{}{
"volumeID": id,
"elapsed": time.Since(start),
"migratedSectors": migrated,
},
Timestamp: time.Now(),
}
if err != nil {
alert.Message = "Volume removal failed"
alert.Severity = alerts.SeverityError
alert.Data["error"] = err.Error()
} else {
alert.Message = "Volume removed"
alert.Severity = alerts.SeverityInfo
// close the volume and remove it from memory
if err := vol.Close(); err != nil {
log.Error("failed to close volume", zap.Error(err))
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
} else if err := os.Remove(stat.LocalPath); err != nil && !errors.Is(err, os.ErrNotExist) {
log.Error("failed to remove volume file", zap.Error(err))
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
}
vm.a.Register(alert)
delete(vm.volumes, id)

select {
case result <- err:
default:
// remove the volume from the volume store
if err := vm.vs.RemoveVolume(id, force); err != nil {
log.Error("failed to remove volume", zap.Error(err))
// update the alert
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
}

updateRemovalAlert("Volume removed", alerts.SeverityInfo, nil)
result <- nil
}()

return nil
Expand Down