From 65c65ec12688a87af9f08031897901e07afebdb8 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 4 Nov 2019 16:23:11 +0100 Subject: [PATCH 01/18] storage/localstore: gc quantiles poc --- go.mod | 1 + shed/field_json.go | 72 +++++++++++++++++++++ shed/index.go | 42 ++++++++++++ storage/localstore/gc.go | 50 +++++++++++++-- storage/localstore/localstore.go | 106 +++++++++++++++++++++++++++++++ storage/localstore/mode_set.go | 38 ++++++++++- swarm.go | 20 ++++++ 7 files changed, 324 insertions(+), 5 deletions(-) create mode 100644 shed/field_json.go diff --git a/go.mod b/go.mod index a64125cf9a..53a0a5d33c 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/googleapis/gnostic v0.0.0-20190624222214-25d8b0b66985 // indirect github.com/gorilla/mux v1.7.3 // indirect github.com/hashicorp/golang-lru v0.5.3 + github.com/influxdata/influxdb v0.0.0-20180221223340-01288bdb0883 github.com/json-iterator/go v1.1.7 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect diff --git a/shed/field_json.go b/shed/field_json.go new file mode 100644 index 0000000000..56938d67d5 --- /dev/null +++ b/shed/field_json.go @@ -0,0 +1,72 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package shed + +import ( + "encoding/json" + + "github.com/syndtr/goleveldb/leveldb" +) + +// JSONField is a helper to store complex structure by +// encoding it in JSON format. +type JSONField struct { + db *DB + key []byte +} + +// NewJSONField returns a new JSONField. +// It validates its name and type against the database schema. +func (db *DB) NewJSONField(name string) (f JSONField, err error) { + key, err := db.schemaFieldKey(name, "json") + if err != nil { + return f, err + } + return JSONField{ + db: db, + key: key, + }, nil +} + +// Get unmarshals data from the database to a provided val. +// If the data is not found leveldb.ErrNotFound is returned. +func (f JSONField) Get(val interface{}) (err error) { + b, err := f.db.Get(f.key) + if err != nil { + return err + } + return json.Unmarshal(b, val) +} + +// Put marshals provided val and saves it to the database. +func (f JSONField) Put(val interface{}) (err error) { + b, err := json.Marshal(val) + if err != nil { + return err + } + return f.db.Put(f.key, b) +} + +// PutInBatch marshals provided val and puts it into the batch. +func (f JSONField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error) { + b, err := json.Marshal(val) + if err != nil { + return err + } + batch.Put(f.key, b) + return nil +} diff --git a/shed/index.go b/shed/index.go index 8dbb74bb1e..618e0a23da 100644 --- a/shed/index.go +++ b/shed/index.go @@ -18,6 +18,7 @@ package shed import ( "bytes" + "errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -449,3 +450,44 @@ func (f Index) CountFrom(start Item) (count int, err error) { } return count, it.Error() } + +func (f Index) Offset(start *Item, shift int64) (i Item, err error) { + var startKey []byte + if start != nil { + startKey, err = f.encodeKeyFunc(*start) + if err != nil { + return i, err + } + } + it := f.db.NewIterator() + defer it.Release() + + next := it.Next + if shift < 0 { + next = it.Prev + shift *= -1 + } + + var key []byte + for ok := it.Seek(startKey); ok && shift > 0; ok = next() { + key = it.Key() + if key[0] != f.prefix[0] { + break + } + shift-- + } + if key == nil { + return i, errors.New("key not found") + } + + keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...)) + if err != nil { + return i, err + } + // create a copy of value byte slice not to share leveldb underlaying slice array + valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...)) + if err != nil { + return i, err + } + return keyItem.Merge(valueItem), it.Error() +} diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index 6373ecc7d7..5254a845dc 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go @@ -62,7 +62,7 @@ func (db *DB) collectGarbageWorker() { db.triggerGarbageCollection() } - if testHookCollectGarbage != nil { + if collectedCount > 0 && testHookCollectGarbage != nil { testHookCollectGarbage(collectedCount) } case <-db.close: @@ -90,7 +90,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { batch := new(leveldb.Batch) target := db.gcTarget() - // protect database from changing idexes and gcSize + // protect database from changing indexes and gcSize db.batchMu.Lock() defer db.batchMu.Unlock() @@ -124,7 +124,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { db.gcIndex.DeleteInBatch(batch, item) collectedCount++ if collectedCount >= gcBatchSize { - // bach size limit reached, + // batch size limit reached, // another gc run is needed done = false return true, nil @@ -143,7 +143,9 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { metrics.GetOrRegisterCounter(metricName+"/writebatch/err", nil).Inc(1) return 0, false, err } - return collectedCount, done, nil + + err = db.updateGCQuantiles() + return collectedCount, done, err } // removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex. @@ -258,6 +260,46 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) { return nil } +// updateGCQuantiles adjusts gc quantiles based on the +// current gc size, by comparing stored quantile +// positions with the ones calculated from the current +// gc index size. +func (db *DB) updateGCQuantiles() (err error) { + var gcQuantiles quantiles + if err = db.gcQuantiles.Get(&gcQuantiles); err != nil { + return err + } + gcSize, err := db.gcSize.Get() + if err != nil { + return err + } + var newQuantiles quantiles + for _, q := range gcQuantiles { + item, position, found := gcQuantiles.Get(q.fraction) + if !found { + continue + } + newPosition := quantilePosition(gcSize, q.Numerator, q.Denominator) + diff := uint64Diff(position, newPosition) + if diff == 0 { + continue + } + newItem, err := db.gcIndex.Offset(&item, diff) + if err != nil { + return err + } + newQuantiles.Set(q.fraction, newItem, newPosition) + } + return db.gcQuantiles.Put(newQuantiles) +} + +func uint64Diff(one, two uint64) int64 { + if one > two { + return int64(one - two) + } + return -1 * int64(two-one) +} + // testHookCollectGarbage is a hook that can provide // information when a garbage collection run is done // and how many items it removed. diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index fb17ffe2c6..b0e3b460c4 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -19,8 +19,10 @@ package localstore import ( "encoding/binary" "errors" + "math" "os" "runtime/pprof" + "sort" "sync" "time" @@ -92,6 +94,10 @@ type DB struct { // field that stores number of intems in gc index gcSize shed.Uint64Field + // field that stores all positions (item and index) + // of all gc quantiles + gcQuantiles shed.JSONField + // garbage collection is triggered when gcSize exceeds // the capacity value capacity uint64 @@ -107,6 +113,9 @@ type DB struct { // are done before closing the database updateGCWG sync.WaitGroup + responsibilityRadius int + responsibilityRadiusMu sync.RWMutex + baseKey []byte batchMu sync.Mutex @@ -386,6 +395,8 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if err != nil { return nil, err } + // create a bucket for gc index quantiles + db.gcQuantiles, err = db.shed.NewJSONField("gc-quantiles") // Create a index structure for storing pinned chunks and their pin counts db.pinIndex, err = db.shed.NewIndex("Hash->PinCounter", shed.IndexFuncs{ @@ -494,6 +505,20 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) { return indexInfo, err } +func (db *DB) SetResponsibilityRadius(r int) { + db.responsibilityRadiusMu.Lock() + defer db.responsibilityRadiusMu.Unlock() + + db.responsibilityRadius = r +} + +func (db *DB) getResponsibilityRadius() (r int) { + db.responsibilityRadiusMu.RLock() + defer db.responsibilityRadiusMu.RUnlock() + + return db.responsibilityRadius +} + // chunkToItem creates new Item with data provided by the Chunk. func chunkToItem(ch chunk.Chunk) shed.Item { return shed.Item{ @@ -542,3 +567,84 @@ func totalTimeMetric(name string, start time.Time) { totalTime := time.Since(start) metrics.GetOrRegisterResettingTimer(name+"/total-time", nil).Update(totalTime) } + +type fraction struct { + Numerator uint64 + Denominator uint64 +} + +func (f fraction) Decimal() float64 { + return float64(f.Numerator) / float64(f.Denominator) +} + +type quantile struct { + fraction + Item shed.Item + Position uint64 +} + +type quantiles []quantile + +func (q quantiles) Len() int { + return len(q) +} + +func (q quantiles) Less(i, j int) bool { + return q[i].fraction.Decimal() < q[j].fraction.Decimal() +} + +func (q quantiles) Swap(i, j int) { + q[i], q[j] = q[j], q[i] +} + +func (q quantiles) Get(f fraction) (item shed.Item, position uint64, found bool) { + for _, x := range q { + if x.fraction == f { + return x.Item, x.Position, true + } + } + return item, 0, false +} + +func (q quantiles) Closest(f fraction) (closest *quantile) { + for _, x := range q { + if x.fraction == f { + return &x + } + if closest == nil || math.Abs(x.Decimal()-f.Decimal()) < math.Abs(closest.Decimal()-f.Decimal()) { + closest = &x + } + } + return closest +} + +func (q quantiles) Set(f fraction, item shed.Item, position uint64) { + for i := range q { + if q[i].fraction == f { + q[i].Item = item + q[i].Position = position + return + } + } + q = append(q, quantile{ + fraction: f, + Item: item, + }) + sort.Sort(q) +} + +func quantilePosition(total, numerator, denominator uint64) uint64 { + return total / denominator * numerator +} + +// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks +// TODO: review and document exact quantiles for chunks +func chunkQuantileFraction(po, responsibilityRadius int) fraction { + if po < responsibilityRadius { + // More Distant Chunks + n := uint64(responsibilityRadius - po) + return fraction{Numerator: n, Denominator: n + 1} + } + // Most Proximate Chunks + return fraction{Numerator: 1, Denominator: 3} +} diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 7703532fdc..b8f288fcd9 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -306,10 +306,46 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS return 0, err } if !ok { - err = db.gcIndex.PutInBatch(batch, item) + // set access timestamp based on quantile calculated from + // chunk proximity and db.responsibilityRadius + f := chunkQuantileFraction(int(db.po(item.Address)), db.getResponsibilityRadius()) + gcSize, err := db.gcSize.Get() if err != nil { return 0, err } + position := quantilePosition(gcSize, f.Numerator, f.Denominator) + var gcQuantiles quantiles + err = db.gcQuantiles.Get(&gcQuantiles) + if err != nil && err != leveldb.ErrNotFound { + return 0, err + } + var found bool + for _, q := range gcQuantiles { + if q.fraction == f { + item.AccessTimestamp = q.Item.AccessTimestamp + 1 + found = true + break + } + } + if !found { + var shift int64 + if closest := gcQuantiles.Closest(f); closest == nil { + shift = int64(position) + } else { + shift = int64(closest.Position) - int64(position) + } + i, err = db.gcIndex.Offset(nil, shift) + if err != nil { + return 0, err + } + item.AccessTimestamp = i.AccessTimestamp + 1 + } + gcQuantiles.Set(f, item, position) + db.gcQuantiles.PutInBatch(batch, gcQuantiles) + + db.retrievalAccessIndex.PutInBatch(batch, item) + db.pushIndex.DeleteInBatch(batch, item) + db.gcIndex.PutInBatch(batch, item) gcSizeChange++ } diff --git a/swarm.go b/swarm.go index ff109c6bcc..47697a0e13 100644 --- a/swarm.go +++ b/swarm.go @@ -242,6 +242,26 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e self.retrieval = retrieval.New(to, self.netStore, bzzconfig.Address, self.swap) self.netStore.RemoteGet = self.retrieval.RequestFromPeers + // update localstore responsibility radius on kademlia + // neighbourhood depth change. + go func() { + c, unsbscribe := to.SubscribeToNeighbourhoodDepthChange() + self.cleanupFuncs = append(self.cleanupFuncs, func() error { + unsbscribe() + return nil + }) + + for { + select { + case _, ok := <-c: + if !ok { + return + } + localStore.SetResponsibilityRadius(to.NeighbourhoodDepth()) + } + } + }() + feedsHandler.SetStore(self.netStore) syncing := true From 233c3db1eed5f817499deebd8306ade78ec7ca9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 16 Jan 2020 12:36:55 +0100 Subject: [PATCH 02/18] storage/localstore: GC quantiles refactoring --- storage/localstore/gc.go | 2 +- storage/localstore/localstore.go | 83 ------------------------------ storage/localstore/mode_set.go | 2 +- storage/localstore/quantile.go | 88 ++++++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 85 deletions(-) create mode 100644 storage/localstore/quantile.go diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index 5254a845dc..2c4e4d065c 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go @@ -279,7 +279,7 @@ func (db *DB) updateGCQuantiles() (err error) { if !found { continue } - newPosition := quantilePosition(gcSize, q.Numerator, q.Denominator) + newPosition := quantilePosition(gcSize, q.numerator, q.denominator) diff := uint64Diff(position, newPosition) if diff == 0 { continue diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index b0e3b460c4..d6d8a65de1 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -19,10 +19,8 @@ package localstore import ( "encoding/binary" "errors" - "math" "os" "runtime/pprof" - "sort" "sync" "time" @@ -567,84 +565,3 @@ func totalTimeMetric(name string, start time.Time) { totalTime := time.Since(start) metrics.GetOrRegisterResettingTimer(name+"/total-time", nil).Update(totalTime) } - -type fraction struct { - Numerator uint64 - Denominator uint64 -} - -func (f fraction) Decimal() float64 { - return float64(f.Numerator) / float64(f.Denominator) -} - -type quantile struct { - fraction - Item shed.Item - Position uint64 -} - -type quantiles []quantile - -func (q quantiles) Len() int { - return len(q) -} - -func (q quantiles) Less(i, j int) bool { - return q[i].fraction.Decimal() < q[j].fraction.Decimal() -} - -func (q quantiles) Swap(i, j int) { - q[i], q[j] = q[j], q[i] -} - -func (q quantiles) Get(f fraction) (item shed.Item, position uint64, found bool) { - for _, x := range q { - if x.fraction == f { - return x.Item, x.Position, true - } - } - return item, 0, false -} - -func (q quantiles) Closest(f fraction) (closest *quantile) { - for _, x := range q { - if x.fraction == f { - return &x - } - if closest == nil || math.Abs(x.Decimal()-f.Decimal()) < math.Abs(closest.Decimal()-f.Decimal()) { - closest = &x - } - } - return closest -} - -func (q quantiles) Set(f fraction, item shed.Item, position uint64) { - for i := range q { - if q[i].fraction == f { - q[i].Item = item - q[i].Position = position - return - } - } - q = append(q, quantile{ - fraction: f, - Item: item, - }) - sort.Sort(q) -} - -func quantilePosition(total, numerator, denominator uint64) uint64 { - return total / denominator * numerator -} - -// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks -// TODO: review and document exact quantiles for chunks -func chunkQuantileFraction(po, responsibilityRadius int) fraction { - if po < responsibilityRadius { - // More Distant Chunks - n := uint64(responsibilityRadius - po) - return fraction{Numerator: n, Denominator: n + 1} - } - // Most Proximate Chunks - return fraction{Numerator: 1, Denominator: 3} -} diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index b8f288fcd9..5fab024d43 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -313,7 +313,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS if err != nil { return 0, err } - position := quantilePosition(gcSize, f.Numerator, f.Denominator) + position := quantilePosition(gcSize, f.numerator, f.denominator) var gcQuantiles quantiles err = db.gcQuantiles.Get(&gcQuantiles) if err != nil && err != leveldb.ErrNotFound { diff --git a/storage/localstore/quantile.go b/storage/localstore/quantile.go new file mode 100644 index 0000000000..b437b32f66 --- /dev/null +++ b/storage/localstore/quantile.go @@ -0,0 +1,88 @@ +package localstore + +import ( + "github.com/ethersphere/swarm/shed" + "math" + "sort" +) + +type fraction struct { + numerator uint64 + denominator uint64 +} + +func (f fraction) Decimal() float64 { + return float64(f.numerator) / float64(f.denominator) +} + +type quantile struct { + fraction + Item shed.Item + Position uint64 +} + +type quantiles []quantile + +func (q quantiles) Len() int { + return len(q) +} + +func (q quantiles) Less(i, j int) bool { + return q[i].fraction.Decimal() < q[j].fraction.Decimal() +} + +func (q quantiles) Swap(i, j int) { + q[i], q[j] = q[j], q[i] +} + +func (q quantiles) Get(f fraction) (item shed.Item, position uint64, found bool) { + for _, x := range q { + if x.fraction == f { + return x.Item, x.Position, true + } + } + return item, 0, false +} + +func (q quantiles) Closest(f fraction) (closest *quantile) { + for _, x := range q { + if x.fraction == f { + return &x + } + if closest == nil || math.Abs(x.Decimal()-f.Decimal()) < math.Abs(closest.Decimal()-f.Decimal()) { + closest = &x + } + } + return closest +} + +func (q quantiles) Set(f fraction, item shed.Item, position uint64) { + for i := range q { + if q[i].fraction == f { + q[i].Item = item + q[i].Position = position + return + } + } + q = append(q, quantile{ + fraction: f, + Item: item, + }) + sort.Sort(q) +} + +func quantilePosition(total, numerator, denominator uint64) uint64 { + return total / denominator * numerator +} + +// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks +// TODO: review and document exact quantiles for chunks +func chunkQuantileFraction(po, responsibilityRadius int) fraction { + if po < responsibilityRadius { + // More Distant Chunks + n := uint64(responsibilityRadius - po) + return fraction{numerator: n, denominator: n + 1} + } + // Most Proximate Chunks + return fraction{numerator: 1, denominator: 3} +} From c0302ebd0ebac2b0d2468b4e82c39530ef75a1ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 16 Jan 2020 13:04:25 +0100 Subject: [PATCH 03/18] storage/localstore: first GC-quantiles tests --- storage/localstore/quantile_test.go | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 storage/localstore/quantile_test.go diff --git a/storage/localstore/quantile_test.go b/storage/localstore/quantile_test.go new file mode 100644 index 0000000000..f944d1a4c0 --- /dev/null +++ b/storage/localstore/quantile_test.go @@ -0,0 +1,59 @@ +package localstore + +import ( + "fmt" + "math" + "math/rand" + "sort" + "testing" +) + +func TestSorting(t *testing.T) { + t.Parallel() + n := uint64(10) + m := 10 * n + data := make(quantiles, n) + + rnd := rand.New(rand.NewSource(123)) // fixed seed for repeatable tests + + for i := uint64(0); i < n; i++ { + data[i].numerator = rnd.Uint64() % m + data[i].denominator = rnd.Uint64() % m + } + + sort.Sort(data) + + for i := uint64(1); i < n; i++ { + // compare without trusting any methods - use common denominator + x, y := data[i], data[i-1] + if x.numerator*y.denominator < y.numerator*x.denominator { + t.Error("quantiles not ordered correctly") + } + } +} + +func TestFraction(t *testing.T) { + t.Parallel() + + tolerance := float64(0.0000001) + + tests := []struct { + f fraction + r float64 + }{ + {fraction{1, 2}, 0.5}, + {fraction{2, 4}, 0.5}, + {fraction{3, 4}, 0.75}, + {fraction{4, 5}, 0.8}, + {fraction{1, 5}, 0.2}, + {fraction{99, 100}, 0.99}, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("Fraction %d/%d", test.f.numerator, test.f.denominator), func(tt *testing.T) { + if math.Abs(test.f.Decimal()-test.r) > tolerance { + tt.Errorf("expected: %f, received: %f", test.r, test.f.Decimal()) + } + }) + } +} From 040be9602b004701e9cf93e13f0d5350db1d42f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 16 Jan 2020 13:46:16 +0100 Subject: [PATCH 04/18] storage/localstore: more GC quantiles testing --- storage/localstore/quantile_test.go | 73 ++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/storage/localstore/quantile_test.go b/storage/localstore/quantile_test.go index f944d1a4c0..fc122bc240 100644 --- a/storage/localstore/quantile_test.go +++ b/storage/localstore/quantile_test.go @@ -10,20 +10,12 @@ import ( func TestSorting(t *testing.T) { t.Parallel() - n := uint64(10) - m := 10 * n - data := make(quantiles, n) - - rnd := rand.New(rand.NewSource(123)) // fixed seed for repeatable tests - - for i := uint64(0); i < n; i++ { - data[i].numerator = rnd.Uint64() % m - data[i].denominator = rnd.Uint64() % m - } + n := 100 + data := getRandomQuantiles(n, 123) sort.Sort(data) - for i := uint64(1); i < n; i++ { + for i := 1; i < n; i++ { // compare without trusting any methods - use common denominator x, y := data[i], data[i-1] if x.numerator*y.denominator < y.numerator*x.denominator { @@ -52,8 +44,65 @@ func TestFraction(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("Fraction %d/%d", test.f.numerator, test.f.denominator), func(tt *testing.T) { if math.Abs(test.f.Decimal()-test.r) > tolerance { - tt.Errorf("expected: %f, received: %f", test.r, test.f.Decimal()) + tt.Errorf("expected: %f, got: %f", test.r, test.f.Decimal()) } }) } } + +func TestClosest(t *testing.T) { + t.Parallel() + + emptyQ := make(quantiles, 0) + if emptyQ.Closest(fraction{1, 2}) != nil { + t.Error("expected nil") + } + + data := make(quantiles, 5) + data[0].numerator = 1 + data[0].denominator = 2 + data[1].numerator = 1 + data[1].denominator = 3 + data[2].numerator = 1 + data[2].denominator = 4 + data[3].numerator = 2 + data[3].denominator = 3 + data[4].numerator = 3 + data[4].denominator = 4 + + sanityChecks := []fraction{ + {0, 1}, {1, 1}, {1, 2}, {2, 2}, {2, 1}, + } + for _, check := range sanityChecks { + if data.Closest(check) == nil { + t.Error("expected any value, got nil") + } + } + + checks := []struct { + f fraction + expected *quantile + }{ + {fraction{1, 2}, &data[0]}, + } + + for _, check := range checks { + if c := data.Closest(check.f); c != check.expected { + t.Errorf("invalid quantile: expected fraction: %d/%d, got: %d/%d", + check.expected.numerator, check.expected.denominator, c.numerator, c.denominator) + } + } +} + +func getRandomQuantiles(n int, seed int64) quantiles { + m := uint64(100 * n) + data := make(quantiles, n) + + rnd := rand.New(rand.NewSource(seed)) // fixed seed for repeatable tests + + for i := 0; i < n; i++ { + data[i].numerator = rnd.Uint64() % m + data[i].denominator = rnd.Uint64() % m + } + return data +} From 1fb79025c97902dff1751791126664ba3541235a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 16 Jan 2020 13:46:51 +0100 Subject: [PATCH 05/18] storage/localstore: fix quantiles.Closest function Reference to local variable was returned instead of reference to element in underlying slice. --- storage/localstore/quantile.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/storage/localstore/quantile.go b/storage/localstore/quantile.go index b437b32f66..0342bb83c8 100644 --- a/storage/localstore/quantile.go +++ b/storage/localstore/quantile.go @@ -28,6 +28,7 @@ func (q quantiles) Len() int { } func (q quantiles) Less(i, j int) bool { + // TODO(tzdybal) - is it reasonable to use common denominator instead of Decimal() function? return q[i].fraction.Decimal() < q[j].fraction.Decimal() } @@ -45,12 +46,12 @@ func (q quantiles) Get(f fraction) (item shed.Item, position uint64, found bool) } func (q quantiles) Closest(f fraction) (closest *quantile) { - for _, x := range q { + for i, x := range q { if x.fraction == f { - return &x + return &q[i] } if closest == nil || math.Abs(x.Decimal()-f.Decimal()) < math.Abs(closest.Decimal()-f.Decimal()) { - closest = &x + closest = &q[i] } } return closest From 730cba86d76feef0b090140f10c13b44ac324119 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 16 Jan 2020 14:11:45 +0100 Subject: [PATCH 06/18] storage/localstore: more GC quantiles Closest test cases --- storage/localstore/quantile_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/storage/localstore/quantile_test.go b/storage/localstore/quantile_test.go index fc122bc240..5de86c34d1 100644 --- a/storage/localstore/quantile_test.go +++ b/storage/localstore/quantile_test.go @@ -83,7 +83,19 @@ func TestClosest(t *testing.T) { f fraction expected *quantile }{ - {fraction{1, 2}, &data[0]}, + {fraction{1, 2}, &data[0]}, // exact fraction + {fraction{4, 8}, &data[0]}, // almost same as above + {fraction{1, 3}, &data[1]}, // exact fraction + {fraction{3, 9}, &data[1]}, // almost same as above + {fraction{1, 4}, &data[2]}, // exact fraction + {fraction{1, 5}, &data[2]}, // smaller than any quantile + {fraction{2, 3}, &data[3]}, // exact fraction + {fraction{3, 4}, &data[4]}, // exact fraction + {fraction{4, 4}, &data[4]}, // greater than any quantile + {fraction{2, 1}, &data[4]}, // greater than any quantile + {fraction{4, 10}, &data[1]}, // 0.4 closest to 1/3 (0.33) + {fraction{42, 100}, &data[0]}, // 0.42 closest to 1/2 (0.5) + {fraction{7, 10}, &data[3]}, // 0.7 closest to 2/3 (0.66) } for _, check := range checks { From 24dd4c099f69a060af8670196ad131dd67e84e9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 16 Jan 2020 22:05:31 +0100 Subject: [PATCH 07/18] shed: fixed typo --- shed/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shed/index.go b/shed/index.go index 618e0a23da..ad5e3b7fc8 100644 --- a/shed/index.go +++ b/shed/index.go @@ -76,7 +76,7 @@ func (i Item) Merge(i2 Item) (new Item) { // Index represents a set of LevelDB key value pairs that have common // prefix. It holds functions for encoding and decoding keys and values -// to provide transparent actions on saved data which inclide: +// to provide transparent actions on saved data which include: // - getting a particular Item // - saving a particular Item // - iterating over a sorted LevelDB keys From d8a74a7db8db33458f9794ad95275076904f31a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Fri, 21 Feb 2020 11:37:56 +0100 Subject: [PATCH 08/18] storage/localstore: small refactoring / renaming --- storage/localstore/mode_set.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 5fab024d43..6e8e04a264 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -301,11 +301,11 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS db.retrievalAccessIndex.PutInBatch(batch, item) // Add in gcIndex only if this chunk is not pinned - ok, err := db.pinIndex.Has(item) + pinned, err := db.pinIndex.Has(item) if err != nil { return 0, err } - if !ok { + if !pinned { // set access timestamp based on quantile calculated from // chunk proximity and db.responsibilityRadius f := chunkQuantileFraction(int(db.po(item.Address)), db.getResponsibilityRadius()) From 75fea7d7f1203fe41f592a167458816c40dcdfb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Fri, 21 Feb 2020 12:00:10 +0100 Subject: [PATCH 09/18] storage/localstore: GC edge case - first item --- storage/localstore/mode_set.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 6e8e04a264..3374eda106 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -327,7 +327,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS break } } - if !found { + if len(gcQuantiles) > 0 && !found { var shift int64 if closest := gcQuantiles.Closest(f); closest == nil { shift = int64(position) From 137b99416a87ff8ba4340597652e10cbfd3dc11e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Fri, 21 Feb 2020 12:16:17 +0100 Subject: [PATCH 10/18] storage/localstore,swarm: fix linter errors --- storage/localstore/quantile.go | 3 ++- swarm.go | 10 ++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/storage/localstore/quantile.go b/storage/localstore/quantile.go index 0342bb83c8..4b33a0cb2f 100644 --- a/storage/localstore/quantile.go +++ b/storage/localstore/quantile.go @@ -1,9 +1,10 @@ package localstore import ( - "github.com/ethersphere/swarm/shed" "math" "sort" + + "github.com/ethersphere/swarm/shed" ) type fraction struct { diff --git a/swarm.go b/swarm.go index 47697a0e13..08210243f3 100644 --- a/swarm.go +++ b/swarm.go @@ -252,13 +252,11 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e }) for { - select { - case _, ok := <-c: - if !ok { - return - } - localStore.SetResponsibilityRadius(to.NeighbourhoodDepth()) + _, ok := <-c + if !ok { + return } + localStore.SetResponsibilityRadius(to.NeighbourhoodDepth()) } }() From 07fe98eea0be6bed10579d6deabadb3a3857188d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 25 Feb 2020 13:14:59 +0100 Subject: [PATCH 11/18] shed: failing test for Index.Offset --- shed/index_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/shed/index_test.go b/shed/index_test.go index 356cd8e41a..499146da4c 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1104,3 +1104,60 @@ func TestIndex_HasMulti(t *testing.T) { t.Errorf("got %v, want %v", got, want) } } + +func generateItems(n int) []Item { + items := make([]Item, 0, n) + + for i := 0; i < n; i++ { + items = append(items, Item{ + Address: []byte(fmt.Sprintf("hash%03d", i)), + Data: []byte(fmt.Sprintf("data%06d", i)), + StoreTimestamp: time.Now().UnixNano(), + }) + } + + return items +} + +func TestIndexOffset(t *testing.T) { + t.Parallel() + + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + index, err := db.NewIndex("retrieval", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + items := generateItems(100) + for _, item := range items { + index.Put(item) + } + + tests := []struct { + start, offset int + }{ + {0, 0}, + {0, 1}, + {0, 50}, + {44, 0}, + {10, 10}, + {0, len(items) - 1}, + {10, -3}, + {10, -10}, + {len(items) - 1, 0}, + {len(items) - 2, 1}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) { + item, err := index.Offset(&items[tc.start], int64(tc.offset)) + if err != nil { + tt.Error(err) + } + checkItem(tt, item, items[tc.start+tc.offset]) + }) + } + +} From c2cdfb911f1f6e015a32112dd5199eeb09c93d45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 25 Feb 2020 13:19:13 +0100 Subject: [PATCH 12/18] shed: more tests for Index.Offset (causing crash) --- shed/index_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/shed/index_test.go b/shed/index_test.go index 499146da4c..1db556a7ba 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1160,4 +1160,20 @@ func TestIndexOffset(t *testing.T) { }) } + // special cases + tests = []struct { + start, offset int + }{ + {0, -1}, + {len(items) - 1, 1}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) { + _, err := index.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + }) + } } From 64e657ec09ba469143b602c46d31f20e315dd4a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 25 Feb 2020 13:29:22 +0100 Subject: [PATCH 13/18] shed: comprehensive test for Index.Offset Even more complicated test - assumes that there are multiple indexes in database (there are many index prefixes), including empty index. There are test cases that checks both positive and erroneous scenarios. Tests are failing, as implementation is incorrect (in many ways). --- shed/index_test.go | 46 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/shed/index_test.go b/shed/index_test.go index 1db556a7ba..3f263b3317 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1120,19 +1120,29 @@ func generateItems(n int) []Item { } func TestIndexOffset(t *testing.T) { - t.Parallel() - db, cleanupFunc := newTestDB(t) defer cleanupFunc() - index, err := db.NewIndex("retrieval", retrievalIndexFuncs) + index1, err := db.NewIndex("test1", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + index2, err := db.NewIndex("test2", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + index3, err := db.NewIndex("test3", retrievalIndexFuncs) if err != nil { t.Fatal(err) } items := generateItems(100) for _, item := range items { - index.Put(item) + index1.Put(item) + index2.Put(item) + index3.Put(item) } tests := []struct { @@ -1152,7 +1162,19 @@ func TestIndexOffset(t *testing.T) { for _, tc := range tests { t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) { - item, err := index.Offset(&items[tc.start], int64(tc.offset)) + item, err := index1.Offset(&items[tc.start], int64(tc.offset)) + if err != nil { + tt.Error(err) + } + checkItem(tt, item, items[tc.start+tc.offset]) + + item, err = index2.Offset(&items[tc.start], int64(tc.offset)) + if err != nil { + tt.Error(err) + } + checkItem(tt, item, items[tc.start+tc.offset]) + + item, err = index3.Offset(&items[tc.start], int64(tc.offset)) if err != nil { tt.Error(err) } @@ -1160,17 +1182,27 @@ func TestIndexOffset(t *testing.T) { }) } - // special cases + // special cases - testing all indexes, to catch all edge cases tests = []struct { start, offset int }{ {0, -1}, {len(items) - 1, 1}, + {0, -1000}, + {len(items) - 1, 1000}, } for _, tc := range tests { t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) { - _, err := index.Offset(&items[tc.start], int64(tc.offset)) + _, err := index1.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + _, err = index2.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + _, err = index3.Offset(&items[tc.start], int64(tc.offset)) if err == nil { tt.Error("expected error") } From 288be9ff5718969ee995e4e1c491bd4e867d9a08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 25 Feb 2020 14:06:07 +0100 Subject: [PATCH 14/18] shed: fixed implementation of Index.Offset + more test cases --- shed/index.go | 11 ++++++++--- shed/index_test.go | 41 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/shed/index.go b/shed/index.go index ad5e3b7fc8..034b8f80f8 100644 --- a/shed/index.go +++ b/shed/index.go @@ -462,21 +462,26 @@ func (f Index) Offset(start *Item, shift int64) (i Item, err error) { it := f.db.NewIterator() defer it.Release() + ok := it.Seek(startKey) + if !ok || bytes.Compare(it.Key(), startKey) != 0 { + return i, errors.New("start Item not found in index") + } + next := it.Next if shift < 0 { next = it.Prev shift *= -1 } - var key []byte - for ok := it.Seek(startKey); ok && shift > 0; ok = next() { + key := startKey + for shift != 0 && next() { key = it.Key() if key[0] != f.prefix[0] { break } shift-- } - if key == nil { + if shift != 0 { return i, errors.New("key not found") } diff --git a/shed/index_test.go b/shed/index_test.go index 3f263b3317..318f0cbef5 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1138,11 +1138,17 @@ func TestIndexOffset(t *testing.T) { t.Fatal(err) } + index4, err := db.NewIndex("test4", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + items := generateItems(100) for _, item := range items { index1.Put(item) index2.Put(item) - index3.Put(item) + // index3 is intentionally empty + index4.Put(item) } tests := []struct { @@ -1175,6 +1181,11 @@ func TestIndexOffset(t *testing.T) { checkItem(tt, item, items[tc.start+tc.offset]) item, err = index3.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + + item, err = index4.Offset(&items[tc.start], int64(tc.offset)) if err != nil { tt.Error(err) } @@ -1206,6 +1217,34 @@ func TestIndexOffset(t *testing.T) { if err == nil { tt.Error("expected error") } + _, err = index4.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } }) } + + t.Run("Invalid start Item", func(tt *testing.T) { + invalid := Item{ + Address: []byte("out-of-index"), + Data: []byte("not so random data"), + AccessTimestamp: time.Now().UnixNano(), + } + _, err := index1.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + _, err = index2.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + _, err = index3.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + _, err = index4.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + }) } From 95e8973a2e64283bca87f1574efd920cc4e56e1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Wed, 26 Feb 2020 11:28:29 +0100 Subject: [PATCH 15/18] shed: Index.Offset - correct handling of nil start Item --- shed/index.go | 7 +++++-- shed/index_test.go | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/shed/index.go b/shed/index.go index 034b8f80f8..3536b1388c 100644 --- a/shed/index.go +++ b/shed/index.go @@ -458,12 +458,15 @@ func (f Index) Offset(start *Item, shift int64) (i Item, err error) { if err != nil { return i, err } + } else { + startKey = f.prefix } + it := f.db.NewIterator() defer it.Release() ok := it.Seek(startKey) - if !ok || bytes.Compare(it.Key(), startKey) != 0 { + if !ok || !bytes.HasPrefix(it.Key(), startKey) { return i, errors.New("start Item not found in index") } @@ -473,7 +476,7 @@ func (f Index) Offset(start *Item, shift int64) (i Item, err error) { shift *= -1 } - key := startKey + key := it.Key() for shift != 0 && next() { key = it.Key() if key[0] != f.prefix[0] { diff --git a/shed/index_test.go b/shed/index_test.go index 318f0cbef5..e2f4cef629 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1247,4 +1247,18 @@ func TestIndexOffset(t *testing.T) { tt.Error("expected error") } }) + + t.Run("nil start Item", func(tt *testing.T) { + item, err := index1.Offset(nil, 0) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[0]) + + item, err = index2.Offset(nil, 10) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[10]) + }) } From 8816765e4232b018fe7e4ee6f25d4997d38c3a31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Fri, 10 Apr 2020 00:10:28 +0200 Subject: [PATCH 16/18] shed,storage/localstore: fix few bugs and add tests Most important issue: quantiles.Set was not altering the original object. This is because, reference to the receiver is passed as a value, so the substitution of `q` with `append` function has only local scope. Second of all, fraction fields needs public visibility, in order to be properly JSON encoded/decoded. --- shed/index.go | 3 +- shed/index_test.go | 12 ++++ storage/localstore/gc.go | 6 +- storage/localstore/gc_test.go | 51 +++++++++++++++ storage/localstore/mode_set.go | 6 +- storage/localstore/quantile.go | 37 +++++------ storage/localstore/quantile_test.go | 98 +++++++++++++++++------------ 7 files changed, 148 insertions(+), 65 deletions(-) diff --git a/shed/index.go b/shed/index.go index 3536b1388c..80b856363e 100644 --- a/shed/index.go +++ b/shed/index.go @@ -19,6 +19,7 @@ package shed import ( "bytes" "errors" + "fmt" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -485,7 +486,7 @@ func (f Index) Offset(start *Item, shift int64) (i Item, err error) { shift-- } if shift != 0 { - return i, errors.New("key not found") + return i, fmt.Errorf("key not found, start: %p, shift: %d", start, shift) } keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...)) diff --git a/shed/index_test.go b/shed/index_test.go index e2f4cef629..00c4aa44ff 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1255,6 +1255,18 @@ func TestIndexOffset(t *testing.T) { } checkItem(t, item, items[0]) + item, err = index1.Offset(nil, 1) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[1]) + + item, err = index1.Offset(nil, -1) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[1]) + item, err = index2.Offset(nil, 10) if err != nil { t.Error(err) diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index 2c4e4d065c..0122a5493a 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go @@ -275,11 +275,11 @@ func (db *DB) updateGCQuantiles() (err error) { } var newQuantiles quantiles for _, q := range gcQuantiles { - item, position, found := gcQuantiles.Get(q.fraction) + item, position, found := gcQuantiles.Get(q.Fraction) if !found { continue } - newPosition := quantilePosition(gcSize, q.numerator, q.denominator) + newPosition := quantilePosition(gcSize, q.Numerator, q.Denominator) diff := uint64Diff(position, newPosition) if diff == 0 { continue @@ -288,7 +288,7 @@ func (db *DB) updateGCQuantiles() (err error) { if err != nil { return err } - newQuantiles.Set(q.fraction, newItem, newPosition) + newQuantiles.Set(q.Fraction, newItem, newPosition) } return db.gcQuantiles.Put(newQuantiles) } diff --git a/storage/localstore/gc_test.go b/storage/localstore/gc_test.go index 5eb8dfe7b7..d043561305 100644 --- a/storage/localstore/gc_test.go +++ b/storage/localstore/gc_test.go @@ -529,3 +529,54 @@ func TestSetTestHookCollectGarbage(t *testing.T) { t.Errorf("got hook value %v, want %v", got, original) } } + +func TestUpdateGCQuantiles(t *testing.T) { + dir, err := ioutil.TempDir("", "localstore-gc-quantiles-update") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + baseKey := make([]byte, 32) + if _, err := rand.Read(baseKey); err != nil { + t.Fatal(err) + } + options := &Options{Capacity: 100} + db, err := New(dir, baseKey, options) + if err != nil { + t.Fatal(err) + } + + count := int(2 * options.Capacity) + + for i := 0; i < count; i++ { + ch := generateTestRandomChunk() + + _, err := db.Put(context.Background(), chunk.ModePutRequest, ch) + if err != nil { + t.Fatal(err) + } + + err = db.Set(context.Background(), chunk.ModeSetAccess, ch.Address()) + if err != nil { + t.Fatal(err) + } + + err = db.Set(context.Background(), chunk.ModeSetSyncPush, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + + if err := db.Close(); err != nil { + t.Fatal(err) + } + + db, err = New(dir, baseKey, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + db.collectGarbage() + db.collectGarbage() +} diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 3374eda106..29e6dd6f3e 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -313,7 +313,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS if err != nil { return 0, err } - position := quantilePosition(gcSize, f.numerator, f.denominator) + position := quantilePosition(gcSize, f.Numerator, f.Denominator) var gcQuantiles quantiles err = db.gcQuantiles.Get(&gcQuantiles) if err != nil && err != leveldb.ErrNotFound { @@ -321,7 +321,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS } var found bool for _, q := range gcQuantiles { - if q.fraction == f { + if q.Fraction == f { item.AccessTimestamp = q.Item.AccessTimestamp + 1 found = true break @@ -340,7 +340,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS } item.AccessTimestamp = i.AccessTimestamp + 1 } - gcQuantiles.Set(f, item, position) + gcQuantiles = gcQuantiles.Set(f, item, position) db.gcQuantiles.PutInBatch(batch, gcQuantiles) db.retrievalAccessIndex.PutInBatch(batch, item) diff --git a/storage/localstore/quantile.go b/storage/localstore/quantile.go index 4b33a0cb2f..e3105228f1 100644 --- a/storage/localstore/quantile.go +++ b/storage/localstore/quantile.go @@ -7,17 +7,17 @@ import ( "github.com/ethersphere/swarm/shed" ) -type fraction struct { - numerator uint64 - denominator uint64 +type Fraction struct { + Numerator uint64 + Denominator uint64 } -func (f fraction) Decimal() float64 { - return float64(f.numerator) / float64(f.denominator) +func (f Fraction) Decimal() float64 { + return float64(f.Numerator) / float64(f.Denominator) } type quantile struct { - fraction + Fraction Item shed.Item Position uint64 } @@ -30,25 +30,25 @@ func (q quantiles) Len() int { func (q quantiles) Less(i, j int) bool { // TODO(tzdybal) - is it reasonable to use common denominator instead of Decimal() function? - return q[i].fraction.Decimal() < q[j].fraction.Decimal() + return q[i].Fraction.Decimal() < q[j].Fraction.Decimal() } func (q quantiles) Swap(i, j int) { q[i], q[j] = q[j], q[i] } -func (q quantiles) Get(f fraction) (item shed.Item, position uint64, found bool) { +func (q quantiles) Get(f Fraction) (item shed.Item, position uint64, found bool) { for _, x := range q { - if x.fraction == f { + if x.Fraction == f { return x.Item, x.Position, true } } return item, 0, false } -func (q quantiles) Closest(f fraction) (closest *quantile) { +func (q quantiles) Closest(f Fraction) (closest *quantile) { for i, x := range q { - if x.fraction == f { + if x.Fraction == f { return &q[i] } if closest == nil || math.Abs(x.Decimal()-f.Decimal()) < math.Abs(closest.Decimal()-f.Decimal()) { @@ -58,19 +58,20 @@ func (q quantiles) Closest(f fraction) (closest *quantile) { return closest } -func (q quantiles) Set(f fraction, item shed.Item, position uint64) { +func (q quantiles) Set(f Fraction, item shed.Item, position uint64) quantiles { for i := range q { - if q[i].fraction == f { + if q[i].Fraction == f { q[i].Item = item q[i].Position = position - return + return q } } q = append(q, quantile{ - fraction: f, + Fraction: f, Item: item, }) sort.Sort(q) + return q } func quantilePosition(total, numerator, denominator uint64) uint64 { @@ -79,12 +80,12 @@ func quantilePosition(total, numerator, denominator uint64) uint64 { // based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks // TODO: review and document exact quantiles for chunks -func chunkQuantileFraction(po, responsibilityRadius int) fraction { +func chunkQuantileFraction(po, responsibilityRadius int) Fraction { if po < responsibilityRadius { // More Distant Chunks n := uint64(responsibilityRadius - po) - return fraction{numerator: n, denominator: n + 1} + return Fraction{Numerator: n, Denominator: n + 1} } // Most Proximate Chunks - return fraction{numerator: 1, denominator: 3} + return Fraction{Numerator: 1, Denominator: 3} } diff --git a/storage/localstore/quantile_test.go b/storage/localstore/quantile_test.go index 5de86c34d1..6d119a4e96 100644 --- a/storage/localstore/quantile_test.go +++ b/storage/localstore/quantile_test.go @@ -6,6 +6,8 @@ import ( "math/rand" "sort" "testing" + + "github.com/ethersphere/swarm/shed" ) func TestSorting(t *testing.T) { @@ -18,7 +20,7 @@ func TestSorting(t *testing.T) { for i := 1; i < n; i++ { // compare without trusting any methods - use common denominator x, y := data[i], data[i-1] - if x.numerator*y.denominator < y.numerator*x.denominator { + if x.Numerator*y.Denominator < y.Numerator*x.Denominator { t.Error("quantiles not ordered correctly") } } @@ -30,19 +32,19 @@ func TestFraction(t *testing.T) { tolerance := float64(0.0000001) tests := []struct { - f fraction + f Fraction r float64 }{ - {fraction{1, 2}, 0.5}, - {fraction{2, 4}, 0.5}, - {fraction{3, 4}, 0.75}, - {fraction{4, 5}, 0.8}, - {fraction{1, 5}, 0.2}, - {fraction{99, 100}, 0.99}, + {Fraction{1, 2}, 0.5}, + {Fraction{2, 4}, 0.5}, + {Fraction{3, 4}, 0.75}, + {Fraction{4, 5}, 0.8}, + {Fraction{1, 5}, 0.2}, + {Fraction{99, 100}, 0.99}, } for _, test := range tests { - t.Run(fmt.Sprintf("Fraction %d/%d", test.f.numerator, test.f.denominator), func(tt *testing.T) { + t.Run(fmt.Sprintf("Fraction %d/%d", test.f.Numerator, test.f.Denominator), func(tt *testing.T) { if math.Abs(test.f.Decimal()-test.r) > tolerance { tt.Errorf("expected: %f, got: %f", test.r, test.f.Decimal()) } @@ -54,23 +56,23 @@ func TestClosest(t *testing.T) { t.Parallel() emptyQ := make(quantiles, 0) - if emptyQ.Closest(fraction{1, 2}) != nil { + if emptyQ.Closest(Fraction{1, 2}) != nil { t.Error("expected nil") } data := make(quantiles, 5) - data[0].numerator = 1 - data[0].denominator = 2 - data[1].numerator = 1 - data[1].denominator = 3 - data[2].numerator = 1 - data[2].denominator = 4 - data[3].numerator = 2 - data[3].denominator = 3 - data[4].numerator = 3 - data[4].denominator = 4 - - sanityChecks := []fraction{ + data[0].Numerator = 1 + data[0].Denominator = 2 + data[1].Numerator = 1 + data[1].Denominator = 3 + data[2].Numerator = 1 + data[2].Denominator = 4 + data[3].Numerator = 2 + data[3].Denominator = 3 + data[4].Numerator = 3 + data[4].Denominator = 4 + + sanityChecks := []Fraction{ {0, 1}, {1, 1}, {1, 2}, {2, 2}, {2, 1}, } for _, check := range sanityChecks { @@ -80,32 +82,48 @@ func TestClosest(t *testing.T) { } checks := []struct { - f fraction + f Fraction expected *quantile }{ - {fraction{1, 2}, &data[0]}, // exact fraction - {fraction{4, 8}, &data[0]}, // almost same as above - {fraction{1, 3}, &data[1]}, // exact fraction - {fraction{3, 9}, &data[1]}, // almost same as above - {fraction{1, 4}, &data[2]}, // exact fraction - {fraction{1, 5}, &data[2]}, // smaller than any quantile - {fraction{2, 3}, &data[3]}, // exact fraction - {fraction{3, 4}, &data[4]}, // exact fraction - {fraction{4, 4}, &data[4]}, // greater than any quantile - {fraction{2, 1}, &data[4]}, // greater than any quantile - {fraction{4, 10}, &data[1]}, // 0.4 closest to 1/3 (0.33) - {fraction{42, 100}, &data[0]}, // 0.42 closest to 1/2 (0.5) - {fraction{7, 10}, &data[3]}, // 0.7 closest to 2/3 (0.66) + {Fraction{1, 2}, &data[0]}, // exact Fraction + {Fraction{4, 8}, &data[0]}, // almost same as above + {Fraction{1, 3}, &data[1]}, // exact Fraction + {Fraction{3, 9}, &data[1]}, // almost same as above + {Fraction{1, 4}, &data[2]}, // exact Fraction + {Fraction{1, 5}, &data[2]}, // smaller than any quantile + {Fraction{2, 3}, &data[3]}, // exact Fraction + {Fraction{3, 4}, &data[4]}, // exact Fraction + {Fraction{4, 4}, &data[4]}, // greater than any quantile + {Fraction{2, 1}, &data[4]}, // greater than any quantile + {Fraction{4, 10}, &data[1]}, // 0.4 closest to 1/3 (0.33) + {Fraction{42, 100}, &data[0]}, // 0.42 closest to 1/2 (0.5) + {Fraction{7, 10}, &data[3]}, // 0.7 closest to 2/3 (0.66) } for _, check := range checks { if c := data.Closest(check.f); c != check.expected { - t.Errorf("invalid quantile: expected fraction: %d/%d, got: %d/%d", - check.expected.numerator, check.expected.denominator, c.numerator, c.denominator) + t.Errorf("invalid quantile: expected Fraction: %d/%d, got: %d/%d", + check.expected.Numerator, check.expected.Denominator, c.Numerator, c.Denominator) } } } +func TestSet(t *testing.T) { + t.Parallel() + + var q quantiles + + if len(q) != 0 { + t.Errorf("Prerequisite is false") + } + + q = q.Set(Fraction{1, 1}, shed.Item{}, 1) + + if len(q) == 0 { + t.Errorf("quantiles.Set doesn't work") + } +} + func getRandomQuantiles(n int, seed int64) quantiles { m := uint64(100 * n) data := make(quantiles, n) @@ -113,8 +131,8 @@ func getRandomQuantiles(n int, seed int64) quantiles { rnd := rand.New(rand.NewSource(seed)) // fixed seed for repeatable tests for i := 0; i < n; i++ { - data[i].numerator = rnd.Uint64() % m - data[i].denominator = rnd.Uint64() % m + data[i].Numerator = rnd.Uint64() % m + data[i].Denominator = rnd.Uint64() % m } return data } From 4a2d7b03bd5f9967b33c58e00b9d0408bc6837cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 14 Apr 2020 14:33:10 +0200 Subject: [PATCH 17/18] shed: fix index test --- shed/index_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/shed/index_test.go b/shed/index_test.go index 00c4aa44ff..42b7a462e0 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1262,10 +1262,9 @@ func TestIndexOffset(t *testing.T) { checkItem(t, item, items[1]) item, err = index1.Offset(nil, -1) - if err != nil { - t.Error(err) + if err == nil { + t.Error("Error was expected!") } - checkItem(t, item, items[1]) item, err = index2.Offset(nil, 10) if err != nil { From 5169578478a8ff5db10bd7a67f59f7526a5484c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 9 Jun 2020 12:48:32 +0200 Subject: [PATCH 18/18] storage/localstore: cleanup and squash of GC changes --- storage/localstore/gc.go | 51 ++++++++++++------ storage/localstore/gc_test.go | 1 - storage/localstore/localstore.go | 3 ++ storage/localstore/mode_set.go | 88 +++++++++++++++++--------------- storage/localstore/quantile.go | 17 ++---- 5 files changed, 89 insertions(+), 71 deletions(-) diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index 0122a5493a..d16729de20 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) @@ -40,6 +41,36 @@ var ( gcBatchSize uint64 = 200 ) +type GCPolicy interface { + GetEvictionMetric(addr chunk.Address) Fraction + GetAdmissionMetric(addr chunk.Address) Fraction +} + +// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks +type DefaultGCPolicy struct { + db *DB +} + +// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks +func (p *DefaultGCPolicy) GetEvictionMetric(addr chunk.Address) Fraction { + item := addressToItem(addr) + po := int(p.db.po(item.Address)) + responsibilityRadius := p.db.getResponsibilityRadius() + + if po < responsibilityRadius { + // More Distant Chunks + n := uint64(responsibilityRadius - po) + return Fraction{Numerator: n, Denominator: n + 1} + } + // Most Proximate Chunks + return Fraction{Numerator: 1, Denominator: 3} +} + +// GetAdmissionMetric is currently not in use +func (p *DefaultGCPolicy) GetAdmissionMetric(addr chunk.Address) Fraction { + return Fraction{1, 1} +} + // collectGarbageWorker is a long running function that waits for // collectGarbageTrigger channel to signal a garbage collection // run. GC run iterates on gcIndex and removes older items @@ -96,7 +127,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { // run through the recently pinned chunks and // remove them from the gcIndex before iterating through gcIndex - err = db.removeChunksInExcludeIndexFromGC() + err = db.syncExcludeAndGCIndex() if err != nil { log.Error("localstore exclude pinned chunks", "err", err) return 0, true, err @@ -148,8 +179,8 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { return collectedCount, done, err } -// removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex. -func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { +// syncExcludeAndGCIndex removes any chunks in the exclude Index, from the gcIndex. +func (db *DB) syncExcludeAndGCIndex() (err error) { metricName := "localstore/gc/exclude" metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(metricName, time.Now()) @@ -275,20 +306,8 @@ func (db *DB) updateGCQuantiles() (err error) { } var newQuantiles quantiles for _, q := range gcQuantiles { - item, position, found := gcQuantiles.Get(q.Fraction) - if !found { - continue - } newPosition := quantilePosition(gcSize, q.Numerator, q.Denominator) - diff := uint64Diff(position, newPosition) - if diff == 0 { - continue - } - newItem, err := db.gcIndex.Offset(&item, diff) - if err != nil { - return err - } - newQuantiles.Set(q.Fraction, newItem, newPosition) + newQuantiles = newQuantiles.Set(q.Fraction, q.Item, newPosition) } return db.gcQuantiles.Put(newQuantiles) } diff --git a/storage/localstore/gc_test.go b/storage/localstore/gc_test.go index d043561305..5ecefb06ad 100644 --- a/storage/localstore/gc_test.go +++ b/storage/localstore/gc_test.go @@ -578,5 +578,4 @@ func TestUpdateGCQuantiles(t *testing.T) { defer db.Close() db.collectGarbage() - db.collectGarbage() } diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index d6d8a65de1..ca906de648 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -89,6 +89,8 @@ type DB struct { // pin files Index pinIndex shed.Index + gcPolicy GCPolicy + // field that stores number of intems in gc index gcSize shed.Uint64Field @@ -183,6 +185,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { collectGarbageWorkerDone: make(chan struct{}), putToGCCheck: o.PutToGCCheck, } + db.gcPolicy = &DefaultGCPolicy{db} if db.capacity <= 0 { db.capacity = defaultCapacity } diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 29e6dd6f3e..2f952cc47a 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "github.com/ethersphere/swarm/shed" "time" "github.com/ethereum/go-ethereum/metrics" @@ -201,7 +202,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS // provided by the access function, and it is not // a property of a chunk provided to Accessor.Put. - i, err := db.retrievalDataIndex.Get(item) + accessIndexItem, err := db.retrievalDataIndex.Get(item) if err != nil { if err == leveldb.ErrNotFound { // chunk is not found, @@ -213,8 +214,8 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS } return 0, err } - item.StoreTimestamp = i.StoreTimestamp - item.BinID = i.BinID + item.StoreTimestamp = accessIndexItem.StoreTimestamp + item.BinID = accessIndexItem.BinID switch mode { case chunk.ModeSetSyncPull: @@ -286,10 +287,10 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS db.pushIndex.DeleteInBatch(batch, item) } - i, err = db.retrievalAccessIndex.Get(item) + accessIndexItem, err = db.retrievalAccessIndex.Get(item) switch err { case nil: - item.AccessTimestamp = i.AccessTimestamp + item.AccessTimestamp = accessIndexItem.AccessTimestamp db.gcIndex.DeleteInBatch(batch, item) gcSizeChange-- case leveldb.ErrNotFound: @@ -306,50 +307,57 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS return 0, err } if !pinned { - // set access timestamp based on quantile calculated from - // chunk proximity and db.responsibilityRadius - f := chunkQuantileFraction(int(db.po(item.Address)), db.getResponsibilityRadius()) - gcSize, err := db.gcSize.Get() + err := db.addToGC(batch, item, accessIndexItem) if err != nil { return 0, err } - position := quantilePosition(gcSize, f.Numerator, f.Denominator) - var gcQuantiles quantiles - err = db.gcQuantiles.Get(&gcQuantiles) - if err != nil && err != leveldb.ErrNotFound { - return 0, err + gcSizeChange++ + } + + return gcSizeChange, nil +} + +func (db *DB) addToGC(batch *leveldb.Batch, item shed.Item, i shed.Item) error { + // set access timestamp based on quantile calculated from + // chunk proximity and db.responsibilityRadius + f := db.gcPolicy.GetEvictionMetric(item.Address) + gcSize, err := db.gcSize.Get() + if err != nil { + return err + } + position := quantilePosition(gcSize, f.Numerator, f.Denominator) + var gcQuantiles quantiles + err = db.gcQuantiles.Get(&gcQuantiles) + if err != nil && err != leveldb.ErrNotFound { + return err + } + var found bool + for _, q := range gcQuantiles { + if q.Fraction == f { + item.AccessTimestamp = q.Item.AccessTimestamp + 1 + found = true + break } - var found bool - for _, q := range gcQuantiles { - if q.Fraction == f { - item.AccessTimestamp = q.Item.AccessTimestamp + 1 - found = true - break - } + } + if len(gcQuantiles) > 0 && !found { + var shift int64 + if closest := gcQuantiles.Closest(f); closest == nil { + shift = int64(position) + } else { + shift = int64(closest.Position) - int64(position) } - if len(gcQuantiles) > 0 && !found { - var shift int64 - if closest := gcQuantiles.Closest(f); closest == nil { - shift = int64(position) - } else { - shift = int64(closest.Position) - int64(position) - } - i, err = db.gcIndex.Offset(nil, shift) - if err != nil { - return 0, err - } + i, err = db.gcIndex.Offset(nil, shift) + if err == nil { item.AccessTimestamp = i.AccessTimestamp + 1 } - gcQuantiles = gcQuantiles.Set(f, item, position) - db.gcQuantiles.PutInBatch(batch, gcQuantiles) - - db.retrievalAccessIndex.PutInBatch(batch, item) - db.pushIndex.DeleteInBatch(batch, item) - db.gcIndex.PutInBatch(batch, item) - gcSizeChange++ } + gcQuantiles = gcQuantiles.Set(f, item, position) + db.gcQuantiles.PutInBatch(batch, gcQuantiles) - return gcSizeChange, nil + db.retrievalAccessIndex.PutInBatch(batch, item) + db.pushIndex.DeleteInBatch(batch, item) + db.gcIndex.PutInBatch(batch, item) + return nil } // setRemove removes the chunk by updating indexes: diff --git a/storage/localstore/quantile.go b/storage/localstore/quantile.go index e3105228f1..206aabb348 100644 --- a/storage/localstore/quantile.go +++ b/storage/localstore/quantile.go @@ -66,26 +66,15 @@ func (q quantiles) Set(f Fraction, item shed.Item, position uint64) quantiles { return q } } - q = append(q, quantile{ + newQ := append(q, quantile{ Fraction: f, Item: item, }) - sort.Sort(q) - return q + sort.Sort(newQ) + return newQ } func quantilePosition(total, numerator, denominator uint64) uint64 { return total / denominator * numerator } -// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks -// TODO: review and document exact quantiles for chunks -func chunkQuantileFraction(po, responsibilityRadius int) Fraction { - if po < responsibilityRadius { - // More Distant Chunks - n := uint64(responsibilityRadius - po) - return Fraction{Numerator: n, Denominator: n + 1} - } - // Most Proximate Chunks - return Fraction{Numerator: 1, Denominator: 3} -}