Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

GC quantiles (continuation) #2077

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/googleapis/gnostic v0.0.0-20190624222214-25d8b0b66985 // indirect
github.com/gorilla/mux v1.7.3 // indirect
github.com/hashicorp/golang-lru v0.5.3
github.com/influxdata/influxdb v0.0.0-20180221223340-01288bdb0883
github.com/json-iterator/go v1.1.7 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down
72 changes: 72 additions & 0 deletions shed/field_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package shed

import (
"encoding/json"

"github.com/syndtr/goleveldb/leveldb"
)

// JSONField is a helper to store complex structure by
// encoding it in JSON format.
type JSONField struct {
db *DB
key []byte
}

// NewJSONField returns a new JSONField.
// It validates its name and type against the database schema.
func (db *DB) NewJSONField(name string) (f JSONField, err error) {
key, err := db.schemaFieldKey(name, "json")
if err != nil {
return f, err
}
return JSONField{
db: db,
key: key,
}, nil
}

// Get unmarshals data from the database to a provided val.
// If the data is not found leveldb.ErrNotFound is returned.
func (f JSONField) Get(val interface{}) (err error) {
b, err := f.db.Get(f.key)
if err != nil {
return err
}
return json.Unmarshal(b, val)
}

// Put marshals provided val and saves it to the database.
func (f JSONField) Put(val interface{}) (err error) {
b, err := json.Marshal(val)
if err != nil {
return err
}
return f.db.Put(f.key, b)
}

// PutInBatch marshals provided val and puts it into the batch.
func (f JSONField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error) {
b, err := json.Marshal(val)
if err != nil {
return err
}
batch.Put(f.key, b)
return nil
}
53 changes: 52 additions & 1 deletion shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package shed

import (
"bytes"
"errors"
"fmt"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
Expand Down Expand Up @@ -75,7 +77,7 @@ func (i Item) Merge(i2 Item) (new Item) {

// Index represents a set of LevelDB key value pairs that have common
// prefix. It holds functions for encoding and decoding keys and values
// to provide transparent actions on saved data which inclide:
// to provide transparent actions on saved data which include:
// - getting a particular Item
// - saving a particular Item
// - iterating over a sorted LevelDB keys
Expand Down Expand Up @@ -449,3 +451,52 @@ func (f Index) CountFrom(start Item) (count int, err error) {
}
return count, it.Error()
}

func (f Index) Offset(start *Item, shift int64) (i Item, err error) {
var startKey []byte
if start != nil {
startKey, err = f.encodeKeyFunc(*start)
if err != nil {
return i, err
}
} else {
startKey = f.prefix
}

it := f.db.NewIterator()
defer it.Release()

ok := it.Seek(startKey)
if !ok || !bytes.HasPrefix(it.Key(), startKey) {
return i, errors.New("start Item not found in index")
}

next := it.Next
if shift < 0 {
next = it.Prev
shift *= -1
}

key := it.Key()
for shift != 0 && next() {
key = it.Key()
if key[0] != f.prefix[0] {
break
}
shift--
}
if shift != 0 {
return i, fmt.Errorf("key not found, start: %p, shift: %d", start, shift)
}

keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...))
if err != nil {
return i, err
}
// create a copy of value byte slice not to share leveldb underlaying slice array
valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...))
if err != nil {
return i, err
}
return keyItem.Merge(valueItem), it.Error()
}
169 changes: 169 additions & 0 deletions shed/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,3 +1104,172 @@ func TestIndex_HasMulti(t *testing.T) {
t.Errorf("got %v, want %v", got, want)
}
}

func generateItems(n int) []Item {
items := make([]Item, 0, n)

for i := 0; i < n; i++ {
items = append(items, Item{
Address: []byte(fmt.Sprintf("hash%03d", i)),
Data: []byte(fmt.Sprintf("data%06d", i)),
StoreTimestamp: time.Now().UnixNano(),
})
}

return items
}

func TestIndexOffset(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()

index1, err := db.NewIndex("test1", retrievalIndexFuncs)
if err != nil {
t.Fatal(err)
}

index2, err := db.NewIndex("test2", retrievalIndexFuncs)
if err != nil {
t.Fatal(err)
}

index3, err := db.NewIndex("test3", retrievalIndexFuncs)
if err != nil {
t.Fatal(err)
}

index4, err := db.NewIndex("test4", retrievalIndexFuncs)
if err != nil {
t.Fatal(err)
}

items := generateItems(100)
for _, item := range items {
index1.Put(item)
index2.Put(item)
// index3 is intentionally empty
index4.Put(item)
}

tests := []struct {
start, offset int
}{
{0, 0},
{0, 1},
{0, 50},
{44, 0},
{10, 10},
{0, len(items) - 1},
{10, -3},
{10, -10},
{len(items) - 1, 0},
{len(items) - 2, 1},
}

for _, tc := range tests {
t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) {
item, err := index1.Offset(&items[tc.start], int64(tc.offset))
if err != nil {
tt.Error(err)
}
checkItem(tt, item, items[tc.start+tc.offset])

item, err = index2.Offset(&items[tc.start], int64(tc.offset))
if err != nil {
tt.Error(err)
}
checkItem(tt, item, items[tc.start+tc.offset])

item, err = index3.Offset(&items[tc.start], int64(tc.offset))
if err == nil {
tt.Error("expected error")
}

item, err = index4.Offset(&items[tc.start], int64(tc.offset))
if err != nil {
tt.Error(err)
}
checkItem(tt, item, items[tc.start+tc.offset])
})
}

// special cases - testing all indexes, to catch all edge cases
tests = []struct {
start, offset int
}{
{0, -1},
{len(items) - 1, 1},
{0, -1000},
{len(items) - 1, 1000},
}

for _, tc := range tests {
t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) {
_, err := index1.Offset(&items[tc.start], int64(tc.offset))
if err == nil {
tt.Error("expected error")
}
_, err = index2.Offset(&items[tc.start], int64(tc.offset))
if err == nil {
tt.Error("expected error")
}
_, err = index3.Offset(&items[tc.start], int64(tc.offset))
if err == nil {
tt.Error("expected error")
}
_, err = index4.Offset(&items[tc.start], int64(tc.offset))
if err == nil {
tt.Error("expected error")
}
})
}

t.Run("Invalid start Item", func(tt *testing.T) {
invalid := Item{
Address: []byte("out-of-index"),
Data: []byte("not so random data"),
AccessTimestamp: time.Now().UnixNano(),
}
_, err := index1.Offset(&invalid, 0)
if err == nil {
tt.Error("expected error")
}
_, err = index2.Offset(&invalid, 0)
if err == nil {
tt.Error("expected error")
}
_, err = index3.Offset(&invalid, 0)
if err == nil {
tt.Error("expected error")
}
_, err = index4.Offset(&invalid, 0)
if err == nil {
tt.Error("expected error")
}
})

t.Run("nil start Item", func(tt *testing.T) {
item, err := index1.Offset(nil, 0)
if err != nil {
t.Error(err)
}
checkItem(t, item, items[0])

item, err = index1.Offset(nil, 1)
if err != nil {
t.Error(err)
}
checkItem(t, item, items[1])

item, err = index1.Offset(nil, -1)
if err == nil {
t.Error("Error was expected!")
}

item, err = index2.Offset(nil, 10)
if err != nil {
t.Error(err)
}
checkItem(t, item, items[10])
})
}
Loading