diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go
index d24def7eb..1d2b14d11 100644
--- a/pkg/service/restore/index.go
+++ b/pkg/service/restore/index.go
@@ -4,6 +4,7 @@ package restore
 
 import (
 	"context"
+	"slices"
 
 	"github.com/pkg/errors"
 	. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
@@ -71,6 +72,12 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
 	if err != nil {
 		return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads")
 	}
+	if w.target.Continue {
+		rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload)
+		if err != nil {
+			return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables")
+		}
+	}
 	return aggregateLocationWorkload(rawWorkload), nil
 }
 
@@ -114,6 +121,58 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo
 	return rawWorkload, nil
 }
 
+// filterPreviouslyRestoredSStables returns rawWorkload without the SSTables that
+// the progress entries visited by forEachProgress for w.run mark as restored
+// (RestoreCompletedAt accepted by validateTimeIsSet). Size of every remaining dir
+// workload is recomputed, and dir workloads left without any SSTable are dropped.
+func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) {
+	restoredIDs := make(map[string][]string)
+	err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) {
+		if validateTimeIsSet(pr.RestoreCompletedAt) {
+			restoredIDs[pr.RemoteSSTableDir] = append(restoredIDs[pr.RemoteSSTableDir], pr.SSTableID...)
+		}
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "iterate over prev run progress")
+	}
+	if len(restoredIDs) == 0 {
+		return rawWorkload, nil
+	}
+	// Sort IDs once per dir so that each membership check below is a binary
+	// search instead of a linear scan over all restored IDs of that dir.
+	for _, ids := range restoredIDs {
+		slices.Sort(ids)
+	}
+
+	var filtered []RemoteDirWorkload
+	for _, rw := range rawWorkload {
+		restored := restoredIDs[rw.RemoteSSTableDir]
+		var (
+			pending []RemoteSSTable
+			size    int64
+		)
+		for _, sst := range rw.SSTables {
+			if _, ok := slices.BinarySearch(restored, sst.ID); ok {
+				continue
+			}
+			pending = append(pending, sst)
+			size += sst.Size
+		}
+		if len(pending) == 0 {
+			continue
+		}
+		filtered = append(filtered, RemoteDirWorkload{
+			TableName:        rw.TableName,
+			ManifestInfo:     rw.ManifestInfo,
+			RemoteSSTableDir: rw.RemoteSSTableDir,
+			Size:             size,
+			SSTables:         pending,
+		})
+	}
+
+	return filtered, nil
+}
+
 func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload {
 	remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload)
 	for _, rw := range rawWorkload {