diff --git a/go.mod b/go.mod index a64125cf9a..53a0a5d33c 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/googleapis/gnostic v0.0.0-20190624222214-25d8b0b66985 // indirect github.com/gorilla/mux v1.7.3 // indirect github.com/hashicorp/golang-lru v0.5.3 + github.com/influxdata/influxdb v0.0.0-20180221223340-01288bdb0883 github.com/json-iterator/go v1.1.7 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect diff --git a/shed/field_json.go b/shed/field_json.go new file mode 100644 index 0000000000..56938d67d5 --- /dev/null +++ b/shed/field_json.go @@ -0,0 +1,72 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package shed + +import ( + "encoding/json" + + "github.com/syndtr/goleveldb/leveldb" +) + +// JSONField is a helper to store complex structure by +// encoding it in JSON format. +type JSONField struct { + db *DB + key []byte +} + +// NewJSONField returns a new JSONField. +// It validates its name and type against the database schema. 
+func (db *DB) NewJSONField(name string) (f JSONField, err error) { + key, err := db.schemaFieldKey(name, "json") + if err != nil { + return f, err + } + return JSONField{ + db: db, + key: key, + }, nil +} + +// Get unmarshals data from the database to a provided val. +// If the data is not found leveldb.ErrNotFound is returned. +func (f JSONField) Get(val interface{}) (err error) { + b, err := f.db.Get(f.key) + if err != nil { + return err + } + return json.Unmarshal(b, val) +} + +// Put marshals provided val and saves it to the database. +func (f JSONField) Put(val interface{}) (err error) { + b, err := json.Marshal(val) + if err != nil { + return err + } + return f.db.Put(f.key, b) +} + +// PutInBatch marshals provided val and puts it into the batch. +func (f JSONField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error) { + b, err := json.Marshal(val) + if err != nil { + return err + } + batch.Put(f.key, b) + return nil +} diff --git a/shed/index.go b/shed/index.go index 8dbb74bb1e..80b856363e 100644 --- a/shed/index.go +++ b/shed/index.go @@ -18,6 +18,8 @@ package shed import ( "bytes" + "errors" + "fmt" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -75,7 +77,7 @@ func (i Item) Merge(i2 Item) (new Item) { // Index represents a set of LevelDB key value pairs that have common // prefix. 
It holds functions for encoding and decoding keys and values -// to provide transparent actions on saved data which inclide: +// to provide transparent actions on saved data which include: // - getting a particular Item // - saving a particular Item // - iterating over a sorted LevelDB keys @@ -449,3 +451,52 @@ func (f Index) CountFrom(start Item) (count int, err error) { } return count, it.Error() } + +func (f Index) Offset(start *Item, shift int64) (i Item, err error) { + var startKey []byte + if start != nil { + startKey, err = f.encodeKeyFunc(*start) + if err != nil { + return i, err + } + } else { + startKey = f.prefix + } + + it := f.db.NewIterator() + defer it.Release() + + ok := it.Seek(startKey) + if !ok || !bytes.HasPrefix(it.Key(), startKey) { + return i, errors.New("start Item not found in index") + } + + next := it.Next + if shift < 0 { + next = it.Prev + shift *= -1 + } + + key := it.Key() + for shift != 0 && next() { + key = it.Key() + if key[0] != f.prefix[0] { + break + } + shift-- + } + if shift != 0 { + return i, fmt.Errorf("key not found, start: %p, shift: %d", start, shift) + } + + keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...)) + if err != nil { + return i, err + } + // create a copy of value byte slice not to share leveldb underlaying slice array + valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...)) + if err != nil { + return i, err + } + return keyItem.Merge(valueItem), it.Error() +} diff --git a/shed/index_test.go b/shed/index_test.go index 356cd8e41a..42b7a462e0 100644 --- a/shed/index_test.go +++ b/shed/index_test.go @@ -1104,3 +1104,172 @@ func TestIndex_HasMulti(t *testing.T) { t.Errorf("got %v, want %v", got, want) } } + +func generateItems(n int) []Item { + items := make([]Item, 0, n) + + for i := 0; i < n; i++ { + items = append(items, Item{ + Address: []byte(fmt.Sprintf("hash%03d", i)), + Data: []byte(fmt.Sprintf("data%06d", i)), + StoreTimestamp: time.Now().UnixNano(), + }) + } + + return 
items +} + +func TestIndexOffset(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + index1, err := db.NewIndex("test1", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + index2, err := db.NewIndex("test2", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + index3, err := db.NewIndex("test3", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + index4, err := db.NewIndex("test4", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + items := generateItems(100) + for _, item := range items { + index1.Put(item) + index2.Put(item) + // index3 is intentionally empty + index4.Put(item) + } + + tests := []struct { + start, offset int + }{ + {0, 0}, + {0, 1}, + {0, 50}, + {44, 0}, + {10, 10}, + {0, len(items) - 1}, + {10, -3}, + {10, -10}, + {len(items) - 1, 0}, + {len(items) - 2, 1}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) { + item, err := index1.Offset(&items[tc.start], int64(tc.offset)) + if err != nil { + tt.Error(err) + } + checkItem(tt, item, items[tc.start+tc.offset]) + + item, err = index2.Offset(&items[tc.start], int64(tc.offset)) + if err != nil { + tt.Error(err) + } + checkItem(tt, item, items[tc.start+tc.offset]) + + item, err = index3.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + + item, err = index4.Offset(&items[tc.start], int64(tc.offset)) + if err != nil { + tt.Error(err) + } + checkItem(tt, item, items[tc.start+tc.offset]) + }) + } + + // special cases - testing all indexes, to catch all edge cases + tests = []struct { + start, offset int + }{ + {0, -1}, + {len(items) - 1, 1}, + {0, -1000}, + {len(items) - 1, 1000}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%d_%d", tc.start, tc.offset), func(tt *testing.T) { + _, err := index1.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + _, err = index2.Offset(&items[tc.start], 
int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + _, err = index3.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + _, err = index4.Offset(&items[tc.start], int64(tc.offset)) + if err == nil { + tt.Error("expected error") + } + }) + } + + t.Run("Invalid start Item", func(tt *testing.T) { + invalid := Item{ + Address: []byte("out-of-index"), + Data: []byte("not so random data"), + AccessTimestamp: time.Now().UnixNano(), + } + _, err := index1.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + _, err = index2.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + _, err = index3.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + _, err = index4.Offset(&invalid, 0) + if err == nil { + tt.Error("expected error") + } + }) + + t.Run("nil start Item", func(tt *testing.T) { + item, err := index1.Offset(nil, 0) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[0]) + + item, err = index1.Offset(nil, 1) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[1]) + + item, err = index1.Offset(nil, -1) + if err == nil { + t.Error("Error was expected!") + } + + item, err = index2.Offset(nil, 10) + if err != nil { + t.Error(err) + } + checkItem(t, item, items[10]) + }) +} diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index 6373ecc7d7..d16729de20 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) @@ -40,6 +41,36 @@ var ( gcBatchSize uint64 = 200 ) +type GCPolicy interface { + GetEvictionMetric(addr chunk.Address) Fraction + GetAdmissionMetric(addr chunk.Address) Fraction +} + +// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks +type DefaultGCPolicy struct { + db 
*DB +} + +// based on https://hackmd.io/t-OQFK3mTsGfrpLCqDrdlw#Synced-chunks +func (p *DefaultGCPolicy) GetEvictionMetric(addr chunk.Address) Fraction { + item := addressToItem(addr) + po := int(p.db.po(item.Address)) + responsibilityRadius := p.db.getResponsibilityRadius() + + if po < responsibilityRadius { + // More Distant Chunks + n := uint64(responsibilityRadius - po) + return Fraction{Numerator: n, Denominator: n + 1} + } + // Most Proximate Chunks + return Fraction{Numerator: 1, Denominator: 3} +} + +// GetAdmissionMetric is currently not in use +func (p *DefaultGCPolicy) GetAdmissionMetric(addr chunk.Address) Fraction { + return Fraction{1, 1} +} + // collectGarbageWorker is a long running function that waits for // collectGarbageTrigger channel to signal a garbage collection // run. GC run iterates on gcIndex and removes older items @@ -62,7 +93,7 @@ func (db *DB) collectGarbageWorker() { db.triggerGarbageCollection() } - if testHookCollectGarbage != nil { + if collectedCount > 0 && testHookCollectGarbage != nil { testHookCollectGarbage(collectedCount) } case <-db.close: @@ -90,13 +121,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { batch := new(leveldb.Batch) target := db.gcTarget() - // protect database from changing idexes and gcSize + // protect database from changing indexes and gcSize db.batchMu.Lock() defer db.batchMu.Unlock() // run through the recently pinned chunks and // remove them from the gcIndex before iterating through gcIndex - err = db.removeChunksInExcludeIndexFromGC() + err = db.syncExcludeAndGCIndex() if err != nil { log.Error("localstore exclude pinned chunks", "err", err) return 0, true, err @@ -124,7 +155,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { db.gcIndex.DeleteInBatch(batch, item) collectedCount++ if collectedCount >= gcBatchSize { - // bach size limit reached, + // batch size limit reached, // another gc run is needed done = false return true, nil @@ 
-143,11 +174,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { metrics.GetOrRegisterCounter(metricName+"/writebatch/err", nil).Inc(1) return 0, false, err } - return collectedCount, done, nil + + err = db.updateGCQuantiles() + return collectedCount, done, err } -// removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex. -func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { +// syncExcludeAndGCIndex removes any chunks in the exclude Index, from the gcIndex. +func (db *DB) syncExcludeAndGCIndex() (err error) { metricName := "localstore/gc/exclude" metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(metricName, time.Now()) @@ -258,6 +291,34 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) { return nil } +// updateGCQuantiles adjusts gc quantiles based on the +// current gc size, by comparing stored quantile +// positions with the ones calculated from the current +// gc index size. +func (db *DB) updateGCQuantiles() (err error) { + var gcQuantiles quantiles + if err = db.gcQuantiles.Get(&gcQuantiles); err != nil { + return err + } + gcSize, err := db.gcSize.Get() + if err != nil { + return err + } + var newQuantiles quantiles + for _, q := range gcQuantiles { + newPosition := quantilePosition(gcSize, q.Numerator, q.Denominator) + newQuantiles = newQuantiles.Set(q.Fraction, q.Item, newPosition) + } + return db.gcQuantiles.Put(newQuantiles) +} + +func uint64Diff(one, two uint64) int64 { + if one > two { + return int64(one - two) + } + return -1 * int64(two-one) +} + // testHookCollectGarbage is a hook that can provide // information when a garbage collection run is done // and how many items it removed. 
diff --git a/storage/localstore/gc_test.go b/storage/localstore/gc_test.go index 5eb8dfe7b7..5ecefb06ad 100644 --- a/storage/localstore/gc_test.go +++ b/storage/localstore/gc_test.go @@ -529,3 +529,53 @@ func TestSetTestHookCollectGarbage(t *testing.T) { t.Errorf("got hook value %v, want %v", got, original) } } + +func TestUpdateGCQuantiles(t *testing.T) { + dir, err := ioutil.TempDir("", "localstore-gc-quantiles-update") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + baseKey := make([]byte, 32) + if _, err := rand.Read(baseKey); err != nil { + t.Fatal(err) + } + options := &Options{Capacity: 100} + db, err := New(dir, baseKey, options) + if err != nil { + t.Fatal(err) + } + + count := int(2 * options.Capacity) + + for i := 0; i < count; i++ { + ch := generateTestRandomChunk() + + _, err := db.Put(context.Background(), chunk.ModePutRequest, ch) + if err != nil { + t.Fatal(err) + } + + err = db.Set(context.Background(), chunk.ModeSetAccess, ch.Address()) + if err != nil { + t.Fatal(err) + } + + err = db.Set(context.Background(), chunk.ModeSetSyncPush, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + + if err := db.Close(); err != nil { + t.Fatal(err) + } + + db, err = New(dir, baseKey, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + db.collectGarbage() +} diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index fb17ffe2c6..ca906de648 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -89,9 +89,15 @@ type DB struct { // pin files Index pinIndex shed.Index + gcPolicy GCPolicy + // field that stores number of intems in gc index gcSize shed.Uint64Field + // field that stores all positions (item and index) + // of all gc quantiles + gcQuantiles shed.JSONField + // garbage collection is triggered when gcSize exceeds // the capacity value capacity uint64 @@ -107,6 +113,9 @@ type DB struct { // are done before closing the database updateGCWG sync.WaitGroup + 
responsibilityRadius int + responsibilityRadiusMu sync.RWMutex + baseKey []byte batchMu sync.Mutex @@ -176,6 +185,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { collectGarbageWorkerDone: make(chan struct{}), putToGCCheck: o.PutToGCCheck, } + db.gcPolicy = &DefaultGCPolicy{db} if db.capacity <= 0 { db.capacity = defaultCapacity } @@ -386,6 +396,8 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if err != nil { return nil, err } + // create a bucket for gc index quantiles + db.gcQuantiles, err = db.shed.NewJSONField("gc-quantiles") // Create a index structure for storing pinned chunks and their pin counts db.pinIndex, err = db.shed.NewIndex("Hash->PinCounter", shed.IndexFuncs{ @@ -494,6 +506,20 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) { return indexInfo, err } +func (db *DB) SetResponsibilityRadius(r int) { + db.responsibilityRadiusMu.Lock() + defer db.responsibilityRadiusMu.Unlock() + + db.responsibilityRadius = r +} + +func (db *DB) getResponsibilityRadius() (r int) { + db.responsibilityRadiusMu.RLock() + defer db.responsibilityRadiusMu.RUnlock() + + return db.responsibilityRadius +} + // chunkToItem creates new Item with data provided by the Chunk. func chunkToItem(ch chunk.Chunk) shed.Item { return shed.Item{ diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 7703532fdc..2f952cc47a 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "github.com/ethersphere/swarm/shed" "time" "github.com/ethereum/go-ethereum/metrics" @@ -201,7 +202,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS // provided by the access function, and it is not // a property of a chunk provided to Accessor.Put. 
- i, err := db.retrievalDataIndex.Get(item) + accessIndexItem, err := db.retrievalDataIndex.Get(item) if err != nil { if err == leveldb.ErrNotFound { // chunk is not found, @@ -213,8 +214,8 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS } return 0, err } - item.StoreTimestamp = i.StoreTimestamp - item.BinID = i.BinID + item.StoreTimestamp = accessIndexItem.StoreTimestamp + item.BinID = accessIndexItem.BinID switch mode { case chunk.ModeSetSyncPull: @@ -286,10 +287,10 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS db.pushIndex.DeleteInBatch(batch, item) } - i, err = db.retrievalAccessIndex.Get(item) + accessIndexItem, err = db.retrievalAccessIndex.Get(item) switch err { case nil: - item.AccessTimestamp = i.AccessTimestamp + item.AccessTimestamp = accessIndexItem.AccessTimestamp db.gcIndex.DeleteInBatch(batch, item) gcSizeChange-- case leveldb.ErrNotFound: @@ -301,12 +302,12 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS db.retrievalAccessIndex.PutInBatch(batch, item) // Add in gcIndex only if this chunk is not pinned - ok, err := db.pinIndex.Has(item) + pinned, err := db.pinIndex.Has(item) if err != nil { return 0, err } - if !ok { - err = db.gcIndex.PutInBatch(batch, item) + if !pinned { + err := db.addToGC(batch, item, accessIndexItem) if err != nil { return 0, err } @@ -316,6 +317,49 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS return gcSizeChange, nil } +func (db *DB) addToGC(batch *leveldb.Batch, item shed.Item, i shed.Item) error { + // set access timestamp based on quantile calculated from + // chunk proximity and db.responsibilityRadius + f := db.gcPolicy.GetEvictionMetric(item.Address) + gcSize, err := db.gcSize.Get() + if err != nil { + return err + } + position := quantilePosition(gcSize, f.Numerator, f.Denominator) + var gcQuantiles quantiles + err = db.gcQuantiles.Get(&gcQuantiles) + if err != nil && err 
!= leveldb.ErrNotFound { + return err + } + var found bool + for _, q := range gcQuantiles { + if q.Fraction == f { + item.AccessTimestamp = q.Item.AccessTimestamp + 1 + found = true + break + } + } + if len(gcQuantiles) > 0 && !found { + var shift int64 + if closest := gcQuantiles.Closest(f); closest == nil { + shift = int64(position) + } else { + shift = int64(closest.Position) - int64(position) + } + i, err = db.gcIndex.Offset(nil, shift) + if err == nil { + item.AccessTimestamp = i.AccessTimestamp + 1 + } + } + gcQuantiles = gcQuantiles.Set(f, item, position) + db.gcQuantiles.PutInBatch(batch, gcQuantiles) + + db.retrievalAccessIndex.PutInBatch(batch, item) + db.pushIndex.DeleteInBatch(batch, item) + db.gcIndex.PutInBatch(batch, item) + return nil +} + // setRemove removes the chunk by updating indexes: // - delete from retrieve, pull, gc // Provided batch is updated. diff --git a/storage/localstore/quantile.go b/storage/localstore/quantile.go new file mode 100644 index 0000000000..206aabb348 --- /dev/null +++ b/storage/localstore/quantile.go @@ -0,0 +1,80 @@ +package localstore + +import ( + "math" + "sort" + + "github.com/ethersphere/swarm/shed" +) + +type Fraction struct { + Numerator uint64 + Denominator uint64 +} + +func (f Fraction) Decimal() float64 { + return float64(f.Numerator) / float64(f.Denominator) +} + +type quantile struct { + Fraction + Item shed.Item + Position uint64 +} + +type quantiles []quantile + +func (q quantiles) Len() int { + return len(q) +} + +func (q quantiles) Less(i, j int) bool { + // TODO(tzdybal) - is it reasonable to use common denominator instead of Decimal() function? 
+ return q[i].Fraction.Decimal() < q[j].Fraction.Decimal() +} + +func (q quantiles) Swap(i, j int) { + q[i], q[j] = q[j], q[i] +} + +func (q quantiles) Get(f Fraction) (item shed.Item, position uint64, found bool) { + for _, x := range q { + if x.Fraction == f { + return x.Item, x.Position, true + } + } + return item, 0, false +} + +func (q quantiles) Closest(f Fraction) (closest *quantile) { + for i, x := range q { + if x.Fraction == f { + return &q[i] + } + if closest == nil || math.Abs(x.Decimal()-f.Decimal()) < math.Abs(closest.Decimal()-f.Decimal()) { + closest = &q[i] + } + } + return closest +} + +func (q quantiles) Set(f Fraction, item shed.Item, position uint64) quantiles { + for i := range q { + if q[i].Fraction == f { + q[i].Item = item + q[i].Position = position + return q + } + } + newQ := append(q, quantile{ + Fraction: f, + Item: item, + }) + sort.Sort(newQ) + return newQ +} + +func quantilePosition(total, numerator, denominator uint64) uint64 { + return total / denominator * numerator +} + diff --git a/storage/localstore/quantile_test.go b/storage/localstore/quantile_test.go new file mode 100644 index 0000000000..6d119a4e96 --- /dev/null +++ b/storage/localstore/quantile_test.go @@ -0,0 +1,138 @@ +package localstore + +import ( + "fmt" + "math" + "math/rand" + "sort" + "testing" + + "github.com/ethersphere/swarm/shed" +) + +func TestSorting(t *testing.T) { + t.Parallel() + + n := 100 + data := getRandomQuantiles(n, 123) + sort.Sort(data) + + for i := 1; i < n; i++ { + // compare without trusting any methods - use common denominator + x, y := data[i], data[i-1] + if x.Numerator*y.Denominator < y.Numerator*x.Denominator { + t.Error("quantiles not ordered correctly") + } + } +} + +func TestFraction(t *testing.T) { + t.Parallel() + + tolerance := float64(0.0000001) + + tests := []struct { + f Fraction + r float64 + }{ + {Fraction{1, 2}, 0.5}, + {Fraction{2, 4}, 0.5}, + {Fraction{3, 4}, 0.75}, + {Fraction{4, 5}, 0.8}, + {Fraction{1, 5}, 0.2}, + 
{Fraction{99, 100}, 0.99}, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("Fraction %d/%d", test.f.Numerator, test.f.Denominator), func(tt *testing.T) { + if math.Abs(test.f.Decimal()-test.r) > tolerance { + tt.Errorf("expected: %f, got: %f", test.r, test.f.Decimal()) + } + }) + } +} + +func TestClosest(t *testing.T) { + t.Parallel() + + emptyQ := make(quantiles, 0) + if emptyQ.Closest(Fraction{1, 2}) != nil { + t.Error("expected nil") + } + + data := make(quantiles, 5) + data[0].Numerator = 1 + data[0].Denominator = 2 + data[1].Numerator = 1 + data[1].Denominator = 3 + data[2].Numerator = 1 + data[2].Denominator = 4 + data[3].Numerator = 2 + data[3].Denominator = 3 + data[4].Numerator = 3 + data[4].Denominator = 4 + + sanityChecks := []Fraction{ + {0, 1}, {1, 1}, {1, 2}, {2, 2}, {2, 1}, + } + for _, check := range sanityChecks { + if data.Closest(check) == nil { + t.Error("expected any value, got nil") + } + } + + checks := []struct { + f Fraction + expected *quantile + }{ + {Fraction{1, 2}, &data[0]}, // exact Fraction + {Fraction{4, 8}, &data[0]}, // almost same as above + {Fraction{1, 3}, &data[1]}, // exact Fraction + {Fraction{3, 9}, &data[1]}, // almost same as above + {Fraction{1, 4}, &data[2]}, // exact Fraction + {Fraction{1, 5}, &data[2]}, // smaller than any quantile + {Fraction{2, 3}, &data[3]}, // exact Fraction + {Fraction{3, 4}, &data[4]}, // exact Fraction + {Fraction{4, 4}, &data[4]}, // greater than any quantile + {Fraction{2, 1}, &data[4]}, // greater than any quantile + {Fraction{4, 10}, &data[1]}, // 0.4 closest to 1/3 (0.33) + {Fraction{42, 100}, &data[0]}, // 0.42 closest to 1/2 (0.5) + {Fraction{7, 10}, &data[3]}, // 0.7 closest to 2/3 (0.66) + } + + for _, check := range checks { + if c := data.Closest(check.f); c != check.expected { + t.Errorf("invalid quantile: expected Fraction: %d/%d, got: %d/%d", + check.expected.Numerator, check.expected.Denominator, c.Numerator, c.Denominator) + } + } +} + +func TestSet(t *testing.T) { + 
t.Parallel()
+
+	var q quantiles
+
+	if len(q) != 0 {
+		t.Errorf("Prerequisite is false")
+	}
+
+	q = q.Set(Fraction{1, 1}, shed.Item{}, 1)
+
+	if len(q) == 0 {
+		t.Errorf("quantiles.Set doesn't work")
+	}
+}
+
+func getRandomQuantiles(n int, seed int64) quantiles {
+	m := uint64(100 * n)
+	data := make(quantiles, n)
+
+	rnd := rand.New(rand.NewSource(seed)) // fixed seed for repeatable tests
+
+	for i := 0; i < n; i++ {
+		data[i].Numerator = rnd.Uint64() % m
+		data[i].Denominator = rnd.Uint64() % m
+	}
+	return data
+}
diff --git a/swarm.go b/swarm.go
index ff109c6bcc..08210243f3 100644
--- a/swarm.go
+++ b/swarm.go
@@ -242,6 +242,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
 	self.retrieval = retrieval.New(to, self.netStore, bzzconfig.Address, self.swap)
 	self.netStore.RemoteGet = self.retrieval.RequestFromPeers
 
+	// update localstore responsibility radius on kademlia
+	// neighbourhood depth change.
+	go func() {
+		c, unsubscribe := to.SubscribeToNeighbourhoodDepthChange()
+		self.cleanupFuncs = append(self.cleanupFuncs, func() error {
+			unsubscribe()
+			return nil
+		})
+
+		for {
+			_, ok := <-c
+			if !ok {
+				return
+			}
+			localStore.SetResponsibilityRadius(to.NeighbourhoodDepth())
+		}
+	}()
+
 	feedsHandler.SetStore(self.netStore)
 
 	syncing := true