From 8985cf8b87f0f5af277e1d5965bf2e73f36f3ddd Mon Sep 17 00:00:00 2001 From: yihuang Date: Thu, 18 Aug 2022 15:33:55 +0800 Subject: [PATCH] feat: Make extension snapshotter interface safer to use (#11825) * Make extension snapshotter interface safer to use Closes: #11824 Solution: - Use new methods `SnapshotExtension`/`RestoreExtension` to handle payload stream specifically. - Improve unit tests. * update changelog * Update snapshots/types/util.go * changelog * go linter * Update CHANGELOG.md Co-authored-by: Aleksandr Bezobchuk --- docs/architecture/adr-049-state-sync-hooks.md | 20 ++++- snapshots/helpers_test.go | 77 +++++++++++++++++-- snapshots/manager.go | 40 ++++++++-- snapshots/manager_test.go | 18 +++-- snapshots/types/snapshotter.go | 20 +++-- snapshots/types/util.go | 6 +- 6 files changed, 151 insertions(+), 30 deletions(-) diff --git a/docs/architecture/adr-049-state-sync-hooks.md b/docs/architecture/adr-049-state-sync-hooks.md index e1616c2265ba..5cc2b684c4df 100644 --- a/docs/architecture/adr-049-state-sync-hooks.md +++ b/docs/architecture/adr-049-state-sync-hooks.md @@ -3,10 +3,11 @@ ## Changelog - Jan 19, 2022: Initial Draft +- Apr 29, 2022: Safer extension snapshotter interface ## Status -Draft, Under Implementation +Implemented ## Abstract @@ -107,11 +108,16 @@ func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) e On top of the existing `Snapshotter` interface for the `multistore`, we add `ExtensionSnapshotter` interface for the extension snapshotters. Three more function signatures: `SnapshotFormat()`, `SupportedFormats()` and `SnapshotName()` are added to `ExtensionSnapshotter`. ```go +// ExtensionPayloadReader read extension payloads, +// it returns io.EOF when reached either end of stream or the extension boundaries. +type ExtensionPayloadReader = func() ([]byte, error) + +// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream. +type ExtensionPayloadWriter = func([]byte) error + // ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream. // ExtensionSnapshotter has an unique name and manages it's own internal formats. type ExtensionSnapshotter interface { - Snapshotter - // SnapshotName returns the name of snapshotter, it should be unique in the manager. SnapshotName() string @@ -120,6 +126,14 @@ type ExtensionSnapshotter interface { // SupportedFormats returns a list of formats it can restore from. SupportedFormats() []uint32 + + // SnapshotExtension writes extension payloads into the underlying protobuf stream. + SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error + + // RestoreExtension restores an extension state snapshot, + // the payload reader returns `io.EOF` when reached the extension boundaries. + RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error + } ``` diff --git a/snapshots/helpers_test.go b/snapshots/helpers_test.go index 412b7de6caac..6e2dc6d26bb9 100644 --- a/snapshots/helpers_test.go +++ b/snapshots/helpers_test.go @@ -18,6 +18,7 @@ import ( "github.com/cosmos/cosmos-sdk/snapshots" snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" "github.com/cosmos/cosmos-sdk/testutil" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) @@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte { } // snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks. -func snapshotItems(items [][]byte) [][]byte { +func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte { // copy the same parameters from the code snapshotChunkSize := uint64(10e6) snapshotBufferSize := int(snapshotChunkSize) @@ -74,8 +75,20 @@ func snapshotItems(items [][]byte) [][]byte { zWriter, _ := zlib.NewWriterLevel(bufWriter, 7) protoWriter := protoio.NewDelimitedWriter(zWriter) for _, item := range items { - _ = snapshottypes.WriteExtensionItem(protoWriter, item) + _ = snapshottypes.WriteExtensionPayload(protoWriter, item) } + // write extension metadata + _ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Extension{ + Extension: &snapshottypes.SnapshotExtensionMeta{ + Name: ext.SnapshotName(), + Format: ext.SnapshotFormat(), + }, + }, + }) + _ = ext.SnapshotExtension(0, func(payload []byte) error { + return snapshottypes.WriteExtensionPayload(protoWriter, payload) + }) _ = protoWriter.Close() _ = bufWriter.Flush() _ = chunkWriter.Close() @@ -109,10 +122,11 @@ func (m *mockSnapshotter) Restore( return snapshottypes.SnapshotItem{}, errors.New("already has contents") } + var item snapshottypes.SnapshotItem m.items = [][]byte{} for { - item := &snapshottypes.SnapshotItem{} - err := protoReader.ReadMsg(item) + item.Reset() + err := protoReader.ReadMsg(&item) if err == io.EOF { break } else if err != nil { @@ -120,17 +134,17 @@ func (m *mockSnapshotter) Restore( } payload := item.GetExtensionPayload() if payload == nil { - return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message") + break } m.items = append(m.items, payload.Payload) } - return snapshottypes.SnapshotItem{}, nil + return item, nil } func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { for _, item := range m.items { - if err := snapshottypes.WriteExtensionItem(protoWriter, item); err != nil { + if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil { return err } } @@ -215,3 +229,52 @@ func (m *hungSnapshotter) Restore( ) (snapshottypes.SnapshotItem, error) { panic("not implemented") } + +type extSnapshotter struct { + state []uint64 +} + +func newExtSnapshotter(count int) *extSnapshotter { + state := make([]uint64, 0, count) + for i := 0; i < count; i++ { + state = append(state, uint64(i)) + } + return &extSnapshotter{ + state, + } +} + +func (s *extSnapshotter) SnapshotName() string { + return "mock" +} + +func (s *extSnapshotter) SnapshotFormat() uint32 { + return 1 +} + +func (s *extSnapshotter) SupportedFormats() []uint32 { + return []uint32{1} +} + +func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error { + for _, i := range s.state { + if err := payloadWriter(sdk.Uint64ToBigEndian(uint64(i))); err != nil { + return err + } + } + return nil +} + +func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error { + for { + payload, err := payloadReader() + if err == io.EOF { + break + } else if err != nil { + return err + } + s.state = append(s.state, sdk.BigEndianToUint64(payload)) + } + // finalize restoration + return nil +} diff --git a/snapshots/manager.go b/snapshots/manager.go index 05cbad3f5b30..efc123e9e498 100644 --- a/snapshots/manager.go +++ b/snapshots/manager.go @@ -84,6 +84,9 @@ func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snaps // RegisterExtensions register extension snapshotters to manager func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error { + if m.extensions == nil { + m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions)) + } for _, extension := range extensions { name := extension.SnapshotName() if _, ok := m.extensions[name]; ok { @@ -215,7 +218,10 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) { streamWriter.CloseWithError(err) return } - if err := extension.Snapshot(height, streamWriter); err != nil { + payloadWriter := func(payload []byte) error { + return types.WriteExtensionPayload(streamWriter, payload) + } + if err := extension.SnapshotExtension(height, payloadWriter); err != nil { streamWriter.CloseWithError(err) return } @@ -305,24 +311,40 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { // restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { + var nextItem types.SnapshotItem + streamReader, err := NewStreamReader(chChunks) if err != nil { return err } defer streamReader.Close() - next, err := m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) + // payloadReader reads an extension payload for extension snapshotter, it returns `io.EOF` at extension boundaries. + payloadReader := func() ([]byte, error) { + nextItem.Reset() + if err := streamReader.ReadMsg(&nextItem); err != nil { + return nil, err + } + payload := nextItem.GetExtensionPayload() + if payload == nil { + return nil, io.EOF + } + return payload.Payload, nil + } + + nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) if err != nil { return sdkerrors.Wrap(err, "multistore restore") } + for { - if next.Item == nil { + if nextItem.Item == nil { // end of stream break } - metadata := next.GetExtension() + metadata := nextItem.GetExtension() if metadata == nil { - return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", next.Item) + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", nextItem.Item) } extension, ok := m.extensions[metadata.Name] if !ok { @@ -331,10 +353,14 @@ func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.Re if !IsFormatSupported(extension, metadata.Format) { return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name) } - next, err = extension.Restore(snapshot.Height, metadata.Format, streamReader) - if err != nil { + + if err := extension.RestoreExtension(snapshot.Height, metadata.Format, payloadReader); err != nil { return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name) } + + if nextItem.GetExtensionPayload() != nil { + return sdkerrors.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name) + } } return nil } diff --git a/snapshots/manager_test.go b/snapshots/manager_test.go index 8f8e8d3da8d7..ee4c3b471189 100644 --- a/snapshots/manager_test.go +++ b/snapshots/manager_test.go @@ -68,11 +68,15 @@ func TestManager_Take(t *testing.T) { items: items, prunedHeights: make(map[int64]struct{}), } - expectChunks := snapshotItems(items) + extSnapshotter := newExtSnapshotter(10) + + expectChunks := snapshotItems(items, extSnapshotter) manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) // nil manager should return error - _, err := (*snapshots.Manager)(nil).Create(1) + _, err = (*snapshots.Manager)(nil).Create(1) require.Error(t, err) // creating a snapshot at a lower height than the latest should error @@ -91,7 +95,7 @@ func TestManager_Take(t *testing.T) { Height: 5, Format: snapshotter.SnapshotFormat(), Chunks: 1, - Hash: []uint8{0xcf, 0xd8, 0x16, 0xd2, 0xf8, 0x11, 0xe8, 0x90, 0x92, 0xf1, 0xfe, 0x3b, 0xea, 0xd2, 0x94, 0xfc, 0xfa, 0x4f, 0x9e, 0x2a, 0x91, 0xbe, 0xb0, 0x50, 0x83, 0xe9, 0x28, 0x62, 0x48, 0x6a, 0x4b, 0x4}, + Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, Metadata: types.Metadata{ ChunkHashes: checksums(expectChunks), }, @@ -133,7 +137,10 @@ func TestManager_Restore(t *testing.T) { target := &mockSnapshotter{ prunedHeights: make(map[int64]struct{}), } + extSnapshotter := newExtSnapshotter(0) manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger()) + err := manager.RegisterExtensions(extSnapshotter) + require.NoError(t, err) expectItems := [][]byte{ {1, 2, 3}, @@ -141,10 +148,10 @@ func TestManager_Restore(t *testing.T) { {7, 8, 9}, } - chunks := snapshotItems(expectItems) + chunks := snapshotItems(expectItems, newExtSnapshotter(10)) // Restore errors on invalid format - err := manager.Restore(types.Snapshot{ + err = manager.Restore(types.Snapshot{ Height: 3, Format: 0, Hash: []byte{1, 2, 3}, @@ -204,6 +211,7 @@ func TestManager_Restore(t *testing.T) { } assert.Equal(t, expectItems, target.items) + assert.Equal(t, 10, len(extSnapshotter.state)) // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ diff --git a/snapshots/types/snapshotter.go b/snapshots/types/snapshotter.go index 76f800484a49..1641042a6233 100644 --- a/snapshots/types/snapshotter.go +++ b/snapshots/types/snapshotter.go @@ -22,17 +22,20 @@ type Snapshotter interface { // to determine which heights to retain until after the snapshot is complete. SetSnapshotInterval(snapshotInterval uint64) - // Restore restores a state snapshot, taking snapshot chunk readers as input. - // If the ready channel is non-nil, it returns a ready signal (by being closed) once the - // restorer is ready to accept chunks. + // Restore restores a state snapshot, taking the reader of protobuf message stream as input. Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error) } +// ExtensionPayloadReader read extension payloads, +// it returns io.EOF when reached either end of stream or the extension boundaries. +type ExtensionPayloadReader = func() ([]byte, error) + +// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream. +type ExtensionPayloadWriter = func([]byte) error + // ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream. // ExtensionSnapshotter has an unique name and manages it's own internal formats. type ExtensionSnapshotter interface { - Snapshotter - // SnapshotName returns the name of snapshotter, it should be unique in the manager. SnapshotName() string @@ -43,4 +46,11 @@ type ExtensionSnapshotter interface { // SupportedFormats returns a list of formats it can restore from. SupportedFormats() []uint32 + + // SnapshotExtension writes extension payloads into the underlying protobuf stream. + SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error + + // RestoreExtension restores an extension state snapshot, + // the payload reader returns `io.EOF` when reached the extension boundaries. + RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error } diff --git a/snapshots/types/util.go b/snapshots/types/util.go index 348b5057682d..e2d4949bf5c2 100644 --- a/snapshots/types/util.go +++ b/snapshots/types/util.go @@ -4,12 +4,12 @@ import ( protoio "github.com/gogo/protobuf/io" ) -// WriteExtensionItem writes an item payload for current extension snapshotter. -func WriteExtensionItem(protoWriter protoio.Writer, item []byte) error { +// WriteExtensionPayload writes an extension payload for current extension snapshotter. +func WriteExtensionPayload(protoWriter protoio.Writer, payload []byte) error { return protoWriter.WriteMsg(&SnapshotItem{ Item: &SnapshotItem_ExtensionPayload{ ExtensionPayload: &SnapshotExtensionPayload{ - Payload: item, + Payload: payload, }, }, })