This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Exporter/Prometheus: Simplify histogram creation. (#1061)
* Exporter/Prometheus: Simplify histogram creation.

* Add a test on unordered bucket bounds.

* Fix review comments.
songy23 authored Mar 13, 2019 · 1 parent bf23ae1 · commit 084f0af
Showing 2 changed files with 90 additions and 20 deletions.
exporter/prometheus/prometheus.go (21 changes: 2 additions & 19 deletions)
```diff
@@ -21,7 +21,6 @@ import (
     "fmt"
     "log"
     "net/http"
-    "sort"
     "sync"
 
     "go.opencensus.io/internal"
@@ -213,25 +212,9 @@ func (c *collector) toMetric(desc *prometheus.Desc, v *view.View, row *view.Row)
     case *view.DistributionData:
         points := make(map[float64]uint64)
         // Histograms are cumulative in Prometheus.
-        // 1. Sort buckets in ascending order but, retain
-        // their indices for reverse lookup later on.
-        // TODO: If there is a guarantee that distribution elements
-        // are always sorted, then skip the sorting.
-        indicesMap := make(map[float64]int)
-        buckets := make([]float64, 0, len(v.Aggregation.Buckets))
-        for i, b := range v.Aggregation.Buckets {
-            if _, ok := indicesMap[b]; !ok {
-                indicesMap[b] = i
-                buckets = append(buckets, b)
-            }
-        }
-        sort.Float64s(buckets)
-
-        // 2. Now that the buckets are sorted by magnitude
-        // we can create cumulative indicesmap them back by reverse index
+        // Get cumulative bucket counts.
         cumCount := uint64(0)
-        for _, b := range buckets {
-            i := indicesMap[b]
+        for i, b := range v.Aggregation.Buckets {
             cumCount += uint64(data.CountPerBucket[i])
             points[b] = cumCount
         }
```
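The heart of the change: once the bucket bounds can be assumed sorted and de-duplicated, Prometheus's cumulative bucket counts reduce to a single running sum over CountPerBucket, with no index bookkeeping. Below is a minimal, self-contained sketch of that conversion; the function signature and variable names are illustrative, not the exporter's actual view.DistributionData plumbing.

```go
package main

import "fmt"

// toCumulativePoints mirrors the simplified exporter loop: walk the
// bucket upper bounds in order, keep a running sum of the per-bucket
// counts, and key each cumulative count by its upper bound.
// It assumes bounds are sorted ascending with no duplicates.
func toCumulativePoints(bounds []float64, countPerBucket []int64) map[float64]uint64 {
    points := make(map[float64]uint64, len(bounds))
    cumCount := uint64(0)
    for i, b := range bounds {
        cumCount += uint64(countPerBucket[i])
        points[b] = cumCount
    }
    return points
}

func main() {
    // Bounds and per-bucket counts matching the test fixtures below.
    bounds := []float64{1, 5, 10, 20, 50, 100, 250}
    counts := []int64{1, 1, 1, 1, 0, 0, 3}
    points := toCumulativePoints(bounds, counts)
    for _, b := range bounds {
        fmt.Printf("le=%v: %d\n", b, points[b]) // prints 1, 2, 3, 4, 4, 4, 7
    }
}
```

The deleted code paid for possibly unordered bounds with a sort plus an index map for reverse lookup; the new loop simply trusts the order of v.Aggregation.Buckets, which is exactly what the added test exercises.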
exporter/prometheus/prometheus_test.go (89 changes: 88 additions & 1 deletion)
```diff
@@ -288,7 +288,94 @@ func TestCumulativenessFromHistograms(t *testing.T) {
     // We want the results that look like this:
     // 1: [0.25] | 1 + prev(i) = 1 + 0 = 1
     // 5: [1.45] | 1 + prev(i) = 1 + 1 = 2
-    // 10: [] | 1 + prev(i) = 1 + 2 = 3
+    // 10: [7.69] | 1 + prev(i) = 1 + 2 = 3
+    // 20: [12] | 1 + prev(i) = 1 + 3 = 4
+    // 50: [] | 0 + prev(i) = 0 + 4 = 4
+    // 100: [] | 0 + prev(i) = 0 + 4 = 4
+    // 250: [187.12, 199.9, 245.67] | 3 + prev(i) = 3 + 4 = 7
+    wantLines := []string{
+        `cash_register_bucket{le="1"} 1`,
+        `cash_register_bucket{le="5"} 2`,
+        `cash_register_bucket{le="10"} 3`,
+        `cash_register_bucket{le="20"} 4`,
+        `cash_register_bucket{le="50"} 4`,
+        `cash_register_bucket{le="100"} 4`,
+        `cash_register_bucket{le="250"} 7`,
+        `cash_register_bucket{le="+Inf"} 7`,
+        `cash_register_sum 654.0799999999999`, // Summation of the input values
+        `cash_register_count 7`,
+    }
+
+    ctx := context.Background()
+    ms := make([]stats.Measurement, 0, len(values))
+    for _, value := range values {
+        mx := m.M(value)
+        ms = append(ms, mx)
+    }
+    stats.Record(ctx, ms...)
+
+    // Give the recorder ample time to process recording
+    <-time.After(10 * reportPeriod)
+
+    cst := httptest.NewServer(exporter)
+    defer cst.Close()
+    res, err := http.Get(cst.URL)
+    if err != nil {
+        t.Fatalf("http.Get error: %v", err)
+    }
+    blob, err := ioutil.ReadAll(res.Body)
+    if err != nil {
+        t.Fatalf("Read body error: %v", err)
+    }
+    str := strings.Trim(string(blob), "\n")
+    lines := strings.Split(str, "\n")
+    nonComments := make([]string, 0, len(lines))
+    for _, line := range lines {
+        if !strings.Contains(line, "#") {
+            nonComments = append(nonComments, line)
+        }
+    }
+
+    got := strings.Join(nonComments, "\n")
+    want := strings.Join(wantLines, "\n")
+    if got != want {
+        t.Fatalf("\ngot:\n%s\n\nwant:\n%s\n", got, want)
+    }
+}
+
+func TestHistogramUnorderedBucketBounds(t *testing.T) {
+    exporter, err := NewExporter(Options{})
+    if err != nil {
+        t.Fatalf("failed to create prometheus exporter: %v", err)
+    }
+    view.RegisterExporter(exporter)
+    reportPeriod := time.Millisecond
+    view.SetReportingPeriod(reportPeriod)
+
+    m := stats.Float64("tests/bills", "payments by denomination", stats.UnitDimensionless)
+    v := &view.View{
+        Name:        "cash/register",
+        Description: "this is a test",
+        Measure:     m,
+
+        // Intentionally used unordered and duplicated elements in the distribution
+        // to ensure unordered bucket bounds are handled.
+        Aggregation: view.Distribution(10, 5, 1, 1, 50, 5, 20, 100, 250),
+    }
+
+    if err := view.Register(v); err != nil {
+        t.Fatalf("Register error: %v", err)
+    }
+    defer view.Unregister(v)
+
+    // Give the reporter ample time to process registration
+    <-time.After(10 * reportPeriod)
+
+    values := []float64{0.25, 245.67, 12, 1.45, 199.9, 7.69, 187.12}
+    // We want the results that look like this:
+    // 1: [0.25] | 1 + prev(i) = 1 + 0 = 1
+    // 5: [1.45] | 1 + prev(i) = 1 + 1 = 2
+    // 10: [7.69] | 1 + prev(i) = 1 + 2 = 3
     // 20: [12] | 1 + prev(i) = 1 + 3 = 4
     // 50: [] | 0 + prev(i) = 0 + 4 = 4
     // 100: [] | 0 + prev(i) = 0 + 4 = 4
```
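Why dropping the exporter-side sort is safe is easiest to see end to end. The sketch below reproduces the test's expected numbers from the raw values and the intentionally unordered, duplicated bounds. It assumes, as the new test implies, that the bounds are sorted and de-duplicated upstream of the exporter (in view.Distribution or the aggregation itself); normalizeBounds here is an illustrative stand-in for that step, not the library's code.

```go
package main

import (
    "fmt"
    "sort"
)

// normalizeBounds sorts bounds ascending and drops duplicates: the
// invariant the simplified exporter relies on. Assumed to happen
// inside the library; this helper only illustrates it.
func normalizeBounds(bounds []float64) []float64 {
    sorted := append([]float64(nil), bounds...)
    sort.Float64s(sorted)
    var out []float64
    for _, b := range sorted {
        if len(out) == 0 || out[len(out)-1] != b {
            out = append(out, b)
        }
    }
    return out
}

func main() {
    // Bounds and values from TestHistogramUnorderedBucketBounds.
    bounds := normalizeBounds([]float64{10, 5, 1, 1, 50, 5, 20, 100, 250})
    values := []float64{0.25, 245.67, 12, 1.45, 199.9, 7.69, 187.12}

    // A value lands in the first bucket whose upper bound it does not
    // exceed ("le" semantics); every value here is <= 250.
    counts := make([]int, len(bounds))
    sum := 0.0
    for _, v := range values {
        sum += v
        for i, b := range bounds {
            if v <= b {
                counts[i]++
                break
            }
        }
    }

    // Cumulative counts should match wantLines: 1, 2, 3, 4, 4, 4, 7.
    cum := 0
    for i, b := range bounds {
        cum += counts[i]
        fmt.Printf("cash_register_bucket{le=%q} %d\n", fmt.Sprint(b), cum)
    }
    fmt.Printf(`cash_register_bucket{le="+Inf"} %d`+"\n", len(values))
    fmt.Printf("cash_register_sum %v\n", sum) // 654.0799999999999
    fmt.Printf("cash_register_count %d\n", len(values))
}
```

If the bounds were not normalized before reaching the exporter, the single pass in toMetric would emit non-monotonic bucket counts, which would be invalid for a cumulative Prometheus histogram; the unordered-bounds test pins that contract down.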
