Skip to content

Commit

Permalink
feat: Make extension snapshotter interface safer to use (#11825)
Browse files Browse the repository at this point in the history
* Make extension snapshotter interface safer to use

Closes: #11824
Solution:
- Use new methods `SnapshotExtension`/`RestoreExtension` to handle payload stream specifically.
- Improve unit tests.

* update changelog

* Update snapshots/types/util.go

* changelog

* go linter

* Update CHANGELOG.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
  • Loading branch information
yihuang and alexanderbez committed Aug 18, 2022
1 parent 0ed7360 commit e397434
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
A SendEnabled query has been added to both GRPC and CLI.
* (appModule) Remove `Route`, `QuerierRoute` and `LegacyQuerierHandler` from AppModule Interface.
* (x/modules) Remove all LegacyQueries and related code from modules
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.

### CLI Breaking Changes

Expand Down
20 changes: 17 additions & 3 deletions docs/architecture/adr-049-state-sync-hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
## Changelog

- Jan 19, 2022: Initial Draft
- Apr 29, 2022: Safer extension snapshotter interface

## Status

Draft, Under Implementation
Implemented

## Abstract

Expand Down Expand Up @@ -107,11 +108,16 @@ func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) e
On top of the existing `Snapshotter` interface for the `multistore`, we add `ExtensionSnapshotter` interface for the extension snapshotters. Three more function signatures: `SnapshotFormat()`, `SupportedFormats()` and `SnapshotName()` are added to `ExtensionSnapshotter`.

```go
// ExtensionPayloadReader read extension payloads,
// it returns io.EOF when reached either end of stream or the extension boundaries.
type ExtensionPayloadReader = func() ([]byte, error)

// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream.
type ExtensionPayloadWriter = func([]byte) error

// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream.
// ExtensionSnapshotter has an unique name and manages it's own internal formats.
type ExtensionSnapshotter interface {
Snapshotter

// SnapshotName returns the name of snapshotter, it should be unique in the manager.
SnapshotName() string

Expand All @@ -120,6 +126,14 @@ type ExtensionSnapshotter interface {

// SupportedFormats returns a list of formats it can restore from.
SupportedFormats() []uint32

// SnapshotExtension writes extension payloads into the underlying protobuf stream.
SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error

// RestoreExtension restores an extension state snapshot,
// the payload reader returns `io.EOF` when reached the extension boundaries.
RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error

}
```

Expand Down
77 changes: 70 additions & 7 deletions snapshots/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/testutil"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

Expand Down Expand Up @@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
}

// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)
Expand All @@ -74,8 +75,20 @@ func snapshotItems(items [][]byte) [][]byte {
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
_ = snapshottypes.WriteExtensionItem(protoWriter, item)
_ = snapshottypes.WriteExtensionPayload(protoWriter, item)
}
// write extension metadata
_ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Extension{
Extension: &snapshottypes.SnapshotExtensionMeta{
Name: ext.SnapshotName(),
Format: ext.SnapshotFormat(),
},
},
})
_ = ext.SnapshotExtension(0, func(payload []byte) error {
return snapshottypes.WriteExtensionPayload(protoWriter, payload)
})
_ = protoWriter.Close()
_ = zWriter.Close()
_ = bufWriter.Flush()
Expand Down Expand Up @@ -110,28 +123,29 @@ func (m *mockSnapshotter) Restore(
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}

var item snapshottypes.SnapshotItem
m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
item.Reset()
err := protoReader.ReadMsg(&item)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
break
}
m.items = append(m.items, payload.Payload)
}

return snapshottypes.SnapshotItem{}, nil
return item, nil
}

func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := snapshottypes.WriteExtensionItem(protoWriter, item); err != nil {
if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil {
return err
}
}
Expand Down Expand Up @@ -216,3 +230,52 @@ func (m *hungSnapshotter) Restore(
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

type extSnapshotter struct {
state []uint64
}

func newExtSnapshotter(count int) *extSnapshotter {
state := make([]uint64, 0, count)
for i := 0; i < count; i++ {
state = append(state, uint64(i))
}
return &extSnapshotter{
state,
}
}

func (s *extSnapshotter) SnapshotName() string {
return "mock"
}

func (s *extSnapshotter) SnapshotFormat() uint32 {
return 1
}

func (s *extSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}

func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error {
for _, i := range s.state {
if err := payloadWriter(sdk.Uint64ToBigEndian(uint64(i))); err != nil {
return err
}
}
return nil
}

func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error {
for {
payload, err := payloadReader()
if err == io.EOF {
break
} else if err != nil {
return err
}
s.state = append(s.state, sdk.BigEndianToUint64(payload))
}
// finalize restoration
return nil
}
40 changes: 33 additions & 7 deletions snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snaps

// RegisterExtensions register extension snapshotters to manager
func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error {
if m.extensions == nil {
m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions))
}
for _, extension := range extensions {
name := extension.SnapshotName()
if _, ok := m.extensions[name]; ok {
Expand Down Expand Up @@ -215,7 +218,10 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
streamWriter.CloseWithError(err)
return
}
if err := extension.Snapshot(height, streamWriter); err != nil {
payloadWriter := func(payload []byte) error {
return types.WriteExtensionPayload(streamWriter, payload)
}
if err := extension.SnapshotExtension(height, payloadWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
Expand Down Expand Up @@ -305,24 +311,40 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {

// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
var nextItem types.SnapshotItem

streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()

next, err := m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
// payloadReader reads an extension payload for extension snapshotter, it returns `io.EOF` at extension boundaries.
payloadReader := func() ([]byte, error) {
nextItem.Reset()
if err := streamReader.ReadMsg(&nextItem); err != nil {
return nil, err
}
payload := nextItem.GetExtensionPayload()
if payload == nil {
return nil, io.EOF
}
return payload.Payload, nil
}

nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
if err != nil {
return sdkerrors.Wrap(err, "multistore restore")
}

for {
if next.Item == nil {
if nextItem.Item == nil {
// end of stream
break
}
metadata := next.GetExtension()
metadata := nextItem.GetExtension()
if metadata == nil {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", next.Item)
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", nextItem.Item)
}
extension, ok := m.extensions[metadata.Name]
if !ok {
Expand All @@ -331,10 +353,14 @@ func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.Re
if !IsFormatSupported(extension, metadata.Format) {
return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name)
}
next, err = extension.Restore(snapshot.Height, metadata.Format, streamReader)
if err != nil {

if err := extension.RestoreExtension(snapshot.Height, metadata.Format, payloadReader); err != nil {
return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name)
}

if nextItem.GetExtensionPayload() != nil {
return sdkerrors.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name)
}
}
return nil
}
Expand Down
18 changes: 13 additions & 5 deletions snapshots/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ func TestManager_Take(t *testing.T) {
items: items,
prunedHeights: make(map[int64]struct{}),
}
expectChunks := snapshotItems(items)
extSnapshotter := newExtSnapshotter(10)

expectChunks := snapshotItems(items, extSnapshotter)
manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger())
err := manager.RegisterExtensions(extSnapshotter)
require.NoError(t, err)

// nil manager should return error
_, err := (*snapshots.Manager)(nil).Create(1)
_, err = (*snapshots.Manager)(nil).Create(1)
require.Error(t, err)

// creating a snapshot at a lower height than the latest should error
Expand All @@ -91,7 +95,7 @@ func TestManager_Take(t *testing.T) {
Height: 5,
Format: snapshotter.SnapshotFormat(),
Chunks: 1,
Hash: []uint8{0xcd, 0x17, 0x9e, 0x7f, 0x28, 0xb6, 0x82, 0x90, 0xc7, 0x25, 0xf3, 0x42, 0xac, 0x65, 0x73, 0x50, 0xaa, 0xa0, 0x10, 0x5c, 0x40, 0x8c, 0xd5, 0x1, 0xed, 0x82, 0xb5, 0xca, 0x8b, 0xe0, 0x83, 0xa2},
Hash: []uint8{0x89, 0xfa, 0x18, 0xbc, 0x5a, 0xe3, 0xdc, 0x36, 0xa6, 0x95, 0x5, 0x17, 0xf9, 0x2, 0x1a, 0x55, 0x36, 0x16, 0x5d, 0x4b, 0x8b, 0x2b, 0x3d, 0xfd, 0xe, 0x2f, 0xb6, 0x40, 0x6b, 0xc3, 0xbc, 0x23},
Metadata: types.Metadata{
ChunkHashes: checksums(expectChunks),
},
Expand Down Expand Up @@ -133,18 +137,21 @@ func TestManager_Restore(t *testing.T) {
target := &mockSnapshotter{
prunedHeights: make(map[int64]struct{}),
}
extSnapshotter := newExtSnapshotter(0)
manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger())
err := manager.RegisterExtensions(extSnapshotter)
require.NoError(t, err)

expectItems := [][]byte{
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
}

chunks := snapshotItems(expectItems)
chunks := snapshotItems(expectItems, newExtSnapshotter(10))

// Restore errors on invalid format
err := manager.Restore(types.Snapshot{
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 0,
Hash: []byte{1, 2, 3},
Expand Down Expand Up @@ -204,6 +211,7 @@ func TestManager_Restore(t *testing.T) {
}

assert.Equal(t, expectItems, target.items)
assert.Equal(t, 10, len(extSnapshotter.state))

// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{
Expand Down
20 changes: 15 additions & 5 deletions snapshots/types/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ type Snapshotter interface {
// to determine which heights to retain until after the snapshot is complete.
SetSnapshotInterval(snapshotInterval uint64)

// Restore restores a state snapshot, taking snapshot chunk readers as input.
// If the ready channel is non-nil, it returns a ready signal (by being closed) once the
// restorer is ready to accept chunks.
// Restore restores a state snapshot, taking the reader of protobuf message stream as input.
Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error)
}

// ExtensionPayloadReader read extension payloads,
// it returns io.EOF when reached either end of stream or the extension boundaries.
type ExtensionPayloadReader = func() ([]byte, error)

// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream.
type ExtensionPayloadWriter = func([]byte) error

// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream.
// ExtensionSnapshotter has an unique name and manages it's own internal formats.
type ExtensionSnapshotter interface {
Snapshotter

// SnapshotName returns the name of snapshotter, it should be unique in the manager.
SnapshotName() string

Expand All @@ -43,4 +46,11 @@ type ExtensionSnapshotter interface {

// SupportedFormats returns a list of formats it can restore from.
SupportedFormats() []uint32

// SnapshotExtension writes extension payloads into the underlying protobuf stream.
SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error

// RestoreExtension restores an extension state snapshot,
// the payload reader returns `io.EOF` when reached the extension boundaries.
RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error
}
6 changes: 3 additions & 3 deletions snapshots/types/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
protoio "github.com/gogo/protobuf/io"
)

// WriteExtensionItem writes an item payload for current extension snapshotter.
func WriteExtensionItem(protoWriter protoio.Writer, item []byte) error {
// WriteExtensionPayload writes an extension payload for current extension snapshotter.
func WriteExtensionPayload(protoWriter protoio.Writer, payload []byte) error {
return protoWriter.WriteMsg(&SnapshotItem{
Item: &SnapshotItem_ExtensionPayload{
ExtensionPayload: &SnapshotExtensionPayload{
Payload: item,
Payload: payload,
},
},
})
Expand Down

0 comments on commit e397434

Please sign in to comment.