Skip to content

Commit

Permalink
Merge pull request #447 from cosmos/justin/fast-node-get
Browse files Browse the repository at this point in the history
Refactor Get() to work with new faster cache
  • Loading branch information
ValarDragon committed Dec 6, 2021
2 parents 75d6e98 + c004b17 commit 0944259
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ cpu*.out
mem*.out
cpu*.pdf
mem*.pdf

# IDE files
.idea/*
45 changes: 45 additions & 0 deletions fast_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package iavl

import (
"github.com/pkg/errors"
)

// NOTE: This file favors int64 as opposed to int for size/counts.
// The Tree on the other hand favors int. This is intentional.

type FastNode struct {
key []byte
versionLastUpdatedAt int64
value []byte
leafHash []byte // TODO: Look into if this would help with proof stuff.
}

// NewFastNode returns a new fast node from a value and version.
func NewFastNode(key []byte, value []byte, version int64) *FastNode {
return &FastNode{
key: key,
versionLastUpdatedAt: version,
value: value,
}
}

// DeserializeFastNode constructs an *FastNode from an encoded byte slice.
func DeserializeFastNode(buf []byte) (*FastNode, error) {
val, n, cause := decodeBytes(buf)
if cause != nil {
return nil, errors.Wrap(cause, "decoding fastnode.value")
}
buf = buf[n:]

ver, _, cause := decodeVarint(buf)
if cause != nil {
return nil, errors.Wrap(cause, "decoding fastnode.version")
}

fastNode := &FastNode{
versionLastUpdatedAt: ver,
value: val,
}

return fastNode, nil
}
26 changes: 17 additions & 9 deletions immutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,28 @@ func (t *ImmutableTree) Export() *Exporter {
// Get returns the index and value of the specified key if it exists, or nil and the next index
// otherwise. The returned value must not be modified, since it may point to data stored within
// IAVL.
// TODO: Understand what is this index? Index on its own isn't well defined
// index across all leaves?
//
// The index is the index in the list of leaf nodes sorted lexicographically by key. The leftmost leaf has index 0.
// It's neighbor has index 1 and so on.
func (t *ImmutableTree) Get(key []byte) (index int64, value []byte) {
if t.root == nil {
return 0, nil
}
// IMPLEMENT FOLLOWING PSUEDOCODE
// value, version := t.nodeDb.fastGet(key)
// if value == nil { return t.root.get(t, key)}
// if version > t.version { return t.root.get(t, key)}
// else: return value
// TODO: Figure out what index is

return t.root.get(t, key)
fastNode, err := t.ndb.GetFastNode(key)
if fastNode == nil || err != nil {
return t.root.get(t, key)
}
value = fastNode.value
if value == nil {
return t.root.get(t, key)
}

// cache node is too new, so read from historical tree
if fastNode.versionLastUpdatedAt > t.version {
return t.root.get(t, key)
}
return 0, value // TODO determine index and adjust this appropriately
}

// GetByIndex gets the key and value at the specified index.
Expand Down
3 changes: 3 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (node *Node) has(t *ImmutableTree, key []byte) (has bool) {
}

// Get a key under the node.
//
// The index is the index in the list of leaf nodes sorted lexicographically by key. The leftmost leaf has index 0.
// It's neighbor has index 1 and so on.
func (node *Node) get(t *ImmutableTree, key []byte) (index int64, value []byte) {
if node.isLeaf() {
switch bytes.Compare(node.key, key) {
Expand Down
78 changes: 70 additions & 8 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type nodeDB struct {
nodeCache map[string]*list.Element // Node cache.
nodeCacheSize int // Node cache size limit in elements.
nodeCacheQueue *list.List // LRU queue of cache elements. Used for deletion.

fastNodeCache map[string]*list.Element // FastNode cache.
fastNodeCacheSize int // FastNode cache size limit in elements.
fastNodeCacheQueue *list.List // LRU queue of cache elements. Used for deletion.
}

func newNodeDB(db dbm.DB, cacheSize int, opts *Options) *nodeDB {
Expand All @@ -64,14 +68,17 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options) *nodeDB {
opts = &o
}
return &nodeDB{
db: db,
batch: db.NewBatch(),
opts: *opts,
latestVersion: 0, // initially invalid
nodeCache: make(map[string]*list.Element),
nodeCacheSize: cacheSize,
nodeCacheQueue: list.New(),
versionReaders: make(map[int64]uint32, 8),
db: db,
batch: db.NewBatch(),
opts: *opts,
latestVersion: 0, // initially invalid
nodeCache: make(map[string]*list.Element),
nodeCacheSize: cacheSize,
nodeCacheQueue: list.New(),
fastNodeCache: make(map[string]*list.Element),
fastNodeCacheSize: cacheSize,
fastNodeCacheQueue: list.New(),
versionReaders: make(map[int64]uint32, 8),
}
}

Expand Down Expand Up @@ -113,6 +120,41 @@ func (ndb *nodeDB) GetNode(hash []byte) *Node {
return node
}

func (ndb *nodeDB) GetFastNode(key []byte) (*FastNode, error) {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()

if len(key) == 0 {
panic("nodeDB.GetFastNode() requires key")
}

// TODO make a second write lock just for fastNodeCacheQueue later
// Check the cache.
if elem, ok := ndb.fastNodeCache[string(key)]; ok {
// Already exists. Move to back of fastNodeCacheQueue.
ndb.fastNodeCacheQueue.MoveToBack(elem)
return elem.Value.(*FastNode), nil
}

// Doesn't exist, load.
buf, err := ndb.db.Get(key)
if err != nil {
return nil, fmt.Errorf("can't get fast-node %X: %v", key, err)
}
if buf == nil {
return nil, fmt.Errorf("value missing for key %x ", key)
}

fastNode, err := DeserializeFastNode(buf)
if err != nil {
return nil, fmt.Errorf("error reading FastNode. bytes: %x, error: %v ", buf, err)
}

fastNode.key = key
ndb.cacheFastNode(fastNode)
return fastNode, nil
}

// SaveNode saves a node to disk.
func (ndb *nodeDB) SaveNode(node *Node) {
ndb.mtx.Lock()
Expand Down Expand Up @@ -553,6 +595,26 @@ func (ndb *nodeDB) cacheNode(node *Node) {
}
}

func (ndb *nodeDB) uncacheFastNode(key []byte) {
if elem, ok := ndb.fastNodeCache[string(key)]; ok {
ndb.fastNodeCacheQueue.Remove(elem)
delete(ndb.fastNodeCache, string(key))
}
}

// Add a node to the cache and pop the least recently used node if we've
// reached the cache size limit.
func (ndb *nodeDB) cacheFastNode(node *FastNode) {
elem := ndb.fastNodeCacheQueue.PushBack(node)
ndb.fastNodeCache[string(node.key)] = elem

if ndb.fastNodeCacheQueue.Len() > ndb.fastNodeCacheSize {
oldest := ndb.fastNodeCacheQueue.Front()
key := ndb.fastNodeCacheQueue.Remove(oldest).(*FastNode).key
delete(ndb.fastNodeCache, string(key))
}
}

// Write to disk.
func (ndb *nodeDB) Commit() error {
ndb.mtx.Lock()
Expand Down

0 comments on commit 0944259

Please sign in to comment.