Skip to content

Commit

Permalink
Merge pull request #318 from SiaFoundation/nate/fix-force-remove
Browse files Browse the repository at this point in the history
Fix force remove
  • Loading branch information
n8maninger committed Mar 5, 2024
2 parents 0b73159 + e198745 commit 700905d
Show file tree
Hide file tree
Showing 9 changed files with 469 additions and 278 deletions.
1 change: 1 addition & 0 deletions host/metrics/types.go
Expand Up @@ -84,6 +84,7 @@ type (
Storage struct {
TotalSectors uint64 `json:"totalSectors"`
PhysicalSectors uint64 `json:"physicalSectors"`
LostSectors uint64 `json:"lostSectors"`
ContractSectors uint64 `json:"contractSectors"`
TempSectors uint64 `json:"tempSectors"`

Expand Down
16 changes: 12 additions & 4 deletions host/storage/persist.go
@@ -1,12 +1,17 @@
package storage

import (
"context"
"errors"

"go.sia.tech/core/types"
)

type (
// MigrateFunc is a callback function that is called for each sector that
// needs to be migrated If the function returns an error, the sector should
// be skipped and migration should continue.
MigrateFunc func(location SectorLocation) error

// A VolumeStore stores and retrieves information about storage volumes.
VolumeStore interface {
Expand All @@ -23,7 +28,7 @@ type (
// RemoveVolume removes a storage volume from the volume store. If there
// are used sectors in the volume, ErrVolumeNotEmpty is returned. If
// force is true, the volume is removed even if it is not empty.
RemoveVolume(volumeID int64) error
RemoveVolume(volumeID int64, force bool) error
// GrowVolume grows a storage volume's metadata to maxSectors. If the
// number of sectors in the volume is already greater than maxSectors,
// nil is returned.
Expand All @@ -39,9 +44,9 @@ type (

// MigrateSectors returns a new location for each occupied sector of a
// volume starting at min. The sector data should be copied to the new
// location and synced to disk during migrateFn. Iteration is stopped if
// migrateFn returns an error.
MigrateSectors(volumeID int64, min uint64, migrateFn func(SectorLocation) error) error
// location and synced to disk during migrateFn. If migrateFn returns an
// error, migration will continue, but that sector is not migrated.
MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn MigrateFunc) (migrated, failed int, err error)
// StoreSector calls fn with an empty location in a writable volume. If
// the sector root already exists, fn is called with the existing
// location and exists is true. Unless exists is true, The sector must
Expand Down Expand Up @@ -74,6 +79,9 @@ type (
)

var (
// ErrMigrationFailed is returned when a volume fails to migrate all
// of its sectors.
ErrMigrationFailed = errors.New("migration failed")
// ErrNotEnoughStorage is returned when there is not enough storage space to
// store a sector.
ErrNotEnoughStorage = errors.New("not enough storage")
Expand Down
181 changes: 79 additions & 102 deletions host/storage/storage.go
Expand Up @@ -156,7 +156,7 @@ func (vm *VolumeManager) loadVolumes() error {
// migrateSector migrates a sector to a new location. The sector is read from
// its current location and written to its new location. The volume is
// immediately synced after the sector is written.
func (vm *VolumeManager) migrateSector(loc SectorLocation, log *zap.Logger) error {
func (vm *VolumeManager) migrateSector(loc SectorLocation) error {
// read the sector from the old location
sector, err := vm.Read(loc.Root)
if err != nil {
Expand Down Expand Up @@ -269,14 +269,8 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol

// migrate any sectors outside of the target range.
var migrated int
err := vm.vs.MigrateSectors(id, newMaxSectors, func(newLoc SectorLocation) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil {
migrated, failed, err := vm.vs.MigrateSectors(ctx, id, newMaxSectors, func(newLoc SectorLocation) error {
if err := vm.migrateSector(newLoc); err != nil {
return err
}
migrated++
Expand All @@ -285,9 +279,11 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
vm.a.Register(a)
return nil
})
log.Info("migrated sectors", zap.Int("count", migrated))
log.Info("migrated sectors", zap.Int("migrated", migrated), zap.Int("failed", failed))
if err != nil {
return err
} else if failed > 0 {
return ErrMigrationFailed
}

for current := oldMaxSectors; current > newMaxSectors; {
Expand Down Expand Up @@ -337,73 +333,6 @@ func (vm *VolumeManager) volumeStats(id int64) VolumeStats {
return v.Stats()
}

func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int64, localPath string, force bool, log *zap.Logger) (int, error) {
ctx, cancel, err := vm.tg.AddContext(ctx)
if err != nil {
return 0, err
}
defer cancel()

// add an alert for the migration
a := alerts.Alert{
ID: frand.Entropy256(),
Message: "Migrating sectors",
Severity: alerts.SeverityInfo,
Data: map[string]interface{}{
"volumeID": id,
"migrated": 0,
"force": force,
},
Timestamp: time.Now(),
}
vm.a.Register(a)
// dismiss the alert when the function returns. It is the caller's
// responsibility to register a completion alert
defer vm.a.Dismiss(a.ID)

// migrate sectors to other volumes
var migrated, failed int
err = vm.vs.MigrateSectors(id, 0, func(newLoc SectorLocation) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil {
log.Error("failed to migrate sector", zap.Stringer("sectorRoot", newLoc.Root), zap.Error(err))
if force {
failed++
a.Data["failed"] = failed
return nil
}
return err
}
migrated++
// update the alert
a.Data["migrated"] = migrated
vm.a.Register(a)
return nil
})
if err != nil {
return migrated, fmt.Errorf("failed to migrate sector data: %w", err)
} else if err := vm.vs.RemoveVolume(id); err != nil {
return migrated, fmt.Errorf("failed to remove volume: %w", err)
}

vm.mu.Lock()
defer vm.mu.Unlock()
// close the volume
vm.volumes[id].Close()
// delete the volume from memory
delete(vm.volumes, id)
// remove the volume file, ignore error if the file does not exist
if err := os.Remove(localPath); err != nil && !errors.Is(err, os.ErrNotExist) {
return migrated, fmt.Errorf("failed to remove volume file: %w", err)
}
return migrated, nil
}

// Close gracefully shutsdown the volume manager.
func (vm *VolumeManager) Close() error {
// wait for all operations to stop
Expand Down Expand Up @@ -592,7 +521,7 @@ func (vm *VolumeManager) SetReadOnly(id int64, readOnly bool) error {

// RemoveVolume removes a volume from the manager.
func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool, result chan<- error) error {
log := vm.log.Named("remove").With(zap.Int64("volumeID", id))
log := vm.log.Named("remove").With(zap.Int64("volumeID", id), zap.Bool("force", force))
done, err := vm.tg.Add()
if err != nil {
return err
Expand All @@ -604,7 +533,10 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
vm.mu.Unlock()
if !ok {
return fmt.Errorf("volume %v not found", id)
} else if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
}

oldStatus := vol.Status()
if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
return fmt.Errorf("failed to set volume status: %w", err)
}

Expand All @@ -618,38 +550,83 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
}

alert := alerts.Alert{
ID: frand.Entropy256(),
Message: "Removing volume",
Severity: alerts.SeverityInfo,
Data: map[string]interface{}{
"volumeID": id,
"sectors": stat.TotalSectors,
"used": stat.UsedSectors,
"migrated": 0,
"failed": 0,
},
Timestamp: time.Now(),
}

go func() {
start := time.Now()
defer vol.SetStatus(VolumeStatusReady)
defer vol.SetStatus(oldStatus)

migrated, err := vm.migrateForRemoval(ctx, id, stat.LocalPath, force, log)
var migrated, failed int

updateRemovalAlert := func(message string, severity alerts.Severity, err error) {
alert.Message = message
alert.Severity = severity
alert.Data["migrated"] = migrated
alert.Data["failed"] = failed
if err != nil {
alert.Data["error"] = err.Error()
}
vm.a.Register(alert)
}

migrated, failed, err := vm.vs.MigrateSectors(ctx, id, 0, func(newLoc SectorLocation) error {
err := vm.migrateSector(newLoc)
if err != nil {
failed++
} else {
migrated++
}
updateRemovalAlert("Removing volume", alerts.SeverityInfo, nil) // error is ignored during migration
return err
})
if err != nil {
log.Error("failed to migrate sectors", zap.Error(err))
// update the alert
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
} else if !force && failed > 0 {
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, ErrMigrationFailed)
result <- ErrMigrationFailed
return
}

alert := alerts.Alert{
ID: frand.Entropy256(),
Data: map[string]interface{}{
"volumeID": id,
"elapsed": time.Since(start),
"migratedSectors": migrated,
},
Timestamp: time.Now(),
}
if err != nil {
alert.Message = "Volume removal failed"
alert.Severity = alerts.SeverityError
alert.Data["error"] = err.Error()
} else {
alert.Message = "Volume removed"
alert.Severity = alerts.SeverityInfo
// close the volume and remove it from memory
if err := vol.Close(); err != nil {
log.Error("failed to close volume", zap.Error(err))
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
} else if err := os.Remove(stat.LocalPath); err != nil && !errors.Is(err, os.ErrNotExist) {
log.Error("failed to remove volume file", zap.Error(err))
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
}
vm.a.Register(alert)
delete(vm.volumes, id)

select {
case result <- err:
default:
// remove the volume from the volume store
if err := vm.vs.RemoveVolume(id, force); err != nil {
log.Error("failed to remove volume", zap.Error(err))
// update the alert
updateRemovalAlert("Failed to remove volume", alerts.SeverityError, err)
result <- err
return
}

updateRemovalAlert("Volume removed", alerts.SeverityInfo, nil)
result <- nil
}()

return nil
Expand Down

0 comments on commit 700905d

Please sign in to comment.