Skip to content

Commit

Permalink
feat(restore): index, support resume
Browse files Browse the repository at this point in the history
The indexed workload won't contain sstables that were already
restored during the previous restore run.
  • Loading branch information
Michal-Leszczynski committed Sep 25, 2024
1 parent 94cb77b commit 8edd8ab
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions pkg/service/restore/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package restore

import (
"context"
"slices"

"github.com/pkg/errors"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
Expand Down Expand Up @@ -71,6 +72,12 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
if err != nil {
return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads")
}
if w.target.Continue {
rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload)
if err != nil {
return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables")
}
}
return aggregateLocationWorkload(rawWorkload), nil
}

Expand Down Expand Up @@ -114,6 +121,44 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo
return rawWorkload, nil
}

// filterPreviouslyRestoredSStables returns rawWorkload without the sstables
// that the progress records of w.run report as already restored.
//
// Progress records are visited with forEachProgress; a record counts as
// restored only when its RestoreCompletedAt passes validateTimeIsSet.
// Restored IDs are matched per remote sstable dir, so the same ID under
// a different dir is not filtered out.
//
// When no restored sstables are recorded, rawWorkload is returned as is.
// Otherwise every returned workload carries only the remaining sstables with
// Size recomputed as the sum of their sizes, and workloads left without any
// sstables are omitted.
func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) {
	restoredIDs := make(map[string][]string)
	err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) {
		if validateTimeIsSet(pr.RestoreCompletedAt) {
			restoredIDs[pr.RemoteSSTableDir] = append(restoredIDs[pr.RemoteSSTableDir], pr.SSTableID...)
		}
	})
	if err != nil {
		return nil, errors.Wrap(err, "iterate over prev run progress")
	}
	if len(restoredIDs) == 0 {
		return rawWorkload, nil
	}

	// Sort each dir's restored IDs once so that the membership checks below
	// are binary searches instead of linear scans per sstable.
	for _, ids := range restoredIDs {
		slices.Sort(ids)
	}

	var filtered []RemoteDirWorkload
	for _, rw := range rawWorkload {
		ids := restoredIDs[rw.RemoteSSTableDir]

		var pending []RemoteSSTable
		var size int64
		for _, sst := range rw.SSTables {
			if _, restored := slices.BinarySearch(ids, sst.ID); restored {
				continue
			}
			pending = append(pending, sst)
			size += sst.Size
		}
		if len(pending) == 0 {
			// Everything from this dir has already been restored.
			continue
		}

		filtered = append(filtered, RemoteDirWorkload{
			TableName:        rw.TableName,
			ManifestInfo:     rw.ManifestInfo,
			RemoteSSTableDir: rw.RemoteSSTableDir,
			Size:             size,
			SSTables:         pending,
		})
	}

	return filtered, nil
}

func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload {
remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload)
for _, rw := range rawWorkload {
Expand Down

0 comments on commit 8edd8ab

Please sign in to comment.