From f06648ecd36a4f1d677c4ca831bce8870bbe218e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 24 Sep 2024 16:18:55 +0200 Subject: [PATCH] feat(restore): add primitive batching using indexed workload This is a temporary implementation used for integrating workload indexing with the rest of the code. It will be improved as a part of the #3979. --- pkg/service/restore/batch.go | 173 +++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 pkg/service/restore/batch.go diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go new file mode 100644 index 000000000..21de81231 --- /dev/null +++ b/pkg/service/restore/batch.go @@ -0,0 +1,173 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "slices" + "sync" + + "github.com/pkg/errors" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" +) + +type batchDispatcher struct { + mu sync.Mutex + workload []LocationWorkload + batchSize int + locationHosts map[Location][]string +} + +func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher { + return &batchDispatcher{ + mu: sync.Mutex{}, + workload: workload, + batchSize: batchSize, + locationHosts: locationHosts, + } +} + +type batch struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +func (b batch) Files() []string { + var files []string + for _, sst := range b.SSTables { + files = append(files, sst.Files...) + } + return files +} + +func (b batch) VersionedFiles() []string { + var files []string + for _, sst := range b.SSTables { + if sst.Versioned { + files = append(files, sst.Files...) + } + } + return files +} + +func (b batch) VersionedSize() int64 { + var size int64 + for _, sst := range b.SSTables { + if sst.Versioned { + size += sst.Size + } + } + return size +} + +func (b batch) IDs() []string { + var ids []string + for _, sst := range b.SSTables { + ids = append(ids, sst.ID) + } + return ids +} + +// ValidateAllDispatched returns error if not all sstables were dispatched. +func (b *batchDispatcher) ValidateAllDispatched() error { + for _, lw := range b.workload { + if lw.Size != 0 { + for _, tw := range lw.Tables { + if tw.Size != 0 { + for _, dw := range tw.RemoteDirs { + if dw.Size != 0 || len(dw.SSTables) != 0 { + return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)", + dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng table from location %s: %s.%s (%d bytes)", + tw.Location, tw.Keyspace, tw.Table, tw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng location: %s (%d bytes)", + lw.Location, lw.Size) + } + } + return nil +} + +// DispatchBatch batch to be restored or false when there is no more work to do. +func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) { + b.mu.Lock() + defer b.mu.Unlock() + + l := b.chooseLocation(host) + if l == nil { + return batch{}, false + } + t := b.chooseTable(l) + if t == nil { + return batch{}, false + } + dir := b.chooseRemoteDir(t) + if dir == nil { + return batch{}, false + } + out := b.createBatch(l, t, dir) + return out, true +} + +// Returns location for which batch should be created. +func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload { + for i := range b.workload { + if b.workload[i].Size == 0 { + continue + } + if slices.Contains(b.locationHosts[b.workload[i].Location], host) { + return &b.workload[i] + } + } + return nil +} + +// Returns table for which batch should be created. +func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { + for i := range location.Tables { + if location.Tables[i].Size == 0 { + continue + } + return &location.Tables[i] + } + return nil +} + +// Return remote dir for which batch should be created. +func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { + for i := range table.RemoteDirs { + if table.RemoteDirs[i].Size == 0 { + continue + } + return &table.RemoteDirs[i] + } + return nil +} + +// Returns batch and updates RemoteDirWorkload and its parents. +func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch { + i := min(b.batchSize, len(dir.SSTables)) + sstables := dir.SSTables[:i] + dir.SSTables = dir.SSTables[i:] + + var size int64 + for _, sst := range sstables { + size += sst.Size + } + dir.Size -= size + t.Size -= size + l.Size -= size + return batch{ + TableName: dir.TableName, + ManifestInfo: dir.ManifestInfo, + RemoteSSTableDir: dir.RemoteSSTableDir, + Size: size, + SSTables: sstables, + } +}