diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27d224d77100..0e1f20107ec2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,10 @@ Ref: https://keepachangelog.com/en/1.0.0/
 
 ## [Unreleased]
 
+### Features
+
+* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving the snapshot being restored to the local snapshot store.
+
 ### Improvements
 
 * (deps) [#15973](https://github.com/cosmos/cosmos-sdk/pull/15973) Bump CometBFT to [v0.34.28](https://github.com/cometbft/cometbft/blob/v0.34.28/CHANGELOG.md#v03428).
diff --git a/snapshots/manager.go b/snapshots/manager.go
index 7bb5cd8abf5f..b92a2b5b82d0 100644
--- a/snapshots/manager.go
+++ b/snapshots/manager.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"os"
 	"sort"
 	"sync"
 
@@ -38,12 +39,12 @@ type Manager struct {
 	multistore types.Snapshotter
 	logger     log.Logger
 
-	mtx                sync.Mutex
-	operation          operation
-	chRestore          chan<- io.ReadCloser
-	chRestoreDone      <-chan restoreDone
-	restoreChunkHashes [][]byte
-	restoreChunkIndex  uint32
+	mtx               sync.Mutex
+	operation         operation
+	chRestore         chan<- uint32
+	chRestoreDone     <-chan restoreDone
+	restoreSnapshot   *types.Snapshot
+	restoreChunkIndex uint32
 }
 
 // operation represents a Manager operation. Only one operation can be in progress at a time.
@@ -61,7 +62,8 @@ const (
 	opPrune   operation = "prune"
 	opRestore operation = "restore"
 
-	chunkBufferSize = 4
+	chunkBufferSize   = 4
+	chunkIDBufferSize = 1024
 
 	snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
 )
@@ -131,7 +133,7 @@ func (m *Manager) endLocked() {
 		m.chRestore = nil
 	}
 	m.chRestoreDone = nil
-	m.restoreChunkHashes = nil
+	m.restoreSnapshot = nil
 	m.restoreChunkIndex = 0
 }
 
@@ -284,11 +286,18 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
 	}
 
 	// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
-	chChunks := make(chan io.ReadCloser, chunkBufferSize)
+	chChunkIDs := make(chan uint32, chunkIDBufferSize)
 	chDone := make(chan restoreDone, 1)
 
+	dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
+	if err := os.MkdirAll(dir, 0o750); err != nil {
+		return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir)
+	}
+
+	chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs)
+
 	go func() {
-		err := m.restoreSnapshot(snapshot, chChunks)
+		err := m.doRestoreSnapshot(snapshot, chChunks)
 		chDone <- restoreDone{
 			complete: err == nil,
 			err:      err,
@@ -296,15 +305,38 @@
 		close(chDone)
 	}()
 
-	m.chRestore = chChunks
+	m.chRestore = chChunkIDs
 	m.chRestoreDone = chDone
-	m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
+	m.restoreSnapshot = &snapshot
 	m.restoreChunkIndex = 0
 	return nil
 }
 
-// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
-func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
+func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser {
+	chunks := make(chan io.ReadCloser, chunkBufferSize)
+	go func() {
+		defer close(chunks)
+
+		for chunkID := range chunkIDs {
+			chunk, err := m.store.loadChunkFile(height, format, chunkID)
+			if err != nil {
+				m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err)
+				break
+			}
+			chunks <- chunk
+		}
+	}()
+
+	return chunks
+}
+
+// doRestoreSnapshot does the heavy work of snapshot restoration after preliminary checks on the request have passed.
+func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
+	dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
+	if err := os.MkdirAll(dir, 0o750); err != nil {
+		return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir)
+	}
+
 	streamReader, err := NewStreamReader(chChunks)
 	if err != nil {
 		return err
@@ -348,7 +380,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
 		return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
 	}
 
-	if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
+	if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
 		return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
 	}
 
@@ -365,19 +397,30 @@
 	// Verify the chunk hash.
 	hash := sha256.Sum256(chunk)
-	expected := m.restoreChunkHashes[m.restoreChunkIndex]
+	expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex]
 	if !bytes.Equal(hash[:], expected) {
 		return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch, "expected %x, got %x", hash, expected)
 	}
 
+	if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil {
+		return false, sdkerrors.Wrapf(err, "save chunk content %d", m.restoreChunkIndex)
+	}
+
 	// Pass the chunk to the restore, and wait for completion if it was the final one.
-	m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
+	m.chRestore <- m.restoreChunkIndex
 	m.restoreChunkIndex++
 
-	if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
+	if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
 		close(m.chRestore)
 		m.chRestore = nil
+
+		// All chunks have been written to files, so we can save the snapshot to the db,
+		// even though the restoration may not have completed yet.
+		if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil {
+			return false, sdkerrors.Wrap(err, "save restoring snapshot")
+		}
+
 		done := <-m.chRestoreDone
 		m.endLocked()
 		if done.err != nil {
@@ -386,6 +429,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
 		if !done.complete {
 			return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
 		}
+
 		return true, nil
 	}
 	return false, nil
@@ -411,7 +455,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error {
 	}
 	defer m.endLocked()
 
-	return m.restoreSnapshot(*snapshot, ch)
+	return m.doRestoreSnapshot(*snapshot, ch)
 }
 
 // sortedExtensionNames sort extension names for deterministic iteration.
diff --git a/snapshots/manager_test.go b/snapshots/manager_test.go
index b71043022827..d379f09cb9f1 100644
--- a/snapshots/manager_test.go
+++ b/snapshots/manager_test.go
@@ -205,6 +205,13 @@ func TestManager_Restore(t *testing.T) {
 	assert.Equal(t, expectItems, target.items)
 
+	// The snapshot is saved in the local snapshot store.
+	snapshots, err := store.List()
+	require.NoError(t, err)
+	snapshot := snapshots[0]
+	require.Equal(t, uint64(3), snapshot.Height)
+	require.Equal(t, types.CurrentFormat, snapshot.Format)
+
 	// Starting a new restore should fail now, because the target already has contents.
 	err = manager.Restore(types.Snapshot{
 		Height: 3,
diff --git a/snapshots/store.go b/snapshots/store.go
index 6a663e701641..254686e631b4 100644
--- a/snapshots/store.go
+++ b/snapshots/store.go
@@ -308,6 +308,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types
 	return nil
 }
 
+// saveChunkContent saves the chunk contents to disk.
+func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error {
+	path := s.PathChunk(snapshot.Height, snapshot.Format, index)
+	return os.WriteFile(path, chunk, 0o600)
+}
+
 // saveSnapshot saves snapshot metadata to the database.
 func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
 	value, err := proto.Marshal(snapshot)
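
The restore path above decouples chunk arrival from chunk consumption: `RestoreChunk` persists each verified chunk to disk and sends only its index over `chRestore`, while `loadChunkStream` turns that stream of indices back into `io.ReadCloser`s for the snapshotter. Below is a minimal, self-contained sketch of that ID-to-reader pattern; `streamChunks` and `loadChunk` are hypothetical stand-ins for `Manager.loadChunkStream` and `Store.loadChunkFile`, and the buffer sizes mirror `chunkBufferSize` and `chunkIDBufferSize`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// streamChunks consumes chunk IDs from one channel, opens the corresponding
// chunk file, and forwards an io.ReadCloser on a second channel, mirroring
// the producer/consumer split introduced by loadChunkStream.
func streamChunks(dir string, chunkIDs <-chan uint32) <-chan io.ReadCloser {
	chunks := make(chan io.ReadCloser, 4) // mirrors chunkBufferSize
	go func() {
		defer close(chunks)
		for id := range chunkIDs {
			chunk, err := loadChunk(dir, id)
			if err != nil {
				// Stop the stream on failure; the consumer observes a
				// closed channel and surfaces its own error.
				fmt.Fprintf(os.Stderr, "load chunk %d: %v\n", id, err)
				break
			}
			chunks <- chunk
		}
	}()
	return chunks
}

// loadChunk opens the chunk file for the given ID (hypothetical layout:
// one file per chunk, named by its index).
func loadChunk(dir string, id uint32) (io.ReadCloser, error) {
	return os.Open(filepath.Join(dir, fmt.Sprintf("%d", id)))
}

func main() {
	dir, err := os.MkdirTemp("", "chunks")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Write two fake chunks to disk, as RestoreChunk would via saveChunkContent.
	for i, body := range [][]byte{[]byte("chunk-0"), []byte("chunk-1")} {
		if err := os.WriteFile(filepath.Join(dir, fmt.Sprintf("%d", i)), body, 0o600); err != nil {
			panic(err)
		}
	}

	ids := make(chan uint32, 1024) // mirrors chunkIDBufferSize
	ids <- 0
	ids <- 1
	close(ids)

	var buf bytes.Buffer
	for chunk := range streamChunks(dir, ids) {
		if _, err := buf.ReadFrom(chunk); err != nil {
			panic(err)
		}
		chunk.Close()
	}
	fmt.Println(buf.String()) // chunk-0chunk-1
}
```

Because the ID channel carries `uint32` values rather than whole chunks, it can afford a much larger buffer (1024 vs 4) without holding chunk data in memory, and once every chunk file is on disk the snapshot metadata can be saved even while the state machine is still consuming the stream.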
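`RestoreChunk` verifies each chunk against `Metadata.ChunkHashes` before persisting it with `saveChunkContent`. Here is a small sketch of that check; `verifyChunk` is a hypothetical helper, since the manager inlines this logic.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// verifyChunk hashes the received chunk and compares it against the expected
// hash recorded in the snapshot metadata, as RestoreChunk does before it
// persists the chunk to disk.
func verifyChunk(chunk, expected []byte) error {
	hash := sha256.Sum256(chunk)
	if !bytes.Equal(hash[:], expected) {
		return fmt.Errorf("chunk hash mismatch: expected %x, got %x", expected, hash)
	}
	return nil
}

func main() {
	chunk := []byte("chunk-0")
	sum := sha256.Sum256(chunk)

	fmt.Println(verifyChunk(chunk, sum[:]))             // <nil>
	fmt.Println(verifyChunk([]byte("corrupt"), sum[:])) // chunk hash mismatch: ...
}
```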