Skip to content

Commit

Permalink
refactor: make cachekv store thread-safe again (cosmos#14378)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
  • Loading branch information
2 people authored and mmsqe committed Dec 27, 2022
1 parent b9563e3 commit b22b8bf
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 83 deletions.
127 changes: 127 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ require (
github.com/tendermint/go-amino v0.16.0
github.com/tendermint/tendermint v0.34.24
github.com/tendermint/tm-db v0.6.7
github.com/tidwall/btree v1.5.2
github.com/tidwall/btree v1.6.0
golang.org/x/crypto v0.2.0
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -952,8 +952,8 @@ github.com/tendermint/tendermint v0.34.24 h1:879MKKJWYYPJEMMKME+DWUTY4V9f/FBpnZD
github.com/tendermint/tendermint v0.34.24/go.mod h1:rXVrl4OYzmIa1I91av3iLv2HS0fGSiucyW9J4aMTpKI=
github.com/tendermint/tm-db v0.6.7 h1:fE00Cbl0jayAoqlExN6oyQJ7fR/ZtoVOmvPJ//+shu8=
github.com/tendermint/tm-db v0.6.7/go.mod h1:byQDzFkZV1syXr/ReXS808NxA2xvyuuVgXOJ/088L6I=
github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w=
github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.14.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
Expand Down
38 changes: 24 additions & 14 deletions store/cachekv/internal/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/tidwall/btree"
)

Expand All @@ -21,46 +22,55 @@ var errKeyEmpty = errors.New("key cannot be empty")
//
// We choose tidwall/btree over google/btree here because it provides API to implement step iterator directly.
type BTree struct {
tree btree.BTreeG[item]
tree *btree.BTreeG[item]
}

// NewBTree creates a wrapper around `btree.BTreeG`.
func NewBTree() *BTree {
return &BTree{tree: *btree.NewBTreeGOptions(byKeys, btree.Options{
Degree: bTreeDegree,
// Contract: cachekv store must not be called concurrently
NoLocks: true,
})}
func NewBTree() BTree {
return BTree{
tree: btree.NewBTreeGOptions(byKeys, btree.Options{
Degree: bTreeDegree,
NoLocks: false,
}),
}
}

func (bt *BTree) Set(key, value []byte) {
func (bt BTree) Set(key, value []byte) {
bt.tree.Set(newItem(key, value))
}

func (bt *BTree) Get(key []byte) []byte {
func (bt BTree) Get(key []byte) []byte {
i, found := bt.tree.Get(newItem(key, nil))
if !found {
return nil
}
return i.value
}

func (bt *BTree) Delete(key []byte) {
func (bt BTree) Delete(key []byte) {
bt.tree.Delete(newItem(key, nil))
}

func (bt *BTree) Iterator(start, end []byte) (*memIterator, error) {
func (bt BTree) Iterator(start, end []byte) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}
return NewMemIterator(start, end, bt, make(map[string]struct{}), true), nil
return newMemIterator(start, end, bt, true), nil
}

func (bt *BTree) ReverseIterator(start, end []byte) (*memIterator, error) {
func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}
return NewMemIterator(start, end, bt, make(map[string]struct{}), false), nil
return newMemIterator(start, end, bt, false), nil
}

// Copy the tree. This is a copy-on-write operation and is very fast because
// it only performs a shadowed copy.
func (bt BTree) Copy() BTree {
return BTree{
tree: bt.tree.Copy(),
}
}

// item is a btree item with byte slices as keys and values
Expand Down
3 changes: 2 additions & 1 deletion store/cachekv/internal/btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"testing"

"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestDBIterator(t *testing.T) {
verifyIterator(t, ritr, nil, "reverse iterator with empty db")
}

func verifyIterator(t *testing.T, itr *memIterator, expected []int64, msg string) {
func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg string) {
i := 0
for itr.Valid() {
key := itr.Key()
Expand Down
26 changes: 4 additions & 22 deletions store/cachekv/internal/memiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@ import (
var _ types.Iterator = (*memIterator)(nil)

// memIterator iterates over iterKVCache items.
// if key is nil, means it was deleted.
// if value is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
iter btree.GenericIter[item]
iter btree.IterG[item]

start []byte
end []byte
ascending bool
lastKey []byte
deleted map[string]struct{}
valid bool
}

func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}, ascending bool) *memIterator {
func newMemIterator(start, end []byte, items BTree, ascending bool) *memIterator {
iter := items.tree.Iter()
var valid bool
if ascending {
Expand Down Expand Up @@ -52,8 +50,6 @@ func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}
start: start,
end: end,
ascending: ascending,
lastKey: nil,
deleted: deleted,
valid: valid,
}

Expand Down Expand Up @@ -113,21 +109,7 @@ func (mi *memIterator) Key() []byte {
}

func (mi *memIterator) Value() []byte {
item := mi.iter.Item()
key := item.key
// We need to handle the case where deleted is modified and includes our current key
// We handle this by maintaining a lastKey object in the iterator.
// If the current key is the same as the last key (and last key is not nil / the start)
// then we are calling value on the same thing as last time.
// Therefore we don't check the mi.deleted to see if this key is included in there.
if _, ok := mi.deleted[string(key)]; ok {
if mi.lastKey == nil || !bytes.Equal(key, mi.lastKey) {
// not re-calling on old last key
return nil
}
}
mi.lastKey = key
return item.value
return mi.iter.Item().value
}

func (mi *memIterator) assertValid() {
Expand Down
70 changes: 27 additions & 43 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ type cValue struct {
}

// Store wraps an in-memory cache around an underlying types.KVStore.
// If a cached value is nil but deleted is defined for the corresponding key,
// it means the parent doesn't have the key. (No need to delete upon Write())
type Store struct {
mtx sync.Mutex
cache map[string]*cValue
deleted map[string]struct{}
unsortedCache map[string]struct{}
sortedCache *internal.BTree // always ascending sorted
sortedCache internal.BTree // always ascending sorted
parent types.KVStore
}

Expand All @@ -41,7 +38,6 @@ var _ types.CacheKVStore = (*Store)(nil)
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
deleted: make(map[string]struct{}),
unsortedCache: make(map[string]struct{}),
sortedCache: internal.NewBTree(),
parent: parent,
Expand All @@ -63,7 +59,7 @@ func (store *Store) Get(key []byte) (value []byte) {
cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)]
if !ok {
value = store.parent.Get(key)
store.setCacheValue(key, value, false, false)
store.setCacheValue(key, value, false)
} else {
value = cacheValue.value
}
Expand All @@ -79,7 +75,7 @@ func (store *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)

store.setCacheValue(key, value, false, true)
store.setCacheValue(key, value, true)
}

// Has implements types.KVStore.
Expand All @@ -94,15 +90,15 @@ func (store *Store) Delete(key []byte) {
defer store.mtx.Unlock()

types.AssertValidKey(key)
store.setCacheValue(key, nil, true, true)
store.setCacheValue(key, nil, true)
}

// Implements Cachetypes.KVStore.
func (store *Store) Write() {
store.mtx.Lock()
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.deleted) == 0 && len(store.unsortedCache) == 0 {
if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
store.sortedCache = internal.NewBTree()
return
}
Expand All @@ -122,19 +118,16 @@ func (store *Store) Write() {
// TODO: Consider allowing usage of Batch, which would allow the write to
// at least happen atomically.
for _, key := range keys {
if store.isDeleted(key) {
// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
store.parent.Delete([]byte(key))
continue
}

// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
cacheValue := store.cache[key]
if cacheValue.value != nil {
// It already exists in the parent, hence delete it.
// It already exists in the parent, hence update it.
store.parent.Set([]byte(key), cacheValue.value)
} else {
store.parent.Delete([]byte(key))
}
}

Expand All @@ -144,9 +137,6 @@ func (store *Store) Write() {
for key := range store.cache {
delete(store.cache, key)
}
for key := range store.deleted {
delete(store.deleted, key)
}
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
Expand Down Expand Up @@ -180,16 +170,24 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()

var parent, cache types.Iterator
store.dirtyItems(start, end)
isoSortedCache := store.sortedCache.Copy()

var (
err error
parent, cache types.Iterator
)

if ascending {
parent = store.parent.Iterator(start, end)
cache, err = isoSortedCache.Iterator(start, end)
} else {
parent = store.parent.ReverseIterator(start, end)
cache, err = isoSortedCache.ReverseIterator(start, end)
}
if err != nil {
panic(err)
}

store.dirtyItems(start, end)
cache = internal.NewMemIterator(start, end, store.sortedCache, store.deleted, ascending)

return internal.NewCacheMergeIterator(parent, cache, ascending)
}
Expand Down Expand Up @@ -364,12 +362,7 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
}

for _, item := range unsorted {
if item.Value == nil {
// deleted element, tracked by store.deleted
// setting arbitrary value
store.sortedCache.Set(item.Key, []byte{})
continue
}
// sortedCache is able to store `nil` value to represent deleted items.
store.sortedCache.Set(item.Key, item.Value)
}
}
Expand All @@ -378,23 +371,14 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
// etc

// Only entrypoint to mutate store.cache.
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
// A `nil` value means a deletion.
func (store *Store) setCacheValue(key, value []byte, dirty bool) {
keyStr := conv.UnsafeBytesToStr(key)
store.cache[keyStr] = &cValue{
value: value,
dirty: dirty,
}
if deleted {
store.deleted[keyStr] = struct{}{}
} else {
delete(store.deleted, keyStr)
}
if dirty {
store.unsortedCache[keyStr] = struct{}{}
}
}

func (store *Store) isDeleted(key string) bool {
_, ok := store.deleted[key]
return ok
}

0 comments on commit b22b8bf

Please sign in to comment.