From c9d425448326b11811be9f79244e514ccb6f108d Mon Sep 17 00:00:00 2001 From: jmacd Date: Sun, 3 Nov 2019 09:22:25 -0800 Subject: [PATCH 01/28] Add go module, add simple sampler helper --- LICENSE | 21 +++++++++++++++++ README.md | 13 ++++++++++- go.mod | 5 +++++ go.sum | 10 +++++++++ simple.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++ simple_test.go | 38 +++++++++++++++++++++++++++++++ varopt.go | 12 ++++++---- varopt_test.go | 36 +++++++++++++++-------------- 8 files changed, 174 insertions(+), 22 deletions(-) create mode 100644 LICENSE create mode 100644 go.mod create mode 100644 go.sum create mode 100644 simple.go create mode 100644 simple_test.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..017c0cd --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2019 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index c9770ee..0c2992f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,14 @@ This is an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper [Stream sampling for variance-optimal -estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf). +estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf) (2008) +by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel +Thorup. + +VarOpt is a reservoir-type sampler that maintains a fixed size sample +and provides a mechanism for merging unequal-weight samples. + +This package also includes a simple reservoir sampling algorithm, +often useful in conjunction with weighed reservoir sampling, using +Algorithm R from [Random sampling with a +reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R) +(1985) by Jeffrey Vitter. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1f0ead4 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/lightstep/varopt + +go 1.13 + +require github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e863f51 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/simple.go b/simple.go new file mode 100644 index 0000000..7d0fb7f --- /dev/null +++ b/simple.go @@ -0,0 +1,61 @@ +// Copyright 2019, LightStep Inc. + +package varopt + +import ( + "math/rand" +) + +// Simple implements unweighted reservoir sampling using Algorithm R +// from "Random sampling with a reservoir" by Jeffrey Vitter (1985) +// https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R +type Simple struct { + capacity int + observed int + buffer []Sample +} + +func NewSimple(capacity int) *Simple { + return &Simple{ + capacity: capacity, + } +} + +func (s *Simple) Add(span Sample) { + s.observed++ + + if s.buffer == nil { + s.buffer = make([]Sample, 0, s.capacity) + } + + if len(s.buffer) < s.capacity { + s.buffer = append(s.buffer, span) + return + } + + // Give this a capacity/observed chance of replacing an existing entry. + index := rand.Intn(s.observed) + if index < s.capacity { + s.buffer[index] = span + } +} + +func (s *Simple) Get(i int) Sample { + return s.buffer[i] +} + +func (s *Simple) Size() int { + return len(s.buffer) +} + +func (s *Simple) Weight() float64 { + return float64(s.observed) / float64(s.Size()) +} + +func (s *Simple) Prob() float64 { + return 1 / s.Weight() +} + +func (s *Simple) Observed() int { + return s.observed +} diff --git a/simple_test.go b/simple_test.go new file mode 100644 index 0000000..8c83500 --- /dev/null +++ b/simple_test.go @@ -0,0 +1,38 @@ +// Copyright 2019, LightStep Inc. + +package varopt_test + +import ( + "testing" + + "github.com/lightstep/varopt" + "github.com/stretchr/testify/require" +) + +type iRec int + +func TestSimple(t *testing.T) { + const ( + popSize = 1e6 + sampleProb = 0.1 + sampleSize int = popSize * sampleProb + epsilon = 0.01 + ) + + ss := varopt.NewSimple(sampleSize) + + psum := 0. + for i := 0; i < popSize; i++ { + ss.Add(iRec(i)) + psum += float64(i) + } + + require.Equal(t, ss.Size(), sampleSize) + + ssum := 0.0 + for i := 0; i < sampleSize; i++ { + ssum += float64(ss.Get(i).(iRec)) + } + + require.InEpsilon(t, ssum/float64(ss.Size()), psum/popSize, epsilon) +} diff --git a/varopt.go b/varopt.go index d3cd2f1..e02d428 100644 --- a/varopt.go +++ b/varopt.go @@ -1,7 +1,4 @@ -// Stream sampling for variance-optimal estimation of subset sums -// Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, Mikkel Thorup -// 2008 -// https://arxiv.org/pdf/0803.0473.pdf +// Copyright 2019, LightStep Inc. package varopt @@ -11,6 +8,11 @@ import ( "math/rand" ) +// Varopt implements the algorithm from Stream sampling for +// variance-optimal estimation of subset sums Edith Cohen, Nick +// Duffield, Haim Kaplan, Carsten Lund, Mikkel Thorup 2008 +// +// https://arxiv.org/pdf/0803.0473.pdf type Varopt struct { // Large-weight items L largeHeap @@ -31,6 +33,8 @@ type Varopt struct { totalWeight float64 } +type Sample interface{} + type vsample struct { sample Sample weight float64 diff --git a/varopt_test.go b/varopt_test.go index 4a19b80..108aeba 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -1,9 +1,12 @@ +// Copyright 2019, LightStep Inc. + package varopt_test import ( "math" "testing" + "github.com/lightstep/varopt" "github.com/stretchr/testify/require" ) @@ -11,9 +14,8 @@ import ( // There are odd and even numbers, in equal amount // There are last-digits 0-9 in equal amount // -// Much like simple_test.go, we will test the mean is correct and, -// because unbiased, also the odd/even and last-digit-0-9 groupings -// will be balanced. +// We will test the mean is correct and, because unbiased, also the +// odd/even and last-digit-0-9 groupings will be balanced. const ( numBlocks = 100 popSize = 1e7 @@ -53,7 +55,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { extra = popSize - bigSize*numBig - smallSize*numSmall ) - population := make([]Sample, popSize) + population := make([]varopt.Sample, popSize) psum := 0.0 @@ -67,17 +69,17 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { // population[i], population[j] = population[j], population[i] // }) - smallBlocks := make([][]Sample, numSmall) - bigBlocks := make([][]Sample, numBig) + smallBlocks := make([][]varopt.Sample, numSmall) + bigBlocks := make([][]varopt.Sample, numBig) for i := 0; i < numSmall; i++ { - smallBlocks[i] = make([]Sample, smallSize) + smallBlocks[i] = make([]varopt.Sample, smallSize) } for i := 0; i < numBig; i++ { if i == 0 { - bigBlocks[0] = make([]Sample, bigSize+extra) + bigBlocks[0] = make([]varopt.Sample, bigSize+extra) } else { - bigBlocks[i] = make([]Sample, bigSize) + bigBlocks[i] = make([]varopt.Sample, bigSize) } } @@ -98,13 +100,13 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { maxDiff := 0.0 - func(allBlockLists ...[][][]Sample) { + func(allBlockLists ...[][][]varopt.Sample) { for _, blockLists := range allBlockLists { - varopt := NewVaropt(sampleSize) + vsample := varopt.NewVaropt(sampleSize) for _, blockList := range blockLists { for _, block := range blockList { - simple := NewSimple(sampleSize) + simple := varopt.NewSimple(sampleSize) for _, s := range block { simple.Add(s) @@ -112,7 +114,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { weight := simple.Weight() for i := 0; i < simple.Size(); i++ { - varopt.Add(simple.Get(i), weight) + vsample.Add(simple.Get(i), weight) } } } @@ -121,8 +123,8 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { odd := 0.0 even := 0.0 - for i := 0; i < varopt.Size(); i++ { - v, w := varopt.Get(i) + for i := 0; i < vsample.Size(); i++ { + v, w := vsample.Get(i) vi := v.(int) if vi%2 == 0 { even++ @@ -140,7 +142,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { require.InEpsilon(t, odd, even, epsilon) } }( - [][][]Sample{bigBlocks, smallBlocks}, - [][][]Sample{smallBlocks, bigBlocks}, + [][][]varopt.Sample{bigBlocks, smallBlocks}, + [][][]varopt.Sample{smallBlocks, bigBlocks}, ) } From 865dd6a734a1360e4cce64e17d2f53386456248c Mon Sep 17 00:00:00 2001 From: jmacd Date: Thu, 7 Nov 2019 16:45:46 -0800 Subject: [PATCH 02/28] NewVaropt->New --- README.md | 2 +- varopt.go | 9 ++------- varopt_test.go | 2 +- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 0c2992f..1cdb63c 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf) (2008) by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup. -VarOpt is a reservoir-type sampler that maintains a fixed size sample +VarOpt is a reservoir-type sampler that maintains a fixed-size sample and provides a mechanism for merging unequal-weight samples. This package also includes a simple reservoir sampling algorithm, diff --git a/varopt.go b/varopt.go index e02d428..3852d8c 100644 --- a/varopt.go +++ b/varopt.go @@ -42,13 +42,8 @@ type vsample struct { type largeHeap []vsample -func NewVaropt(capacity int) *Varopt { - v := InitVaropt(capacity) - return &v -} - -func InitVaropt(capacity int) Varopt { - return Varopt{ +func New(capacity int) *Varopt { + return &Varopt{ capacity: capacity, } } diff --git a/varopt_test.go b/varopt_test.go index 108aeba..3cebe7c 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -102,7 +102,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { func(allBlockLists ...[][][]varopt.Sample) { for _, blockLists := range allBlockLists { - vsample := varopt.NewVaropt(sampleSize) + vsample := varopt.New(sampleSize) for _, blockList := range blockLists { for _, block := range blockList { From 3dfa2d1ac5a721e1026bfd01fbc23c00c2245cb0 Mon Sep 17 00:00:00 2001 From: jmacd Date: Thu, 7 Nov 2019 22:49:09 -0800 Subject: [PATCH 03/28] Add testable examples --- frequency_test.go | 143 ++++++++++++++++++++++++++++++++++++++++++++++ varopt.go | 10 +++- varopt_test.go | 4 +- weighted_test.go | 82 ++++++++++++++++++++++++++ 4 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 frequency_test.go create mode 100644 weighted_test.go diff --git a/frequency_test.go b/frequency_test.go new file mode 100644 index 0000000..9da7bca --- /dev/null +++ b/frequency_test.go @@ -0,0 +1,143 @@ +// Copyright 2019, LightStep Inc. + +package varopt_test + +import ( + "fmt" + "math" + "math/rand" + + "github.com/lightstep/varopt" +) + +type curve struct { + color string + mean float64 + stddev float64 +} + +type point struct { + color int + xvalue float64 +} + +var colors = []curve{ + {color: "red", mean: 10, stddev: 15}, + {color: "green", mean: 30, stddev: 10}, + {color: "blue", mean: 50, stddev: 20}, +} + +// This example shows how to use Varopt sampling to estimate +// frequencies with the use of inverse probability weights. The use +// of inverse probability creates a uniform expected value, in this of +// the number of sample points per second. +// +// While the number of expected points per second is uniform, the +// output sample weights are expected to match the original +// frequencies. +func ExampleFrequency() { + // Number of points. + const totalCount = 1e6 + + // Relative size of the sample. + const sampleRatio = 0.01 + + // Ensure this test is deterministic. + rnd := rand.New(rand.NewSource(104729)) + + // Construct a timeseries consisting of three colored signals, + // for x=0 to x=60 seconds. + var points []point + + // origCounts stores the original signals at second granularity. + origCounts := make([][]int, len(colors)) + for i := range colors { + origCounts[i] = make([]int, 60) + } + + // Construct the signals by choosing a random color, then + // using its Gaussian to compute a timestamp. + for len(points) < totalCount { + choose := rnd.Intn(len(colors)) + series := colors[choose] + xvalue := rnd.NormFloat64()*series.stddev + series.mean + + if xvalue < 0 || xvalue > 60 { + continue + } + origCounts[choose][int(math.Floor(xvalue))]++ + points = append(points, point{ + color: choose, + xvalue: xvalue, + }) + } + + // Compute the total number of points per second. This will be + // used to establish the per-second probability. + xcount := make([]int, 60) + for _, point := range points { + xcount[int(math.Floor(point.xvalue))]++ + } + + // Compute the sample with using the inverse probability as a + // weight. This ensures a uniform distribution of points in each + // second. + sampleSize := int(sampleRatio * float64(totalCount)) + sampler := varopt.New(sampleSize, rnd) + for _, point := range points { + second := int(math.Floor(point.xvalue)) + prob := float64(xcount[second]) / float64(totalCount) + sampler.Add(point, 1/prob) + } + + // sampleCounts stores the reconstructed signals. + sampleCounts := make([][]float64, len(colors)) + for i := range colors { + sampleCounts[i] = make([]float64, 60) + } + + // pointCounts stores the number of points per second. + pointCounts := make([]int, 60) + + // Reconstruct the signals using the output sample weights. + // The effective count of each sample point is its output + // weight divided by its original weight. + for i := 0; i < sampler.Size(); i++ { + sample, weight := sampler.Get(i) + origWeight := sampler.GetOriginalWeight(i) + point := sample.(point) + second := int(math.Floor(point.xvalue)) + sampleCounts[point.color][second] += (weight / origWeight) + pointCounts[second]++ + } + + // Compute standard deviation of sample points per second. + sum := 0.0 + mean := float64(sampleSize) / 60 + for s := 0; s < 60; s++ { + e := float64(pointCounts[s]) - mean + sum += e * e + } + stddev := math.Sqrt(sum / (60 - 1)) + + fmt.Printf("Samples per second mean %.2f\n", mean) + fmt.Printf("Samples per second standard deviation %.2f\n", stddev) + + // Compute mean absolute percentage error between sampleCounts + // and origCounts for each signal. + for c := range colors { + mae := 0.0 + for s := 0; s < 60; s++ { + mae += math.Abs(sampleCounts[c][s]-float64(origCounts[c][s])) / float64(origCounts[c][s]) + } + mae /= 60 + fmt.Printf("Mean absolute percentage error (%s) = %.2f%%\n", colors[c].color, mae*100) + } + + // Output: + // Samples per second mean 166.67 + // Samples per second standard deviation 13.75 + // Mean absolute percentage error (red) = 25.16% + // Mean absolute percentage error (green) = 14.30% + // Mean absolute percentage error (blue) = 14.23% +} diff --git a/varopt.go b/varopt.go index 3852d8c..4dd8ea3 100644 --- a/varopt.go +++ b/varopt.go @@ -14,6 +14,9 @@ import ( // // https://arxiv.org/pdf/0803.0473.pdf type Varopt struct { + // Random number generator + rnd *rand.Rand + // Large-weight items L largeHeap @@ -42,9 +45,10 @@ type vsample struct { type largeHeap []vsample -func New(capacity int) *Varopt { +func New(capacity int, rnd *rand.Rand) *Varopt { return &Varopt{ capacity: capacity, + rnd: rnd, } } @@ -99,7 +103,7 @@ func (s *Varopt) Add(sample Sample, weight float64) { } s.X = s.X[:len(s.X)-1] } else { - ti := rand.Intn(len(s.T)) + ti := s.rnd.Intn(len(s.T)) s.T[ti], s.T[len(s.T)-1] = s.T[len(s.T)-1], s.T[ti] s.T = s.T[:len(s.T)-1] } @@ -109,7 +113,7 @@ func (s *Varopt) Add(sample Sample, weight float64) { func (s *Varopt) uniform() float64 { for { - r := rand.Float64() + r := s.rnd.Float64() if r != 0.0 { return r } diff --git a/varopt_test.go b/varopt_test.go index 3cebe7c..16786ab 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -4,6 +4,7 @@ package varopt_test import ( "math" + "math/rand" "testing" "github.com/lightstep/varopt" @@ -99,10 +100,11 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { require.Equal(t, len(population), pos) maxDiff := 0.0 + rnd := rand.New(rand.NewSource(98887)) func(allBlockLists ...[][][]varopt.Sample) { for _, blockLists := range allBlockLists { - vsample := varopt.New(sampleSize) + vsample := varopt.New(sampleSize, rnd) for _, blockList := range blockLists { for _, block := range blockList { diff --git a/weighted_test.go b/weighted_test.go new file mode 100644 index 0000000..e545fbd --- /dev/null +++ b/weighted_test.go @@ -0,0 +1,82 @@ +// Copyright 2019, LightStep Inc. + +package varopt_test + +import ( + "fmt" + "math" + "math/rand" + + "github.com/lightstep/varopt" +) + +type packet struct { + size int + color string + protocol string +} + +func ExampleWeighted() { + const totalPackets = 1e6 + const sampleRatio = 0.01 + + colors := []string{"red", "green", "blue"} + protocols := []string{"http", "tcp", "udp"} + + sizeByColor := map[string]int{} + sizeByProtocol := map[string]int{} + trueTotalWeight := 0.0 + + rnd := rand.New(rand.NewSource(32491)) + sampler := varopt.New(totalPackets*sampleRatio, rnd) + + for i := 0; i < totalPackets; i++ { + packet := packet{ + size: 1 + rnd.Intn(100000), + color: colors[rnd.Intn(len(colors))], + protocol: protocols[rnd.Intn(len(protocols))], + } + + sizeByColor[packet.color] += packet.size + sizeByProtocol[packet.protocol] += packet.size + trueTotalWeight += float64(packet.size) + + sampler.Add(packet, float64(packet.size)) + } + + estSizeByColor := map[string]float64{} + estSizeByProtocol := map[string]float64{} + estTotalWeight := 0.0 + + for i := 0; i < sampler.Size(); i++ { + sample, weight := sampler.Get(i) + packet := sample.(packet) + estSizeByColor[packet.color] += weight + estSizeByProtocol[packet.protocol] += weight + estTotalWeight += weight + } + + // Compute mean average percentage error for colors + colorMape := 0.0 + for _, c := range colors { + colorMape += math.Abs(float64(sizeByColor[c])-estSizeByColor[c]) / float64(sizeByColor[c]) + } + colorMape /= float64(len(colors)) + + // Compute mean average percentage error for protocols + protocolMape := 0.0 + for _, p := range protocols { + protocolMape += math.Abs(float64(sizeByProtocol[p])-estSizeByProtocol[p]) / float64(sizeByProtocol[p]) + } + protocolMape /= float64(len(protocols)) + + // Compute total sum error percentage + fmt.Printf("Total sum error %.2g%%\n", 100*math.Abs(estTotalWeight-trueTotalWeight)/trueTotalWeight) + fmt.Printf("Color mean absolute percentage error %.2f%%\n", 100*colorMape) + fmt.Printf("Protocol mean absolute percentage error %.2f%%\n", 100*protocolMape) + + // Output: + // Total sum error 2.4e-11% + // Color mean absolute percentage error 0.73% + // Protocol mean absolute percentage error 1.62% +} From 2be048e87da0b605d77e8960a67090ebd177bc39 Mon Sep 17 00:00:00 2001 From: jmacd Date: Thu, 7 Nov 2019 23:05:28 -0800 Subject: [PATCH 04/28] Update README with examples and godoc link --- README.md | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/README.md b/README.md index 1cdb63c..82cc1c4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ +[![Docs](https://godoc.org/github.com/lightstep/varopt?status.svg)](https://godoc.org/github.com/lightstep/varopt) + +# VarOpt Sampling Algorithm + This is an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper [Stream sampling for variance-optimal estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf) (2008) @@ -12,3 +16,35 @@ often useful in conjunction with weighed reservoir sampling, using Algorithm R from [Random sampling with a reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R) (1985) by Jeffrey Vitter. + +# Usage: Natural Weights + +A typical use of VarOpt sampling is to estimate network flows using +sample packets. In this use-case, the weight applied to each sample +is the size of the packet. Beacuse VarOpt computes an unbiased +sample, the sample data points can be summarized along secondary +dimensions. For example, we can select a subset of the sample +according to a secondary attribute, sum the sample weights, and the +result is expected value of the secondary attribute in the original +population. + +See [weighted_test.go](https://github.com/lightstep/varopt/blob/master/weighted_test.go) for an example. + +# Usage: Inverse-probability Weights + +Another use for VarOpt sampling uses inverse-probability weights to +estimate frequencies while simultaneously controlling sample +diversity. Suppose a sequence of observations can be naturally +categorized into N different buckets. The goal in this case is to +compute a sample where each bucket is well represented, while +maintaining frequency estimates. + +In this use-case, the weight assigned to each observation is the +inverse probability of the bucket it belongs to. The result of +weighted sampling with inverse-probability weights is a uniform +expectation, in this example we expect an equal number of observations +falling into each bucket. Each observation represents a frequency of +its sample weight (computed by VarOpt) divided by its original weight +(the inverse-probability). + +See [frequency_test.go](https://github.com/lightstep/varopt/blob/master/frequency_test.go) for an example. From 08e548be48e5ad1232d9e340f17e96da3bc660c1 Mon Sep 17 00:00:00 2001 From: jmacd Date: Thu, 7 Nov 2019 23:08:10 -0800 Subject: [PATCH 05/28] Update --- README.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 82cc1c4..ba73609 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Algorithm R from [Random sampling with a reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R) (1985) by Jeffrey Vitter. -# Usage: Natural Weights +## Usage: Natural Weights A typical use of VarOpt sampling is to estimate network flows using sample packets. In this use-case, the weight applied to each sample @@ -30,7 +30,7 @@ population. See [weighted_test.go](https://github.com/lightstep/varopt/blob/master/weighted_test.go) for an example. -# Usage: Inverse-probability Weights +## Usage: Inverse-probability Weights Another use for VarOpt sampling uses inverse-probability weights to estimate frequencies while simultaneously controlling sample @@ -48,3 +48,11 @@ its sample weight (computed by VarOpt) divided by its original weight (the inverse-probability). See [frequency_test.go](https://github.com/lightstep/varopt/blob/master/frequency_test.go) for an example. + +## Usage: Merging Samples + +VarOpt supports merging independently collected samples one +observation at a time. This is useful for building distributed +sampling schemes. In this use-case, each node in a distributed system +computes a weighted sample. To combine samples, simply input all the +observations and their corresponding weights into a new VarOpt sample. \ No newline at end of file From 43a69f477853bd6cc3828f5ff0bb10d4fec68334 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 09:34:51 -0800 Subject: [PATCH 06/28] Use a deterministic random source; name examples for godoc --- frequency_test.go | 2 +- simple.go | 6 ++++-- simple_test.go | 5 ++++- varopt_test.go | 6 ++---- weighted_test.go | 2 +- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/frequency_test.go b/frequency_test.go index 9da7bca..e4b3429 100644 --- a/frequency_test.go +++ b/frequency_test.go @@ -35,7 +35,7 @@ var colors = []curve{ // While the number of expected points per second is uniform, the // output sample weights are expected to match the original // frequencies. -func ExampleFrequency() { +func ExampleVaropt_GetOriginalWeight() { // Number of points. const totalCount = 1e6 diff --git a/simple.go b/simple.go index 7d0fb7f..d34d42a 100644 --- a/simple.go +++ b/simple.go @@ -13,11 +13,13 @@ type Simple struct { capacity int observed int buffer []Sample + rnd *rand.Rand } -func NewSimple(capacity int) *Simple { +func NewSimple(capacity int, rnd *rand.Rand) *Simple { return &Simple{ capacity: capacity, + rnd: rnd, } } @@ -34,7 +36,7 @@ func (s *Simple) Add(span Sample) { } // Give this a capacity/observed chance of replacing an existing entry. - index := rand.Intn(s.observed) + index := s.rnd.Intn(s.observed) if index < s.capacity { s.buffer[index] = span } diff --git a/simple_test.go b/simple_test.go index 8c83500..c70503e 100644 --- a/simple_test.go +++ b/simple_test.go @@ -3,6 +3,7 @@ package varopt_test import ( + "math/rand" "testing" "github.com/lightstep/varopt" @@ -19,7 +20,9 @@ func TestSimple(t *testing.T) { epsilon = 0.01 ) - ss := varopt.NewSimple(sampleSize) + rnd := rand.New(rand.NewSource(17167)) + + ss := varopt.NewSimple(sampleSize, rnd) psum := 0. for i := 0; i < popSize; i++ { diff --git a/varopt_test.go b/varopt_test.go index 16786ab..3d6eec3 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -23,9 +23,7 @@ const ( sampleProb = 0.001 sampleSize int = popSize * sampleProb - // TODO epsilon is somewhat variable b/c we're using the - // static rand w/o a fixed seed for the test. - epsilon = 0.06 + epsilon = 0.08 ) func TestUnbiased(t *testing.T) { @@ -108,7 +106,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { for _, blockList := range blockLists { for _, block := range blockList { - simple := varopt.NewSimple(sampleSize) + simple := varopt.NewSimple(sampleSize, rnd) for _, s := range block { simple.Add(s) diff --git a/weighted_test.go b/weighted_test.go index e545fbd..1e82043 100644 --- a/weighted_test.go +++ b/weighted_test.go @@ -16,7 +16,7 @@ type packet struct { protocol string } -func ExampleWeighted() { +func ExampleNew() { const totalPackets = 1e6 const sampleRatio = 0.01 From 68f1faba1c25c8dcf24ae854b7d0cf1ff20205ac Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 09:50:25 -0800 Subject: [PATCH 07/28] Add comments --- doc.go | 22 ++++++++++++++++++++++ simple.go | 15 ++++++++++----- varopt.go | 18 ++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) create mode 100644 doc.go diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..deb2de9 --- /dev/null +++ b/doc.go @@ -0,0 +1,22 @@ +// Copyright 2019, LightStep Inc. + +/* +Package varopt is an implementation of VarOpt, an unbiased weighted +sampling algorithm described in the paper "Stream sampling for +variance-optimal estimation of subset sums" +https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick +Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup. + +VarOpt is a reservoir-type sampler that maintains a fixed-size sample +and provides a mechanism for merging unequal-weight samples. + +This package also includes a simple reservoir sampling algorithm, +often useful in conjunction with weighed reservoir sampling, using +Algorithm R from "Random sampling with a +reservoir", https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R +(1985), by Jeffrey Vitter. + +See https://github.com/lightstep/varopt/blob/master/README.md for +more detail. +*/ +package varopt diff --git a/simple.go b/simple.go index d34d42a..6ff64f1 100644 --- a/simple.go +++ b/simple.go @@ -16,6 +16,8 @@ type Simple struct { rnd *rand.Rand } +// NewSimple returns a simple reservoir sampler with given capacity +// (i.e., reservoir size) and random number generator. func NewSimple(capacity int, rnd *rand.Rand) *Simple { return &Simple{ capacity: capacity, @@ -23,6 +25,8 @@ func NewSimple(capacity int, rnd *rand.Rand) *Simple { } } +// Add considers a new observation for the sample. Items have unit +// weight. func (s *Simple) Add(span Sample) { s.observed++ @@ -42,22 +46,23 @@ func (s *Simple) Add(span Sample) { } } +// Get returns the i'th selected item from the sample. func (s *Simple) Get(i int) Sample { return s.buffer[i] } +// Get returns the number of items in the sample. If the reservoir is +// full, Size() equals Capacity(). func (s *Simple) Size() int { return len(s.buffer) } +// Weight returns the adjusted weight of each item in the sample. func (s *Simple) Weight() float64 { return float64(s.observed) / float64(s.Size()) } -func (s *Simple) Prob() float64 { - return 1 / s.Weight() -} - -func (s *Simple) Observed() int { +// Count returns the number of items that were observed. +func (s *Simple) Count() int { return s.observed } diff --git a/varopt.go b/varopt.go index 4dd8ea3..1856614 100644 --- a/varopt.go +++ b/varopt.go @@ -36,6 +36,9 @@ type Varopt struct { totalWeight float64 } +// Sample is an empty interface that represents a sample item. +// Sampling algorithms treat these as opaque, as their weight is +// passed in separately. type Sample interface{} type vsample struct { @@ -45,6 +48,8 @@ type vsample struct { type largeHeap []vsample +// New returns a new Varopt sampler with given capacity (i.e., +// reservoir size) and random number generator. func New(capacity int, rnd *rand.Rand) *Varopt { return &Varopt{ capacity: capacity, @@ -52,6 +57,7 @@ func New(capacity int, rnd *rand.Rand) *Varopt { } } +// Add considers a new observation for the sample with given weight. func (s *Varopt) Add(sample Sample, weight float64) { individual := vsample{ sample: sample, @@ -131,6 +137,9 @@ func (s *Varopt) Get(i int) (Sample, float64) { return s.T[i-len(s.L)].sample, s.tau } +// GetOriginalWeight returns the original input weight of the sample +// item that was passed to Add(). This can be useful for computing a +// frequency from the adjusted sample weight. func (s *Varopt) GetOriginalWeight(i int) float64 { if i < len(s.L) { return s.L[i].weight @@ -139,22 +148,31 @@ func (s *Varopt) GetOriginalWeight(i int) float64 { return s.T[i-len(s.L)].weight } +// Capacity returns the size of the reservoir. This is the maximum +// size of the sample. func (s *Varopt) Capacity() int { return s.capacity } +// Size returns the current number of items in the sample. If the +// reservoir is full, this returns Capacity(). func (s *Varopt) Size() int { return len(s.L) + len(s.T) } +// TotalWeight returns the sum of weights that were passed to Add(). func (s *Varopt) TotalWeight() float64 { return s.totalWeight } +// TotalCount returns the number of calls to Add(). func (s *Varopt) TotalCount() int { return s.totalCount } +// Tau returns the current large-weight threshold. Weights larger +// than Tau() carry their exact weight int he sample. See the VarOpt +// paper for details. func (s *Varopt) Tau() float64 { return s.tau } From 5c2a41c30ef4fb1688346f362a18788ff296069e Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 09:58:00 -0800 Subject: [PATCH 08/28] Add circle config --- .circleci/config.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .circleci/config.yml diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..1e65ea3 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,21 @@ +version: 2 + +jobs: + test: + working_directory: ~/go/src/github.com/lightstep/varopt + docker: + - image: circleci/golang:1.13 + steps: + - run: + name: "configure environment" + command: | + echo 'export GOPATH="$HOME/go"' >> $BASH_ENV + source $BASH_ENV + - checkout + - run: go test + +workflows: + version: 2 + test: + jobs: + - test From cd735680d9ca6efe402d3819a5f89e2ec7f751cc Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 10:46:59 -0800 Subject: [PATCH 09/28] Move simple into subpackage --- .circleci/config.yml | 2 +- simple.go => simple/simple.go | 16 ++++++++------ simple_test.go | 41 ----------------------------------- varopt_test.go | 11 +++++----- 4 files changed, 16 insertions(+), 54 deletions(-) rename simple.go => simple/simple.go (80%) delete mode 100644 simple_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 1e65ea3..b31eb32 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -12,7 +12,7 @@ jobs: echo 'export GOPATH="$HOME/go"' >> $BASH_ENV source $BASH_ENV - checkout - - run: go test + - run: go test -v ./... workflows: version: 2 diff --git a/simple.go b/simple/simple.go similarity index 80% rename from simple.go rename to simple/simple.go index 6ff64f1..c086dfa 100644 --- a/simple.go +++ b/simple/simple.go @@ -1,9 +1,11 @@ // Copyright 2019, LightStep Inc. -package varopt +package simple import ( "math/rand" + + "github.com/lightstep/varopt" ) // Simple implements unweighted reservoir sampling using Algorithm R @@ -12,13 +14,13 @@ import ( type Simple struct { capacity int observed int - buffer []Sample + buffer []varopt.Sample rnd *rand.Rand } -// NewSimple returns a simple reservoir sampler with given capacity +// New returns a simple reservoir sampler with given capacity // (i.e., reservoir size) and random number generator. -func NewSimple(capacity int, rnd *rand.Rand) *Simple { +func New(capacity int, rnd *rand.Rand) *Simple { return &Simple{ capacity: capacity, rnd: rnd, @@ -27,11 +29,11 @@ func NewSimple(capacity int, rnd *rand.Rand) *Simple { // Add considers a new observation for the sample. Items have unit // weight. -func (s *Simple) Add(span Sample) { +func (s *Simple) Add(span varopt.Sample) { s.observed++ if s.buffer == nil { - s.buffer = make([]Sample, 0, s.capacity) + s.buffer = make([]varopt.Sample, 0, s.capacity) } if len(s.buffer) < s.capacity { @@ -47,7 +49,7 @@ func (s *Simple) Add(span Sample) { } // Get returns the i'th selected item from the sample. -func (s *Simple) Get(i int) Sample { +func (s *Simple) Get(i int) varopt.Sample { return s.buffer[i] } diff --git a/simple_test.go b/simple_test.go deleted file mode 100644 index c70503e..0000000 --- a/simple_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2019, LightStep Inc. - -package varopt_test - -import ( - "math/rand" - "testing" - - "github.com/lightstep/varopt" - "github.com/stretchr/testify/require" -) - -type iRec int - -func TestSimple(t *testing.T) { - const ( - popSize = 1e6 - sampleProb = 0.1 - sampleSize int = popSize * sampleProb - epsilon = 0.01 - ) - - rnd := rand.New(rand.NewSource(17167)) - - ss := varopt.NewSimple(sampleSize, rnd) - - psum := 0. - for i := 0; i < popSize; i++ { - ss.Add(iRec(i)) - psum += float64(i) - } - - require.Equal(t, ss.Size(), sampleSize) - - ssum := 0.0 - for i := 0; i < sampleSize; i++ { - ssum += float64(ss.Get(i).(iRec)) - } - - require.InEpsilon(t, ssum/float64(ss.Size()), psum/popSize, epsilon) -} diff --git a/varopt_test.go b/varopt_test.go index 3d6eec3..06f7bc1 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/lightstep/varopt" + "github.com/lightstep/varopt/simple" "github.com/stretchr/testify/require" ) @@ -106,15 +107,15 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { for _, blockList := range blockLists { for _, block := range blockList { - simple := varopt.NewSimple(sampleSize, rnd) + ss := simple.New(sampleSize, rnd) for _, s := range block { - simple.Add(s) + ss.Add(s) } - weight := simple.Weight() - for i := 0; i < simple.Size(); i++ { - vsample.Add(simple.Get(i), weight) + weight := ss.Weight() + for i := 0; i < ss.Size(); i++ { + vsample.Add(ss.Get(i), weight) } } } From 2afca792c50cd4cab9c8709c47553ad0b5f3aa41 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 12:49:00 -0800 Subject: [PATCH 10/28] Restore the simple test --- simple/simple_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ varopt.go | 2 +- 2 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 simple/simple_test.go diff --git a/simple/simple_test.go b/simple/simple_test.go new file mode 100644 index 0000000..4ea9592 --- /dev/null +++ b/simple/simple_test.go @@ -0,0 +1,41 @@ +// Copyright 2019, LightStep Inc. + +package simple_test + +import ( + "math/rand" + "testing" + + "github.com/lightstep/varopt/simple" + "github.com/stretchr/testify/require" +) + +type iRec int + +func TestSimple(t *testing.T) { + const ( + popSize = 1e6 + sampleProb = 0.1 + sampleSize int = popSize * sampleProb + epsilon = 0.01 + ) + + rnd := rand.New(rand.NewSource(17167)) + + ss := simple.New(sampleSize, rnd) + + psum := 0. + for i := 0; i < popSize; i++ { + ss.Add(iRec(i)) + psum += float64(i) + } + + require.Equal(t, ss.Size(), sampleSize) + + ssum := 0.0 + for i := 0; i < sampleSize; i++ { + ssum += float64(ss.Get(i).(iRec)) + } + + require.InEpsilon(t, ssum/float64(ss.Size()), psum/popSize, epsilon) +} diff --git a/varopt.go b/varopt.go index 1856614..5ab1ee5 100644 --- a/varopt.go +++ b/varopt.go @@ -171,7 +171,7 @@ func (s *Varopt) TotalCount() int { } // Tau returns the current large-weight threshold. Weights larger -// than Tau() carry their exact weight int he sample. See the VarOpt +// than Tau() carry their exact weight in the sample. See the VarOpt // paper for details. func (s *Varopt) Tau() float64 { return s.tau From 7ec92c7916046345eb267889db7a52405372897f Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 13:35:18 -0800 Subject: [PATCH 11/28] Add simple/doc.go --- README.md | 6 +++--- doc.go | 2 +- simple/doc.go | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) create mode 100644 simple/doc.go diff --git a/README.md b/README.md index ba73609..a949cc0 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,9 @@ Thorup. VarOpt is a reservoir-type sampler that maintains a fixed-size sample and provides a mechanism for merging unequal-weight samples. -This package also includes a simple reservoir sampling algorithm, -often useful in conjunction with weighed reservoir sampling, using -Algorithm R from [Random sampling with a +This repository also includes a simple reservoir sampling algorithm, +often useful in conjunction with weighed reservoir sampling, that +implements Algorithm R from [Random sampling with a reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R) (1985) by Jeffrey Vitter. diff --git a/doc.go b/doc.go index deb2de9..fe0e2e6 100644 --- a/doc.go +++ b/doc.go @@ -1,7 +1,7 @@ // Copyright 2019, LightStep Inc. /* -Package varopt is an implementation of VarOpt, an unbiased weighted +Package varopt contains an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper "Stream sampling for variance-optimal estimation of subset sums" https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick diff --git a/simple/doc.go b/simple/doc.go new file mode 100644 index 0000000..0d79ebe --- /dev/null +++ b/simple/doc.go @@ -0,0 +1,4 @@ +// Copyright 2019, LightStep Inc. + +/* Package simple implements an unweighted reservoir sampling algorithm. */ +package simple From 4b69751fc60eb5e2fad15643ebdb68ea26375984 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 8 Nov 2019 20:31:24 -0800 Subject: [PATCH 12/28] Rephrase and fix typos --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index a949cc0..131b6b3 100644 --- a/README.md +++ b/README.md @@ -21,12 +21,12 @@ reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R) A typical use of VarOpt sampling is to estimate network flows using sample packets. In this use-case, the weight applied to each sample -is the size of the packet. Beacuse VarOpt computes an unbiased -sample, the sample data points can be summarized along secondary -dimensions. For example, we can select a subset of the sample +is the size of the packet. Because VarOpt computes an unbiased +sample, sample data points can be summarized along secondary +dimensions. For example, we can select a subset of sampled packets according to a secondary attribute, sum the sample weights, and the -result is expected value of the secondary attribute in the original -population. +result is expected to equal the size of packets corresponding to the +secondary attribute from the original population. See [weighted_test.go](https://github.com/lightstep/varopt/blob/master/weighted_test.go) for an example. @@ -42,10 +42,10 @@ maintaining frequency estimates. In this use-case, the weight assigned to each observation is the inverse probability of the bucket it belongs to. The result of weighted sampling with inverse-probability weights is a uniform -expectation, in this example we expect an equal number of observations -falling into each bucket. Each observation represents a frequency of -its sample weight (computed by VarOpt) divided by its original weight -(the inverse-probability). +expectated value; in this example we expect an equal number of +observations falling into each bucket. Each observation represents a +frequency of its sample weight (computed by VarOpt) divided by its +original weight (the inverse-probability). See [frequency_test.go](https://github.com/lightstep/varopt/blob/master/frequency_test.go) for an example. From 4062031cc972442adf70edcc136ff6daac969213 Mon Sep 17 00:00:00 2001 From: jmacd Date: Sat, 9 Nov 2019 11:49:31 -0800 Subject: [PATCH 13/28] Add a benchmark --- benchmark_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 benchmark_test.go diff --git a/benchmark_test.go b/benchmark_test.go new file mode 100644 index 0000000..a2f7c04 --- /dev/null +++ b/benchmark_test.go @@ -0,0 +1,58 @@ +// Copyright 2019, LightStep Inc. + +package varopt_test + +import ( + "math" + "math/rand" + "testing" + + "github.com/lightstep/varopt" +) + +func normValue(rnd *rand.Rand) float64 { + return 1 + math.Abs(rnd.NormFloat64()) +} + +func expValue(rnd *rand.Rand) float64 { + return rnd.ExpFloat64() +} + +func BenchmarkAdd_Norm_100(b *testing.B) { + benchmarkAdd(b, 100, normValue) +} + +func BenchmarkAdd_Norm_10000(b *testing.B) { + benchmarkAdd(b, 10000, normValue) +} + +func BenchmarkAdd_Norm_1000000(b *testing.B) { + benchmarkAdd(b, 1000000, normValue) +} + +func BenchmarkAdd_Exp_100(b *testing.B) { + benchmarkAdd(b, 100, expValue) +} + +func BenchmarkAdd_Exp_10000(b *testing.B) { + benchmarkAdd(b, 10000, expValue) +} + +func BenchmarkAdd_Exp_1000000(b *testing.B) { + benchmarkAdd(b, 1000000, expValue) +} + +func benchmarkAdd(b *testing.B, size int, f func(rnd *rand.Rand) float64) { + b.ReportAllocs() + rnd := rand.New(rand.NewSource(3331)) + v := varopt.New(size, rnd) + weights := make([]float64, b.N) + for i := 0; i < b.N; i++ { + weights[i] = f(rnd) + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + v.Add(nil, weights[i]) + } +} From ba028d8125e01c56a625258c7a7341efd7e9b8fc Mon Sep 17 00:00:00 2001 From: jmacd Date: Sat, 9 Nov 2019 11:55:29 -0800 Subject: [PATCH 14/28] Add benchmark notes --- benchmark_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/benchmark_test.go b/benchmark_test.go index a2f7c04..131a08d 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -1,4 +1,18 @@ // Copyright 2019, LightStep Inc. +// +// The benchmark results point to a performance drop when the +// largeHeap starts to be used because of interface conversions in and +// out of the heap, primarily due to the heap interface. This +// suggests room for improvement by avoiding the built-in heap. + +/* +BenchmarkAdd_Norm_100-8 37540165 32.1 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Norm_10000-8 39850280 30.6 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Norm_1000000-8 7958835 183 ns/op 52 B/op 0 allocs/op +BenchmarkAdd_Exp_100-8 41565934 28.5 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Exp_10000-8 43622184 29.2 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Exp_1000000-8 8103663 220 ns/op 55 B/op 0 allocs/op +*/ package varopt_test From db2f575931030ee0fdd5ad2324d12f8517f2e96c Mon Sep 17 00:00:00 2001 From: jmacd Date: Tue, 12 Nov 2019 23:26:37 -0800 Subject: [PATCH 15/28] Check for NaN weight --- varopt.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/varopt.go b/varopt.go index 5ab1ee5..fc802c5 100644 --- a/varopt.go +++ b/varopt.go @@ -5,6 +5,7 @@ package varopt import ( "container/heap" "fmt" + "math" "math/rand" ) @@ -64,8 +65,8 @@ func (s *Varopt) Add(sample Sample, weight float64) { weight: weight, } - if weight <= 0 { - panic(fmt.Sprint("Invalid weight <= 0: ", weight)) + if weight <= 0 || math.IsNaN(weight) { + panic(fmt.Sprint("Invalid weight: ", weight)) } s.totalCount++ From 5cd2650f3fdc3f9c914b102ec11df57c687d439d Mon Sep 17 00:00:00 2001 From: jmacd Date: Wed, 13 Nov 2019 21:51:59 -0800 Subject: [PATCH 16/28] Return an error, don't panic --- varopt.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/varopt.go b/varopt.go index fc802c5..f786d12 100644 --- a/varopt.go +++ b/varopt.go @@ -49,6 +49,8 @@ type vsample struct { type largeHeap []vsample +var ErrInvalidWeight = fmt.Errorf("Negative or NaN weight") + // New returns a new Varopt sampler with given capacity (i.e., // reservoir size) and random number generator. func New(capacity int, rnd *rand.Rand) *Varopt { @@ -59,14 +61,16 @@ func New(capacity int, rnd *rand.Rand) *Varopt { } // Add considers a new observation for the sample with given weight. -func (s *Varopt) Add(sample Sample, weight float64) { +// +// An error will be returned if the weight is either negative or NaN. +func (s *Varopt) Add(sample Sample, weight float64) error { individual := vsample{ sample: sample, weight: weight, } if weight <= 0 || math.IsNaN(weight) { - panic(fmt.Sprint("Invalid weight: ", weight)) + return ErrInvalidWeight } s.totalCount++ @@ -74,7 +78,7 @@ func (s *Varopt) Add(sample Sample, weight float64) { if s.Size() < s.capacity { heap.Push(&s.L, individual) - return + return nil } // the X <- {} step from the paper is not done here, @@ -116,6 +120,7 @@ func (s *Varopt) Add(sample Sample, weight float64) { } s.T = append(s.T, s.X...) s.X = s.X[:0] + return nil } func (s *Varopt) uniform() float64 { From b92f0877e9a51c422d2e715ff97a550fd5f70a68 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 15 Nov 2019 12:20:01 -0800 Subject: [PATCH 17/28] Inline the large-weight heap to avoid interface conversions --- varopt.go | 66 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/varopt.go b/varopt.go index 5ab1ee5..7e4449c 100644 --- a/varopt.go +++ b/varopt.go @@ -3,7 +3,6 @@ package varopt import ( - "container/heap" "fmt" "math/rand" ) @@ -17,7 +16,7 @@ type Varopt struct { // Random number generator rnd *rand.Rand - // Large-weight items + // Large-weight items stored in a min-heap. L largeHeap // Light-weight items. @@ -72,7 +71,7 @@ func (s *Varopt) Add(sample Sample, weight float64) { s.totalWeight += weight if s.Size() < s.capacity { - heap.Push(&s.L, individual) + s.L.push(individual) return } @@ -82,14 +81,14 @@ func (s *Varopt) Add(sample Sample, weight float64) { W := s.tau * float64(len(s.T)) if weight > s.tau { - heap.Push(&s.L, individual) + s.L.push(individual) } else { s.X = append(s.X, individual) W += weight } for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight { - h := heap.Pop(&s.L).(vsample) + h := s.L.pop() s.X = append(s.X, h) W += h.weight } @@ -177,26 +176,49 @@ func (s *Varopt) Tau() float64 { return s.tau } -func (b largeHeap) Len() int { - return len(b) -} +func (lp *largeHeap) push(v vsample) { + l := append(*lp, v) + n := len(l) - 1 -func (b largeHeap) Swap(i, j int) { - b[i], b[j] = b[j], b[i] -} + // This copies the body of heap.up(). + j := n + for { + i := (j - 1) / 2 // parent + if i == j || l[j].weight >= l[i].weight { + break + } + l[i], l[j] = l[j], l[i] + j = i + } -func (b largeHeap) Less(i, j int) bool { - return b[i].weight < b[j].weight + *lp = l } -func (b *largeHeap) Push(x interface{}) { - *b = append(*b, x.(vsample)) -} +func (lp *largeHeap) pop() vsample { + l := *lp + n := len(l) - 1 + result := l[0] + l[0] = l[n] + l = l[:n] + + // This copies the body of heap.down(). + i := 0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && l[j2].weight < l[j1].weight { + j = j2 // = 2*i + 2 // right child + } + if l[j].weight >= l[i].weight { + break + } + l[i], l[j] = l[j], l[i] + i = j + } -func (b *largeHeap) Pop() interface{} { - old := *b - n := len(old) - x := old[n-1] - *b = old[0 : n-1] - return x + *lp = l + return result } From b00b2fa4dd994f7f8bcb12518bcc1d84de31c555 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 15 Nov 2019 12:42:56 -0800 Subject: [PATCH 18/28] Add a test --- varopt.go | 2 +- varopt_test.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/varopt.go b/varopt.go index f786d12..22751c2 100644 --- a/varopt.go +++ b/varopt.go @@ -49,7 +49,7 @@ type vsample struct { type largeHeap []vsample -var ErrInvalidWeight = fmt.Errorf("Negative or NaN weight") +var ErrInvalidWeight = fmt.Errorf("Negative, zero, or NaN weight") // New returns a new Varopt sampler with given capacity (i.e., // reservoir size) and random number generator. diff --git a/varopt_test.go b/varopt_test.go index 06f7bc1..c7f0b68 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -147,3 +147,17 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { [][][]varopt.Sample{smallBlocks, bigBlocks}, ) } + +func TestInvalidWeight(t *testing.T) { + rnd := rand.New(rand.NewSource(98887)) + v := varopt.New(1, rnd) + + err := v.Add(nil, math.NaN()) + require.Equal(t, err, varopt.ErrInvalidWeight) + + err = v.Add(nil, -1) + require.Equal(t, err, varopt.ErrInvalidWeight) + + err = v.Add(nil, 0) + require.Equal(t, err, varopt.ErrInvalidWeight) +} From f53b1a85587a206e19f8837dc367a7ff3f87acd9 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 15 Nov 2019 21:51:31 -0800 Subject: [PATCH 19/28] Move heap code to internal for better testing --- internal/sampleheap.go | 57 ++++++++++++++++++++++++ internal/sampleheap_test.go | 58 ++++++++++++++++++++++++ varopt.go | 88 ++++++++----------------------------- 3 files changed, 133 insertions(+), 70 deletions(-) create mode 100644 internal/sampleheap.go create mode 100644 internal/sampleheap_test.go diff --git a/internal/sampleheap.go b/internal/sampleheap.go new file mode 100644 index 0000000..c3d82a5 --- /dev/null +++ b/internal/sampleheap.go @@ -0,0 +1,57 @@ +// Copyright 2019, LightStep Inc. + +package internal + +type Vsample struct { + Sample interface{} + Weight float64 +} + +type SampleHeap []Vsample + +func (sh *SampleHeap) Push(v Vsample) { + l := append(*sh, v) + n := len(l) - 1 + + // This copies the body of heap.up(). + j := n + for { + i := (j - 1) / 2 // parent + if i == j || l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + j = i + } + + *sh = l +} + +func (sh *SampleHeap) Pop() Vsample { + l := *sh + n := len(l) - 1 + result := l[0] + l[0] = l[n] + l = l[:n] + + // This copies the body of heap.down(). + i := 0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && l[j2].Weight < l[j1].Weight { + j = j2 // = 2*i + 2 // right child + } + if l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + i = j + } + + *sh = l + return result +} diff --git a/internal/sampleheap_test.go b/internal/sampleheap_test.go new file mode 100644 index 0000000..50615c0 --- /dev/null +++ b/internal/sampleheap_test.go @@ -0,0 +1,58 @@ +// Copyright 2019, LightStep Inc. + +package internal_test + +import ( + "container/heap" + "math/rand" + "testing" + + "github.com/lightstep/varopt/internal" + "github.com/stretchr/testify/require" +) + +type simpleHeap []float64 + +func (s *simpleHeap) Len() int { + return len(*s) +} + +func (s *simpleHeap) Swap(i, j int) { + (*s)[i], (*s)[j] = (*s)[j], (*s)[i] +} + +func (s *simpleHeap) Less(i, j int) bool { + return (*s)[i] < (*s)[j] +} + +func (s *simpleHeap) Push(x interface{}) { + *s = append(*s, x.(float64)) +} + +func (s *simpleHeap) Pop() interface{} { + old := *s + n := len(old) + x := old[n-1] + *s = old[0 : n-1] + return x +} + +func TestLargeHeap(t *testing.T) { + var L internal.SampleHeap + var S simpleHeap + + for i := 0; i < 1e6; i++ { + v := rand.NormFloat64() + L.Push(internal.Vsample{Weight: v}) + heap.Push(&S, v) + } + + for len(S) > 0 { + v1 := heap.Pop(&S).(float64) + v2 := L.Pop().Weight + + require.Equal(t, v1, v2) + } + + require.Equal(t, 0, len(L)) +} diff --git a/varopt.go b/varopt.go index a535d35..44b5843 100644 --- a/varopt.go +++ b/varopt.go @@ -6,6 +6,8 @@ import ( "fmt" "math" "math/rand" + + "github.com/lightstep/varopt/internal" ) // Varopt implements the algorithm from Stream sampling for @@ -18,13 +20,13 @@ type Varopt struct { rnd *rand.Rand // Large-weight items stored in a min-heap. - L largeHeap + L internal.SampleHeap // Light-weight items. - T []vsample + T []internal.Vsample // Temporary buffer. - X []vsample + X []internal.Vsample // Current threshold tau float64 @@ -41,13 +43,6 @@ type Varopt struct { // passed in separately. type Sample interface{} -type vsample struct { - sample Sample - weight float64 -} - -type largeHeap []vsample - var ErrInvalidWeight = fmt.Errorf("Negative, zero, or NaN weight") // New returns a new Varopt sampler with given capacity (i.e., @@ -63,9 +58,9 @@ func New(capacity int, rnd *rand.Rand) *Varopt { // // An error will be returned if the weight is either negative or NaN. func (s *Varopt) Add(sample Sample, weight float64) error { - individual := vsample{ - sample: sample, - weight: weight, + individual := internal.Vsample{ + Sample: sample, + Weight: weight, } if weight <= 0 || math.IsNaN(weight) { @@ -76,7 +71,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error { s.totalWeight += weight if s.Size() < s.capacity { - s.L.push(individual) + s.L.Push(individual) return nil } @@ -86,16 +81,16 @@ func (s *Varopt) Add(sample Sample, weight float64) error { W := s.tau * float64(len(s.T)) if weight > s.tau { - s.L.push(individual) + s.L.Push(individual) } else { s.X = append(s.X, individual) W += weight } - for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight { - h := s.L.pop() + for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].Weight { + h := s.L.Pop() s.X = append(s.X, h) - W += h.weight + W += h.Weight } s.tau = W / float64(len(s.T)+len(s.X)-1) @@ -103,7 +98,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error { d := 0 for d < len(s.X) && r >= 0 { - wxd := s.X[d].weight + wxd := s.X[d].Weight r -= (1 - wxd/s.tau) d++ } @@ -136,10 +131,10 @@ func (s *Varopt) uniform() float64 { // GetOriginalWeight(i). func (s *Varopt) Get(i int) (Sample, float64) { if i < len(s.L) { - return s.L[i].sample, s.L[i].weight + return s.L[i].Sample, s.L[i].Weight } - return s.T[i-len(s.L)].sample, s.tau + return s.T[i-len(s.L)].Sample, s.tau } // GetOriginalWeight returns the original input weight of the sample @@ -147,10 +142,10 @@ func (s *Varopt) Get(i int) (Sample, float64) { // frequency from the adjusted sample weight. func (s *Varopt) GetOriginalWeight(i int) float64 { if i < len(s.L) { - return s.L[i].weight + return s.L[i].Weight } - return s.T[i-len(s.L)].weight + return s.T[i-len(s.L)].Weight } // Capacity returns the size of the reservoir. This is the maximum @@ -181,50 +176,3 @@ func (s *Varopt) TotalCount() int { func (s *Varopt) Tau() float64 { return s.tau } - -func (lp *largeHeap) push(v vsample) { - l := append(*lp, v) - n := len(l) - 1 - - // This copies the body of heap.up(). - j := n - for { - i := (j - 1) / 2 // parent - if i == j || l[j].weight >= l[i].weight { - break - } - l[i], l[j] = l[j], l[i] - j = i - } - - *lp = l -} - -func (lp *largeHeap) pop() vsample { - l := *lp - n := len(l) - 1 - result := l[0] - l[0] = l[n] - l = l[:n] - - // This copies the body of heap.down(). - i := 0 - for { - j1 := 2*i + 1 - if j1 >= n || j1 < 0 { // j1 < 0 after int overflow - break - } - j := j1 // left child - if j2 := j1 + 1; j2 < n && l[j2].weight < l[j1].weight { - j = j2 // = 2*i + 2 // right child - } - if l[j].weight >= l[i].weight { - break - } - l[i], l[j] = l[j], l[i] - i = j - } - - *lp = l - return result -} From 4b2b9c3b5b56890c39de16f9adf85b0306eeac8d Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 22 Nov 2019 22:48:50 -0800 Subject: [PATCH 20/28] Add a Reset method --- varopt.go | 11 +++++++++++ varopt_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/varopt.go b/varopt.go index 44b5843..37598cc 100644 --- a/varopt.go +++ b/varopt.go @@ -54,6 +54,17 @@ func New(capacity int, rnd *rand.Rand) *Varopt { } } +// Reset returns the sampler to its initial state, maintaining its +// capacity and random number source. +func (s *Varopt) Reset() { + s.L = s.L[:0] + s.T = s.T[:0] + s.X = s.X[:0] + s.tau = 0 + s.totalCount = 0 + s.totalWeight = 0 +} + // Add considers a new observation for the sample with given weight. // // An error will be returned if the weight is either negative or NaN. diff --git a/varopt_test.go b/varopt_test.go index c7f0b68..2e1a9cd 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -161,3 +161,28 @@ func TestInvalidWeight(t *testing.T) { err = v.Add(nil, 0) require.Equal(t, err, varopt.ErrInvalidWeight) } + +func TestReset(t *testing.T) { + const capacity = 10 + const insert = 100 + rnd := rand.New(rand.NewSource(98887)) + v := varopt.New(capacity, rnd) + + sum := 0. + for i := 1.; i <= insert; i++ { + v.Add(nil, i) + sum += i + } + + require.Equal(t, capacity, v.Size()) + require.Equal(t, insert, v.TotalCount()) + require.Equal(t, sum, v.TotalWeight()) + require.Less(t, 0., v.Tau()) + + v.Reset() + + require.Equal(t, 0, v.Size()) + require.Equal(t, 0, v.TotalCount()) + require.Equal(t, 0., v.TotalWeight()) + require.Equal(t, 0., v.Tau()) +} From d2bfbc820e6da2b0fcda49c0485fdb2abb718349 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 22 Nov 2019 23:16:41 -0800 Subject: [PATCH 21/28] Return the ejected sample from Add --- varopt.go | 13 +++++++---- varopt_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/varopt.go b/varopt.go index 37598cc..0b8a570 100644 --- a/varopt.go +++ b/varopt.go @@ -66,16 +66,18 @@ func (s *Varopt) Reset() { } // Add considers a new observation for the sample with given weight. +// If there is an item ejected from the same as a result, the item is +// returned. // // An error will be returned if the weight is either negative or NaN. -func (s *Varopt) Add(sample Sample, weight float64) error { +func (s *Varopt) Add(sample Sample, weight float64) (Sample, error) { individual := internal.Vsample{ Sample: sample, Weight: weight, } if weight <= 0 || math.IsNaN(weight) { - return ErrInvalidWeight + return nil, ErrInvalidWeight } s.totalCount++ @@ -83,7 +85,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error { if s.Size() < s.capacity { s.L.Push(individual) - return nil + return nil, nil } // the X <- {} step from the paper is not done here, @@ -113,19 +115,22 @@ func (s *Varopt) Add(sample Sample, weight float64) error { r -= (1 - wxd/s.tau) d++ } + var eject Sample if r < 0 { if d < len(s.X) { s.X[d], s.X[len(s.X)-1] = s.X[len(s.X)-1], s.X[d] } + eject = s.X[len(s.X)-1].Sample s.X = s.X[:len(s.X)-1] } else { ti := s.rnd.Intn(len(s.T)) s.T[ti], s.T[len(s.T)-1] = s.T[len(s.T)-1], s.T[ti] + eject = s.T[len(s.T)-1].Sample s.T = s.T[:len(s.T)-1] } s.T = append(s.T, s.X...) s.X = s.X[:0] - return nil + return eject, nil } func (s *Varopt) uniform() float64 { diff --git a/varopt_test.go b/varopt_test.go index 2e1a9cd..484cf47 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -152,13 +152,13 @@ func TestInvalidWeight(t *testing.T) { rnd := rand.New(rand.NewSource(98887)) v := varopt.New(1, rnd) - err := v.Add(nil, math.NaN()) + _, err := v.Add(nil, math.NaN()) require.Equal(t, err, varopt.ErrInvalidWeight) - err = v.Add(nil, -1) + _, err = v.Add(nil, -1) require.Equal(t, err, varopt.ErrInvalidWeight) - err = v.Add(nil, 0) + _, err = v.Add(nil, 0) require.Equal(t, err, varopt.ErrInvalidWeight) } @@ -186,3 +186,55 @@ func TestReset(t *testing.T) { require.Equal(t, 0., v.TotalWeight()) require.Equal(t, 0., v.Tau()) } + +func TestEject(t *testing.T) { + const capacity = 100 + const rounds = 10000 + const maxvalue = 10000 + + entries := make([]int, capacity+1) + freelist := make([]*int, capacity+1) + + for i := range entries { + freelist[i] = &entries[i] + } + + // Make two deterministically equal samplers + rnd1 := rand.New(rand.NewSource(98887)) + rnd2 := rand.New(rand.NewSource(98887)) + vsrc := rand.New(rand.NewSource(98887)) + + expected := varopt.New(capacity, rnd1) + ejector := varopt.New(capacity, rnd2) + + for i := 0; i < rounds; i++ { + value := vsrc.Intn(maxvalue) + weight := vsrc.ExpFloat64() + + _, _ = expected.Add(&value, weight) + + lastitem := len(freelist) - 1 + item := freelist[lastitem] + freelist = freelist[:lastitem] + + *item = value + eject, _ := ejector.Add(item, weight) + + if eject != nil { + freelist = append(freelist, eject.(*int)) + } + } + + require.Equal(t, expected.Size(), ejector.Size()) + require.Equal(t, expected.TotalCount(), ejector.TotalCount()) + require.Equal(t, expected.TotalWeight(), ejector.TotalWeight()) + require.Equal(t, expected.Tau(), ejector.Tau()) + + for i := 0; i < capacity; i++ { + expectItem, expectWeight := expected.Get(i) + ejectItem, ejectWeight := expected.Get(i) + + require.Equal(t, *expectItem.(*int), *ejectItem.(*int)) + require.Equal(t, expectWeight, ejectWeight) + } +} From be3c37ae3ccdac4cb2b999e6fb494c6546665bbc Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 22 Nov 2019 23:18:39 -0800 Subject: [PATCH 22/28] Typo --- varopt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/varopt.go b/varopt.go index 0b8a570..99fd01c 100644 --- a/varopt.go +++ b/varopt.go @@ -66,8 +66,8 @@ func (s *Varopt) Reset() { } // Add considers a new observation for the sample with given weight. -// If there is an item ejected from the same as a result, the item is -// returned. +// If there is an item ejected from the sample as a result, the item +// is returned to allow re-use of memory. // // An error will be returned if the weight is either negative or NaN. func (s *Varopt) Add(sample Sample, weight float64) (Sample, error) { From 30965f6f1f8f97ecc0ead16b2b5fbf7b2b1e0ad3 Mon Sep 17 00:00:00 2001 From: jmacd Date: Sun, 24 Nov 2019 21:29:48 -0800 Subject: [PATCH 23/28] Pre-allocate main buffers --- varopt.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/varopt.go b/varopt.go index 99fd01c..f573850 100644 --- a/varopt.go +++ b/varopt.go @@ -51,6 +51,8 @@ func New(capacity int, rnd *rand.Rand) *Varopt { return &Varopt{ capacity: capacity, rnd: rnd, + L: make(internal.SampleHeap, 0, capacity), + T: make(internal.SampleHeap, 0, capacity), } } From 8de72e7fedb787571ca65e12cdfe33fde01a485b Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 13 Jul 2021 16:24:45 -0700 Subject: [PATCH 24/28] Check for infinity, it creates trouble (#15) --- varopt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/varopt.go b/varopt.go index f573850..dbe605b 100644 --- a/varopt.go +++ b/varopt.go @@ -43,7 +43,7 @@ type Varopt struct { // passed in separately. type Sample interface{} -var ErrInvalidWeight = fmt.Errorf("Negative, zero, or NaN weight") +var ErrInvalidWeight = fmt.Errorf("Negative, Zero, Inf or NaN weight") // New returns a new Varopt sampler with given capacity (i.e., // reservoir size) and random number generator. @@ -78,7 +78,7 @@ func (s *Varopt) Add(sample Sample, weight float64) (Sample, error) { Weight: weight, } - if weight <= 0 || math.IsNaN(weight) { + if weight <= 0 || math.IsNaN(weight) || math.IsInf(weight, 1) { return nil, ErrInvalidWeight } From 69a5c9ff0cf1934f2d96025f682f54a5235c5670 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 13 Jul 2021 16:36:02 -0700 Subject: [PATCH 25/28] Use Apache Software Licence v2 (#17) --- LICENSE | 222 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 201 insertions(+), 21 deletions(-) diff --git a/LICENSE b/LICENSE index 017c0cd..261eeb9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,21 +1,201 @@ -The MIT License (MIT) - -Copyright (c) 2019 - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. From cc9013009aa09971e79b29725f8a1a0c3c8cb984 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 13 Jul 2021 21:36:31 -0700 Subject: [PATCH 26/28] Remove a test-only method (#16) * Remove a test-only method * update circle Go version * simplify circleci * mod update --- .circleci/config.yml | 7 +------ go.mod | 2 +- go.sum | 1 + simple/simple.go | 7 +------ varopt_test.go | 2 +- 5 files changed, 5 insertions(+), 14 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b31eb32..3b06f3c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,13 +4,8 @@ jobs: test: working_directory: ~/go/src/github.com/lightstep/varopt docker: - - image: circleci/golang:1.13 + - image: circleci/golang:1.15 steps: - - run: - name: "configure environment" - command: | - echo 'export GOPATH="$HOME/go"' >> $BASH_ENV - source $BASH_ENV - checkout - run: go test -v ./... diff --git a/go.mod b/go.mod index 1f0ead4..e61e0b6 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/lightstep/varopt -go 1.13 +go 1.15 require github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum index e863f51..8fdee58 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/simple/simple.go b/simple/simple.go index c086dfa..e3fb340 100644 --- a/simple/simple.go +++ b/simple/simple.go @@ -53,17 +53,12 @@ func (s *Simple) Get(i int) varopt.Sample { return s.buffer[i] } -// Get returns the number of items in the sample. If the reservoir is +// Size returns the number of items in the sample. If the reservoir is // full, Size() equals Capacity(). func (s *Simple) Size() int { return len(s.buffer) } -// Weight returns the adjusted weight of each item in the sample. -func (s *Simple) Weight() float64 { - return float64(s.observed) / float64(s.Size()) -} - // Count returns the number of items that were observed. func (s *Simple) Count() int { return s.observed diff --git a/varopt_test.go b/varopt_test.go index 484cf47..a56c691 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -113,7 +113,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { ss.Add(s) } - weight := ss.Weight() + weight := float64(ss.Count()) / float64(ss.Size()) for i := 0; i < ss.Size(); i++ { vsample.Add(ss.Get(i), weight) } From f3f0808ad3d6837c777c2ea8fcb4fb1ab97a0f98 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 10 Aug 2021 13:32:04 -0700 Subject: [PATCH 27/28] t-digest example --- examples/tdigest/flat.go | 39 +++++ examples/tdigest/stream.go | 300 ++++++++++++++++++++++++++++++++ examples/tdigest/stream_test.go | 178 +++++++++++++++++++ examples/tdigest/tdigest.go | 180 +++++++++++++++++++ 4 files changed, 697 insertions(+) create mode 100644 examples/tdigest/flat.go create mode 100644 examples/tdigest/stream.go create mode 100644 examples/tdigest/stream_test.go create mode 100644 examples/tdigest/tdigest.go diff --git a/examples/tdigest/flat.go b/examples/tdigest/flat.go new file mode 100644 index 0000000..26246d7 --- /dev/null +++ b/examples/tdigest/flat.go @@ -0,0 +1,39 @@ +package tdigest + +import ( + "sort" +) + +// Flat uses uniform probability density for each centroid. +type Flat struct { + t *TDigest +} + +var _ ProbabilityFunc = &Flat{} + +func newFlatFunc(t *TDigest) *Flat { + return &Flat{t: t} +} + +func (f *Flat) Probability(value float64) float64 { + digest := f.t.Digest + sumw := f.t.SumWeight + if value <= digest[0].Mean { + return digest[0].Weight / (2 * sumw) + } + if value >= digest[len(digest)-1].Mean { + return digest[len(digest)-1].Weight / (2 * sumw) + } + + idx := sort.Search(len(digest), func(i int) bool { + return value < digest[i].Mean + }) + + lower := value - digest[idx-1].Mean + upper := digest[idx].Mean - value + + if lower > upper { + return digest[idx-1].Weight / sumw + } + return digest[idx].Weight / sumw +} diff --git a/examples/tdigest/stream.go b/examples/tdigest/stream.go new file mode 100644 index 0000000..6adb78f --- /dev/null +++ b/examples/tdigest/stream.go @@ -0,0 +1,300 @@ +package tdigest + +import ( + "fmt" + "math" + "math/rand" + "sort" + + "github.com/lightstep/varopt" +) + +type ( + QuantileFunc interface { + Quantile(value float64) float64 + } + + Config struct { + // quality determines the number of centroids used to + // compute the digest. + quality Quality + + // currentSize is the current sample size. + currentSize int + + // completeSize is the complete sample size. + completeSize int + + // windowSize is the number of points between + // re-computing the digest. + windowSize int + } + + currentWindow struct { + sample *varopt.Varopt + temporary []float64 + } + + completeWindow struct { + sample *varopt.Varopt + temporary []float64 + freelist []*float64 + } + + Stream struct { + // Config stores parameters. + Config + + // rnd is used by both both samplers. + rnd *rand.Rand + + // Buffer and tinput used as inputs to T-digest. + // Buffer is ordered by this code, tinput is sorted + // while computing the digest. This is twice the + // currentSize, since at each round we combine the + // prior distribution with the current. Note that + // T-digest supports zero-weight inputs, so we always + // pass entire slices, even when partially filled. + buffer []Item + tinput Input + + // The cycle count tells the number of T-digest + // iterations, indicating which half of buffer and + // weight to fill. + cycle int + + // digest is the current estimated distribution. + digest *TDigest + + // flushed prevents Add() after Quantile() is called. + flushed bool + + // current stores a sample of the latest window of + // data. + current currentWindow + + // current stores a sample of the complete stream of data. + complete completeWindow + } +) + +var _ QuantileFunc = &Stream{} + +func NewConfig(quality Quality, currentSize, completeSize, windowSize int) Config { + return Config{ + quality: quality, + currentSize: currentSize, + completeSize: completeSize, + windowSize: windowSize, + } +} + +func NewStream(config Config, rnd *rand.Rand) *Stream { + bufsz := 2 * config.currentSize + if config.completeSize > bufsz { + bufsz = config.completeSize + } + + stream := &Stream{ + Config: config, + digest: New(config.quality), + rnd: rnd, + buffer: make([]Item, bufsz), + tinput: make([]*Item, bufsz), + } + + for i := range stream.buffer { + stream.tinput[i] = &stream.buffer[i] + } + + stream.current = stream.newCurrentWindow(config.currentSize) + stream.complete = stream.newCompleteWindow(config.completeSize) + return stream +} + +func (s *Stream) newCurrentWindow(sampleSize int) currentWindow { + return currentWindow{ + sample: varopt.New(sampleSize, s.rnd), + temporary: make([]float64, 0, s.windowSize), + } +} + +func (v *currentWindow) add(value, weight float64) error { + v.temporary = append(v.temporary, value) + _, err := v.sample.Add(&v.temporary[len(v.temporary)-1], weight) + return err +} + +func (v *currentWindow) full() bool { + return len(v.temporary) == cap(v.temporary) +} + +func (v *currentWindow) size() int { + return v.sample.Size() +} + +func (v *currentWindow) get(i int) (interface{}, float64) { + value, weight := v.sample.Get(i) + freq := weight / v.sample.GetOriginalWeight(i) + + if math.IsNaN(freq) { + panic(fmt.Sprintln("NaN here", value, weight)) + } + + return value, freq +} + +func (v *currentWindow) restart() { + v.temporary = v.temporary[:0] + v.sample.Reset() +} + +func (s *Stream) newCompleteWindow(sampleSize int) completeWindow { + cw := completeWindow{ + sample: varopt.New(sampleSize, s.rnd), + temporary: make([]float64, sampleSize+1), + freelist: make([]*float64, sampleSize+1), + } + for i := range cw.temporary { + cw.freelist[i] = &cw.temporary[i] + } + return cw +} + +func (v *completeWindow) add(value, weight float64) error { + lastdata := len(v.freelist) - 1 + data := v.freelist[lastdata] + v.freelist = v.freelist[:lastdata] + + *data = value + eject, err := v.sample.Add(data, weight) + + if err != nil { + v.freelist = append(v.freelist, data) + } else if eject != nil { + v.freelist = append(v.freelist, eject.(*float64)) + } + return err +} + +func (v *completeWindow) size() int { + return v.sample.Size() +} + +func (v *completeWindow) get(i int) (interface{}, float64) { + value, weight := v.sample.Get(i) + freq := weight / v.sample.GetOriginalWeight(i) + return value, freq +} + +func (s *Stream) Add(x float64) error { + var weight float64 + + if s.cycle == 0 { + weight = 1 + } else { + prob := s.digest.Probability(x) + weight = 1 / prob + } + + if err := s.current.add(x, weight); err != nil { + return err + } + + if !s.current.full() { + return nil + } + + return s.recompute() +} + +func (s *Stream) recompute() error { + // Recompute the T-Digest from the new weighted sample + // combined with the prior data. This computes a MAP + // estimate. + offset := s.currentSize * (s.cycle % 2) + + for i := 0; i < s.current.size(); i++ { + valueI, freq := s.current.get(i) + value := *(valueI.(*float64)) + + //fmt.Println("New W", value, freq) + + s.buffer[offset+i].Value = value + s.buffer[offset+i].Weight = freq + + s.complete.add(value, freq) + } + + // Fill in zero-weight values in case the sample size was + // smaller than capacity at the end of the stream. + for i := s.current.size(); i < s.currentSize; i++ { + s.buffer[offset+i] = Item{} + } + + if err := s.digest.Compute(s.tinput[0 : 2*s.currentSize]); err != nil { + return err + } + + s.current.restart() + s.cycle++ + return nil +} + +func (s *Stream) flush() error { + if s.flushed { + // Note: this could be fixed if needed. + return fmt.Errorf("Do not Add() after calling Quantile()") + } + s.flushed = true + + for i := 0; i < s.current.size(); i++ { + valueI, freq := s.current.get(i) + value := *(valueI.(*float64)) + + s.complete.add(value, freq) + } + + sumWeight := 0.0 + for i := 0; i < s.complete.size(); i++ { + valueI, freq := s.complete.get(i) + value := *(valueI.(*float64)) + + s.buffer[i].Value = value + s.buffer[i].Weight = freq + s.tinput[i] = &s.buffer[i] + sumWeight += freq + } + + s.tinput = s.tinput[0:s.complete.size()] + sort.Sort(&s.tinput) + + sum := 0.0 + for _, t := range s.tinput { + sum += t.Weight + t.Weight = sum / sumWeight + } + + return nil +} + +// Quantile returns the estimated value for a given quantile. +// +// Note: TDigest can implement QuantileFunc itself, but in this +// implementation we use the stream sample directly. See section 2.9 +// of the T-digest paper for recommendations about interpolating the +// CDF of a T-digest. +func (s *Stream) Quantile(quantile float64) float64 { + if !s.flushed { + s.flush() + } + + idx := sort.Search(len(s.tinput), func(i int) bool { + return quantile <= s.tinput[i].Weight + }) + + if idx == len(s.tinput) { + return s.tinput[len(s.tinput)-1].Value + } + return s.tinput[idx].Value +} diff --git a/examples/tdigest/stream_test.go b/examples/tdigest/stream_test.go new file mode 100644 index 0000000..f6a4ecd --- /dev/null +++ b/examples/tdigest/stream_test.go @@ -0,0 +1,178 @@ +package tdigest_test + +import ( + "math" + "math/rand" + "sort" + "testing" + + "github.com/lightstep/varopt/examples/tdigest" + "github.com/stretchr/testify/require" +) + +func TestStreamNormal(t *testing.T) { + const ( + mean = 10 + stddev = 10 + ) + + testStreamDigest(t, + []float64{.1, .25, .5, .75, .9, .99, .999}, + []float64{.5, .50, .1, .50, .5, .05, .050}, + func(rnd *rand.Rand) float64 { + return mean + stddev*rnd.NormFloat64() + }) +} + +func TestStreamExponential(t *testing.T) { + testStreamDigest(t, + []float64{.1, .25, .5, .75, .9, .99, .999}, + []float64{.5, .50, 1., 1.5, 1., .20, .050}, + func(rnd *rand.Rand) float64 { + return rnd.ExpFloat64() + }) +} + +func TestStreamAlternating(t *testing.T) { + count := 0 + testStreamDigest(t, + // Quantiles <0.66 == -1 + // Quantiles >=0.66 == +1 + []float64{.1, .5, .66, .67, .7, .8, .87, .9}, + // This performs poorly, but it's hard to show + // concisely with this test framework. At the 90th + // percentile, we finally see a +1 + []float64{10, 50, 66, 67, 70, 80, 21, 24}, + func(rnd *rand.Rand) float64 { + c := count + count++ + count = count % 3 + switch c { + case 0, 2: + return -1 + default: + return +1 + } + }) +} + +func TestStreamLinear(t *testing.T) { + count := 0 + testStreamDigest(t, + []float64{.1, .25, .5, .75, .9, .99, .999}, + []float64{.5, .5, .5, 1., .5, .5, .5}, + func(rnd *rand.Rand) float64 { + c := count + count++ + return float64(c) + }) +} + +func TestStreamUniform(t *testing.T) { + testStreamDigest(t, + []float64{.1, .25, .5, .75, .9, .99, .999}, + []float64{.5, .5, .5, 1., .5, .5, .5}, + func(rnd *rand.Rand) float64 { + return rnd.Float64() + }) +} + +func testStreamDigest(t *testing.T, quantiles, tolerance []float64, f func(rnd *rand.Rand) float64) { + const ( + quality = 5000 + sampleSize = 5000 + windowSize = 25000 + streamCount = 1000000 + ) + + rnd := rand.New(rand.NewSource(31181)) + correct := &CorrectQuantile{} + + stream := tdigest.NewStream(tdigest.NewConfig(quality, sampleSize, sampleSize, windowSize), rnd) + + for i := 0; i < streamCount; i++ { + value := f(rnd) + correct.Add(value) + err := stream.Add(value) + if err != nil { + t.Error("Stream add error", err) + } + } + + for i, q := range quantiles { + require.GreaterOrEqual(t, tolerance[i], math.Abs(correct.Distance(stream, q)), + "at quantile=%v", q) + } +} + +type Quantiler interface { + Quantile(float64) float64 +} + +type CorrectQuantile struct { + values []float64 + sorted bool +} + +// Distance returns quantile distance in percent units. +func (l *CorrectQuantile) Distance(qq Quantiler, quant float64) float64 { + value := qq.Quantile(quant) + actual := l.LookupQuantile(value) + return 100 * (actual - quant) +} + +func (l *CorrectQuantile) Add(f float64) { + l.values = append(l.values, f) + l.sorted = false +} + +func (l *CorrectQuantile) Quantile(f float64) float64 { + if len(l.values) == 0 { + return math.NaN() + } + if !l.sorted { + sort.Float64s(l.values) + l.sorted = true + } + quantileLocation := float64(len(l.values)) * f + if quantileLocation <= 0 { + return l.values[0] + } + if quantileLocation >= float64(len(l.values)-1) { + return l.values[len(l.values)-1] + } + beforeIndex := int(math.Floor(quantileLocation)) + afterIndex := beforeIndex + 1 + delta := l.values[afterIndex] - l.values[beforeIndex] + if delta == 0 { + return l.values[beforeIndex] + } + return l.values[beforeIndex] + delta*(quantileLocation-float64(beforeIndex)) +} + +func (l *CorrectQuantile) LookupQuantile(value float64) float64 { + if !l.sorted { + sort.Float64s(l.values) + l.sorted = true + } + + idx := sort.Search(len(l.values), func(i int) bool { + return l.values[i] >= value + }) + + if idx == 0 { + return 0 + } + + if idx == len(l.values) { + return 1 + } + + above := l.values[idx] + below := l.values[idx-1] + diff := above - below + if diff == 0 { + panic("impossible") + } + return (float64(idx) + (value-below)/diff) / float64(len(l.values)-1) +} diff --git a/examples/tdigest/tdigest.go b/examples/tdigest/tdigest.go new file mode 100644 index 0000000..ae51a4e --- /dev/null +++ b/examples/tdigest/tdigest.go @@ -0,0 +1,180 @@ +// Package tdigest uses the t-digest Algorithm 1 to compute an ordered +// list of non-overlapping centroids, which can be used to estimate +// quantiles and probability density. +// +// What T-digest calls a "compression" paramter is called Quality here. +// +// See the 2019 paper here: https://arxiv.org/pdf/1902.04023.pdf +package tdigest + +import ( + "fmt" + "math" + "sort" +) + +type ( + ProbabilityFunc interface { + Probability(value float64) float64 + } + + Centroid struct { + Sum float64 // Total sum of values + Weight float64 // Total weight of values + Mean float64 // Sum / Weight + Count int // Number of distinct values + } + + // Quality is a quality parameter, must be > 0. + // + // Forcing the compression parameter to be an integer + // simplifies the code, because each centroid / quantile has + // k(i) == 1. + Quality int + + // Item is a value/weight pair for input to the digest. + Item struct { + Value float64 + Weight float64 + } + + // input is for sorting Items. + Input []*Item + + // TDigest is computed using Algorithm 1 from the T-digest + // paper. + TDigest struct { + Quality Quality + SumWeight float64 + Digest []Centroid + + ProbabilityFunc + } +) + +var _ ProbabilityFunc = &TDigest{} + +var ( + ErrEmptyDataSet = fmt.Errorf("Empty data set") + ErrInvalidWeight = fmt.Errorf("Negative or NaN weight") + ErrInvalidInterp = fmt.Errorf("Unknown interpolation") +) + +// The T-digest scaling function maps quantile to index, where +// 0 <= index < quality +func (quality Quality) digestK(q float64) float64 { + return float64(quality) * (0.5 + math.Asin(2*q-1)/math.Pi) +} + +// The T-digest inverse-scaling function maps index to quantile. +func (quality Quality) inverseK(k float64) float64 { + if k > float64(quality) { + return 1 + } + return 0.5 * (math.Sin(math.Pi*(k/float64(quality)-0.5)) + 1) +} + +func New(quality Quality) *TDigest { + return &TDigest{ + Quality: quality, + Digest: make([]Centroid, 0, quality), + } +} + +func (t *TDigest) Compute(in Input) error { + t.Digest = t.Digest[:0] + + if len(in) == 0 { + return ErrEmptyDataSet + } + + // Compute the total weight, check for invalid weights. + // combine weights for equal values. + + sumWeight := 0.0 + + for _, it := range in { + we := it.Weight + if we < 0 || math.IsNaN(we) || math.IsInf(we, +1) { + return ErrInvalidWeight + } + + sumWeight += we + } + + // Both of the following loops require sorted data. + sort.Sort(&in) + + outIndex := 0 + for i := 0; i < len(in); i++ { + va := in[i].Value + we := in[i].Weight + + if we == 0 { + continue + } + + if outIndex != 0 && i != 0 && in[i-1].Value == va { + in[outIndex-1].Weight += we + continue + } + + in[outIndex].Value = va + in[outIndex].Weight = we + outIndex++ + } + + in = in[0:outIndex] + + // T-digest's Algorithm 1. The step above to de-duplicate + // values ensures that each bucket has a non-zero width. + digest := t.Digest + + qleft := 0.0 + qlimit := t.Quality.inverseK(1) + + current := Centroid{} + + for pos := 0; pos < len(in); pos++ { + vpos := in[pos].Value + wpos := in[pos].Weight + + q := qleft + (current.Weight+wpos)/sumWeight + + if q <= qlimit { + current.Sum += vpos * wpos + current.Weight += wpos + current.Count++ + continue + } + + if current.Count > 0 { + digest = append(digest, current) + } + qleft += current.Weight / sumWeight + + qlimit = t.Quality.inverseK(t.Quality.digestK(qleft) + 1) + current.Sum = vpos * wpos + current.Weight = wpos + current.Count = 1 + } + digest = append(digest, current) + + t.Digest = digest + t.SumWeight = sumWeight + t.ProbabilityFunc = newFlatFunc(t) + + return nil +} + +func (in *Input) Len() int { + return len(*in) +} + +func (in *Input) Swap(i, j int) { + (*in)[i], (*in)[j] = (*in)[j], (*in)[i] +} + +func (in *Input) Less(i, j int) bool { + return (*in)[i].Value < (*in)[j].Value +} From 30ae4c3f43ad20e1a1eb12f9125ec2e4de4f63c8 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 17 Oct 2024 23:23:04 -0700 Subject: [PATCH 28/28] missed file --- spline.go.dead | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 spline.go.dead diff --git a/spline.go.dead b/spline.go.dead new file mode 100644 index 0000000..973a2e8 --- /dev/null +++ b/spline.go.dead @@ -0,0 +1,72 @@ +package tdigest + +import ( + "fmt" + "math" + + "github.com/jmacd/gospline" +) + +// MonotoneSpline uses cubic monotone interpolation. +// TODO: If this works out, the gospline can be optimized a bit. +type MonotoneSpline struct { + t *TDigest + spline gospline.Spline + iwidth float64 +} + +var _ ProbabilityFunc = &MonotoneSpline{} + +func newSplineFunc(t *TDigest) *MonotoneSpline { + xvalues := make([]float64, len(t.Digest)+2) + yvalues := make([]float64, len(t.Digest)+2) + + // Place 0 and 1 probabilities + xvalues[0] = -1e100 + xvalues[len(t.Digest)+1] = +1e100 + yvalues[0] = 0 + yvalues[len(t.Digest)+1] = 1 + + cum := 0.0 + min := math.MaxFloat64 + max := -math.MaxFloat64 + for i, c := range t.Digest { + mean := c.Sum / c.Weight + c.Mean = mean + cum += c.Weight / t.SumWeight + xvalues[i+1] = mean + yvalues[i+1] = cum + min = math.Min(min, mean) + max = math.Max(max, mean) + } + + return &MonotoneSpline{ + t: t, + spline: gospline.NewMonotoneSpline(xvalues, yvalues), + iwidth: (max - min) / (8 * float64(len(t.Digest))), + } +} + +// Probability interpolates the PDF. Uses cubic monotone spline +// interpolation. +// +// TODO Instead of computing At() twice, can we interpolate the PDF +// directly? +func (m *MonotoneSpline) Probability(value float64) float64 { + if value < m.t.Digest[0].Mean { + return m.t.Digest[0].Weight / (2 * m.t.SumWeight) + } + if value > m.t.Digest[len(m.t.Digest)-1].Mean { + return m.t.Digest[len(m.t.Digest)-1].Weight / (2 * m.t.SumWeight) + } + + // TODO: Note that this not symmetrical, should be. Requires + // care on the boundaries to center this (vs. the above + // special cases). + below := m.spline.At(value) + above := m.spline.At(value + m.iwidth) + if above == below { + fmt.Println("Zero prob at", value, below, above) + } + return (above - below) / (m.iwidth) +}