Skip to content

Commit

Permalink
# This is a combination of 5 commits.
Browse files Browse the repository at this point in the history
# This is the 1st commit message:

Enable locking kv store

# The commit message #2 will be skipped:

# Fix some lock orderings

# The commit message #3 will be skipped:

# Fix minor typo

# The commit message #4 will be skipped:

# Ensure that writes happen in a deterministic order.
#
# Ensure that reads are also done all the time, remove this if it doesn't impact gas.

# The commit message #5 will be skipped:

# Remove locking for now for lockingkv.Get/Has
  • Loading branch information
lcwik committed Feb 26, 2024
1 parent 6eacec6 commit 064329f
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 3 deletions.
40 changes: 39 additions & 1 deletion store/cachemulti/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cachemulti

import (
"cosmossdk.io/store/lockingkv"
"fmt"
"io"

Expand Down Expand Up @@ -57,7 +58,11 @@ func NewFromKVStore(

store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx)
}
cms.stores[key] = cachekv.NewStore(store.(types.KVStore))
if kvStoreKey, ok := key.(*types.KVStoreKey); ok && kvStoreKey.IsLocking() {
cms.stores[key] = lockingkv.NewStore(store.(types.KVStore))
} else {
cms.stores[key] = cachekv.NewStore(store.(types.KVStore))
}
}

return cms
Expand Down Expand Up @@ -142,6 +147,39 @@ func (cms Store) CacheMultiStore() types.CacheMultiStore {
return newCacheMultiStoreFromCMS(cms)
}

// Implements MultiStore.
func (cms Store) CacheMultiStoreWithLocking(storeLocks map[types.StoreKey][][]byte) types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range cms.stores {
stores[k] = v
}

cms2 := Store{
db: cachekv.NewStore(cms.db),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
keys: cms.keys,
traceWriter: cms.traceWriter,
traceContext: cms.traceContext,
}

for key, store := range stores {
if rowLocks, ok := storeLocks[key]; ok {
cms2.stores[key] = store.(types.LockingCacheWrapper).CacheWrapWithLocks(rowLocks)
} else {
if cms.TracingEnabled() {
tctx := cms.traceContext.Clone().Merge(types.TraceContext{
storeNameCtxKey: key.Name(),
})

store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx)
}
cms2.stores[key] = cachekv.NewStore(store.(types.KVStore))
}
}

return cms2
}

// CacheMultiStoreWithVersion implements the MultiStore interface. It will panic
// as an already cached multi-store cannot load previous versions.
//
Expand Down
253 changes: 253 additions & 0 deletions store/lockingkv/lockingkv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package lockingkv

import (
"cosmossdk.io/store/cachekv"
"cosmossdk.io/store/tracekv"
storetypes "cosmossdk.io/store/types"
"golang.org/x/exp/slices"
"io"
"sort"
"sync"
)

var _ storetypes.CacheKVStore = &Store{}
var _ storetypes.LockingCacheWrapper = &Store{}
var _ storetypes.CacheKVStore = &lockedkv{}

func NewStore(parent storetypes.KVStore) *Store {
return &Store{
parent: parent,
locks: &sync.Map{},
}
}

type lockAndValue struct {
lock sync.Mutex
value []byte
}

type Store struct {
parent storetypes.KVStore
locks *sync.Map /* map from string key to lockAndValue. */
}

// getSortedKeys returns the keys of the map in sorted order.
func getSortedKeys[R interface {
~[]K
sort.Interface
}, K comparable, V any](m map[K]V) []K {
keys := make([]K, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(R(keys))
return keys
}

func (s *Store) Write() {
values := make(map[string]*lockAndValue)
s.locks.Range(func(key, value any) bool {
lv := value.(*lockAndValue)
values[key.(string)] = lv
return true
})

// We need to make the mutations to the parent in a deterministic order to ensure a deterministic hash.
for _, sortedKey := range getSortedKeys[sort.StringSlice](values) {
lv := values[sortedKey]
lv.lock.Lock()
defer lv.lock.Unlock()
if lv.value == nil {
s.parent.Delete([]byte(sortedKey))
} else {
s.parent.Set([]byte(sortedKey), lv.value)
}
}
}

func (s *Store) GetStoreType() storetypes.StoreType {
return s.parent.GetStoreType()
}

func (s *Store) CacheWrap() storetypes.CacheWrap {
return cachekv.NewStore(s)
}

func (s *Store) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap {
return cachekv.NewStore(tracekv.NewStore(s, w, tc))
}

func (s *Store) CacheWrapWithLocks(keys [][]byte) storetypes.CacheWrap {
stringKeys := make([]string, len(keys))
for i, key := range keys {
stringKeys[i] = string(key)
}
// Ensure that we always operate in a deterministic ordering when acquiring locks to prevent deadlock.
slices.Sort(stringKeys)
for _, stringKey := range stringKeys {
// If we created this instance holding the lock otherwise attempt to acquire the lock on the previous instance.
lv := &lockAndValue{
// TODO: Does this reading into the parent store impact gas calculations? If not let us only do it if it
// is required.
value: s.parent.Get([]byte(stringKey)),
}
lv.lock.Lock()
v, loaded := s.locks.LoadOrStore(stringKey, lv)
if loaded {
lv = v.(*lockAndValue)
lv.lock.Lock()
}
}

return &lockedkv{
parent: s,
sortedKeys: stringKeys,
mutations: make(map[string][]byte),
}
}

func (s *Store) Get(key []byte) []byte {
stringKey := string(key)
v, loaded := s.locks.Load(stringKey)
if loaded {
lv := v.(*lockAndValue)
// Do we need to lock here?, currently causes deadlock due to lockedkv calls s.parent.Get(...)
//lv.lock.Lock()
//defer lv.lock.Unlock()
return lv.value
}

return s.parent.Get(key)
}

func (s *Store) Has(key []byte) bool {
// TODO: Can we see if we only use this store during check state and then we wouldn't have to implement get/set/...
stringKey := string(key)
v, loaded := s.locks.Load(stringKey)
if loaded {
lv := v.(*lockAndValue)
// Do we need to lock here?, currently causes deadlock due to lockedkv calls s.parent.Has(...)
//lv.lock.Lock()
//defer lv.lock.Unlock()
return lv.value != nil
}
return s.parent.Has(key)
}

func (s *Store) Set(key, value []byte) {
stringKey := string(key)
v, loaded := s.locks.LoadOrStore(stringKey, &lockAndValue{
value: value,
})
if loaded {
lv := v.(*lockAndValue)
lv.lock.Lock()
defer lv.lock.Unlock()
lv.value = value
}
}

func (s *Store) Delete(key []byte) {
s.Set(key, nil)
}

func (s *Store) Iterator(start, end []byte) storetypes.Iterator {
panic("This store does not support iteration.")
}

func (s *Store) ReverseIterator(start, end []byte) storetypes.Iterator {
panic("This store does not support iteration.")
}

func (s *Store) writeAndUnlock(keys []string, mutations map[string][]byte) {
for _, key := range keys {
v, ok := s.locks.Load(key)
if !ok {
panic("Key not found")
}
lv := v.(*lockAndValue)

// Update the value if it was mutated while the lock was held.
if newValue, wasMutated := mutations[key]; wasMutated {
lv.value = newValue
}

// Unlock the row lock.
lv.lock.Unlock()
}
}

type lockedkv struct {
parent *Store

sortedKeys []string
mutations map[string][]byte
}

func (s *lockedkv) Write() {
s.parent.writeAndUnlock(s.sortedKeys, s.mutations)
}

func (s *lockedkv) GetStoreType() storetypes.StoreType {
return s.parent.GetStoreType()
}

func (s *lockedkv) CacheWrap() storetypes.CacheWrap {
return cachekv.NewStore(s)
}

func (s *lockedkv) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap {
return cachekv.NewStore(tracekv.NewStore(s, w, tc))
}

func (s *lockedkv) Get(key []byte) []byte {
if key == nil {
panic("nil key")
}
stringKey := string(key)

if value, ok := s.mutations[stringKey]; ok {
return value
}

return s.parent.Get(key)
}

func (s *lockedkv) Has(key []byte) bool {
if key == nil {
panic("nil key")
}
stringKey := string(key)

if value, ok := s.mutations[stringKey]; ok {
return value != nil
}

return s.parent.Has(key)
}

func (s *lockedkv) Set(key, value []byte) {
if key == nil {
panic("nil key")
}
stringKey := string(key)

i := sort.SearchStrings(s.sortedKeys, stringKey)
if i < len(s.sortedKeys) && s.sortedKeys[i] != stringKey {
panic("Setting value without locking being held for key, did you mean to use CacheWrapWithLockedKeys(keys)?")
}

s.mutations[stringKey] = value
}

func (s *lockedkv) Delete(key []byte) {
s.Set(key, nil)
}

func (s *lockedkv) Iterator(start, end []byte) storetypes.Iterator {
panic("This store does not support iteration.")
}

func (s *lockedkv) ReverseIterator(start, end []byte) storetypes.Iterator {
panic("This store does not support iteration.")
}
35 changes: 33 additions & 2 deletions store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ type CacheWrapper interface {
CacheWrapWithTrace(w io.Writer, tc TraceContext) CacheWrap
}

type LockingCacheWrapper interface {
CacheWrapper

// CacheWrapWithLocks branches a store with the specific locks acquired.
CacheWrapWithLocks(rowLocks [][]byte) CacheWrap
}

func (cid CommitID) IsZero() bool {
return cid.Version == 0 && len(cid.Hash) == 0
}
Expand Down Expand Up @@ -377,7 +384,8 @@ type CapabilityKey StoreKey
// KVStoreKey is used for accessing substores.
// Only the pointer value should ever be used - it functions as a capabilities key.
type KVStoreKey struct {
name string
name string
locking bool
}

// NewKVStoreKey returns a new pointer to a KVStoreKey.
Expand All @@ -391,6 +399,18 @@ func NewKVStoreKey(name string) *KVStoreKey {
}
}

// NewKVStoreKey returns a new pointer to a KVStoreKey.
// Use a pointer so keys don't collide.
func NewLockingKVStoreKey(name string) *KVStoreKey {
if name == "" {
panic("empty key name not allowed")
}
return &KVStoreKey{
name: name,
locking: true,
}
}

// NewKVStoreKeys returns a map of new pointers to KVStoreKey's.
// The function will panic if there is a potential conflict in names (see `assertNoPrefix`
// function for more details).
Expand All @@ -409,7 +429,18 @@ func (key *KVStoreKey) Name() string {
}

func (key *KVStoreKey) String() string {
return fmt.Sprintf("KVStoreKey{%p, %s}", key, key.name)
return fmt.Sprintf("KVStoreKey{%p, %s, locking: %t}", key, key.name, key.locking)
}

func (key *KVStoreKey) IsLocking() bool {
return key.locking
}

func (key *KVStoreKey) WithLocking() *KVStoreKey {
return &KVStoreKey{
name: key.name,
locking: true,
}
}

// TransientStoreKey is used for indexing transient stores in a MultiStore
Expand Down

0 comments on commit 064329f

Please sign in to comment.