Single-state Aggregator and test refactor #812

Merged · 25 commits · Jun 13, 2020
25 changes: 4 additions & 21 deletions api/global/internal/benchmark_test.go
@@ -16,7 +16,6 @@ package internal_test

import (
"context"
"strings"
"testing"

"go.opentelemetry.io/otel/api/global"
@@ -26,9 +25,7 @@ import (
"go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

@@ -37,6 +34,7 @@ var Must = metric.Must
// benchFixture is copied from sdk/metric/benchmark_test.go.
// TODO refactor to share this code.
type benchFixture struct {
export.AggregationSelector
accumulator *sdk.Accumulator
meter metric.Meter
B *testing.B
@@ -47,30 +45,15 @@ var _ metric.Provider = &benchFixture{}
func newFixture(b *testing.B) *benchFixture {
b.ReportAllocs()
bf := &benchFixture{
B: b,
B: b,
AggregationSelector: test.AggregationSelector(),
}

bf.accumulator = sdk.NewAccumulator(bf)
bf.meter = metric.WrapMeterImpl(bf.accumulator, "test")
return bf
}

func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.CounterKind:
return sum.New()
case metric.ValueRecorderKind:
if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") {
return minmaxsumcount.New(descriptor)
} else if strings.HasSuffix(descriptor.Name(), "ddsketch") {
return ddsketch.New(descriptor, ddsketch.NewDefaultConfig())
} else if strings.HasSuffix(descriptor.Name(), "array") {
return ddsketch.New(descriptor, ddsketch.NewDefaultConfig())
}
}
return nil
}

func (*benchFixture) Process(export.Record) error {
return nil
}
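
Note on the benchFixture change above: the hand-written AggregatorFor method is replaced by embedding export.AggregationSelector in the struct and initializing it with test.AggregationSelector(). Below is a minimal standalone sketch of that Go pattern (hypothetical names, not part of this diff): a struct that embeds an interface value satisfies the interface and forwards its method calls to the embedded value.

package main

import "fmt"

// AggregationSelector is a stand-in for export.AggregationSelector.
type AggregationSelector interface {
	AggregatorFor(name string) string
}

// fixedSelector is a hypothetical selector that always picks "sum".
type fixedSelector struct{}

func (fixedSelector) AggregatorFor(name string) string { return "sum" }

// fixture embeds the interface, so it satisfies AggregationSelector
// without declaring an AggregatorFor method of its own.
type fixture struct {
	AggregationSelector
}

func main() {
	f := fixture{AggregationSelector: fixedSelector{}}
	fmt.Println(f.AggregatorFor("requests")) // delegates to the embedded selector
}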
69 changes: 39 additions & 30 deletions exporters/metric/stdout/stdout_test.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
@@ -99,11 +100,13 @@ func TestStdoutTimestamp(t *testing.T) {

ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
lvagg := lastvalue.New()

lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
lvagg.Checkpoint(&desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, lvagg)
checkpointSet.Add(&desc, ckpt)

if err := exporter.Export(ctx, checkpointSet); err != nil {
t.Fatal("Unexpected export error: ", err)
Expand Down Expand Up @@ -144,11 +147,13 @@ func TestStdoutCounterFormat(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()

cagg, ckpt := test.Unslice2(sum.New(2))

aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
cagg.Checkpoint(&desc)
require.NoError(t, cagg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, cagg, kv.String("A", "B"), kv.String("C", "D"))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

fix.Export(checkpointSet)

@@ -161,11 +166,12 @@ func TestStdoutLastValueFormat(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(&desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

fix.Export(checkpointSet)

@@ -178,12 +184,14 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := minmaxsumcount.New(&desc)

magg, ckpt := test.Unslice2(minmaxsumcount.New(2, &desc))

aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
magg.Checkpoint(&desc)
require.NoError(t, magg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

fix.Export(checkpointSet)

@@ -198,15 +206,15 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := array.New()
aagg, ckpt := test.Unslice2(array.New(2))

for i := 0; i < 1000; i++ {
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(float64(i)+0.5), &desc)
aggtest.CheckedUpdate(fix.t, aagg, metric.NewFloat64Number(float64(i)+0.5), &desc)
}

magg.Checkpoint(&desc)
require.NoError(t, aagg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

fix.Export(checkpointSet)

@@ -239,28 +247,27 @@ func TestStdoutValueRecorderFormat(t *testing.T) {

func TestStdoutNoData(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
for name, tc := range map[string]export.Aggregator{
"ddsketch": ddsketch.New(&desc, ddsketch.NewDefaultConfig()),
"minmaxsumcount": minmaxsumcount.New(&desc),
} {
tc := tc
t.Run(name, func(t *testing.T) {

runTwoAggs := func(agg, ckpt export.Aggregator) {
t.Run(fmt.Sprintf("%T", agg), func(t *testing.T) {
t.Parallel()

fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(testResource)

magg := tc
magg.Checkpoint(&desc)
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, magg)
checkpointSet.Add(&desc, ckpt)

fix.Export(checkpointSet)

require.Equal(t, `{"updates":null}`, fix.Output())
})
}

runTwoAggs(test.Unslice2(ddsketch.New(2, &desc, ddsketch.NewDefaultConfig())))
runTwoAggs(test.Unslice2(minmaxsumcount.New(2, &desc)))
}
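
A side note on the call style used by runTwoAggs above: Go allows a call that returns two values to be passed directly as the complete argument list of a two-parameter function, which is why runTwoAggs(test.Unslice2(...)) compiles. A minimal sketch of the rule, with hypothetical names:

package main

import "fmt"

func pair() (int, int) { return 1, 2 }

func consume(a, b int) { fmt.Println(a, b) }

func main() {
	// The two return values of pair() become the two arguments of consume.
	consume(pair())
}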

func TestStdoutLastValueNotSet(t *testing.T) {
@@ -269,8 +276,9 @@ func TestStdoutLastValueNotSet(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
lvagg.Checkpoint(&desc)

lvagg, ckpt := test.Unslice2(lastvalue.New(2))
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))

@@ -319,11 +327,12 @@ func TestStdoutResource(t *testing.T) {
checkpointSet := test.NewCheckpointSet(tc.res)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(&desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))

checkpointSet.Add(&desc, lvagg, tc.attrs...)
checkpointSet.Add(&desc, ckpt, tc.attrs...)

fix.Export(checkpointSet)

80 changes: 38 additions & 42 deletions exporters/metric/test/test.go
@@ -17,17 +17,14 @@ package test
import (
"context"
"errors"
"reflect"
"sync"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)

@@ -36,13 +33,34 @@ type mapkey struct {
distinct label.Distinct
}

// CheckpointSet is useful for testing Exporters.
type CheckpointSet struct {
sync.RWMutex
records map[mapkey]export.Record
updates []export.Record
resource *resource.Resource
}

// NoopAggregator is useful for testing Exporters.
type NoopAggregator struct{}

var _ export.Aggregator = (*NoopAggregator)(nil)

// Update implements export.Aggregator.
func (*NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor) error {
return nil
}

// SynchronizedCopy implements export.Aggregator.
func (*NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
return nil
}

// Merge implements export.Aggregator.
func (*NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
return nil
}

// NewCheckpointSet returns a test CheckpointSet to which new records can be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
@@ -52,12 +70,13 @@ func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
}
}

// Reset clears the CheckpointSet's records and updates.
func (p *CheckpointSet) Reset() {
p.records = make(map[mapkey]export.Record)
p.updates = nil
}

// Add a new descriptor to a Checkpoint.
// Add a new record to a CheckpointSet.
//
// If there is an existing record with the same descriptor and labels,
// the stored aggregator will be returned and should be merged.
@@ -78,43 +97,6 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return newAgg, true
}

func createNumber(desc *metric.Descriptor, v float64) metric.Number {
if desc.NumberKind() == metric.Float64NumberKind {
return metric.NewFloat64Number(v)
}
return metric.NewInt64Number(int64(v))
}

func (p *CheckpointSet) AddLastValue(desc *metric.Descriptor, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, lastvalue.New(), v, labels...)
}

func (p *CheckpointSet) AddCounter(desc *metric.Descriptor, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, sum.New(), v, labels...)
}

func (p *CheckpointSet) AddValueRecorder(desc *metric.Descriptor, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, array.New(), v, labels...)
}

func (p *CheckpointSet) AddHistogramValueRecorder(desc *metric.Descriptor, boundaries []float64, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, histogram.New(desc, boundaries), v, labels...)
}

func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.Aggregator, v float64, labels ...kv.KeyValue) {
ctx := context.Background()
// Updates and checkpoint the new aggregator
_ = newAgg.Update(ctx, createNumber(desc, v), desc)
newAgg.Checkpoint(desc)

// Try to add this aggregator to the CheckpointSet
agg, added := p.Add(desc, newAgg, labels...)
if !added {
// An aggregator already exist for this descriptor and label set, we should merge them.
_ = agg.Merge(newAgg, desc)
}
}

func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
for _, r := range p.updates {
if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) {
@@ -123,3 +105,17 @@ func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
}
return nil
}

// Unslice2 takes a two-element slice of some concrete Aggregator type and returns its elements as two export.Aggregator values.
func Unslice2(sl interface{}) (one, two export.Aggregator) {
slv := reflect.ValueOf(sl)
if slv.Type().Kind() != reflect.Slice {
panic("Invalid Unslice2")
}
if slv.Len() != 2 {
panic("Invalid Unslice2: length > 2")
}
one = slv.Index(0).Addr().Interface().(export.Aggregator)
two = slv.Index(1).Addr().Interface().(export.Aggregator)
return
}
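
The rewritten tests above all follow one flow: a constructor such as lastvalue.New(2) allocates a two-element slice, Unslice2 splits it into the live aggregator and its checkpoint, and SynchronizedCopy moves the accumulated state into the checkpoint before it is added to the CheckpointSet. Below is a standalone sketch of that flow; the import paths are inferred from the file paths in this diff, and a plain Update call stands in for the aggtest.CheckedUpdate helper.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/api/metric"
	"go.opentelemetry.io/otel/exporters/metric/test"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
)

func main() {
	ctx := context.Background()
	desc := metric.NewDescriptor("example.value", metric.ValueObserverKind, metric.Float64NumberKind)

	// New(2) returns a two-element slice; Unslice2 yields the live
	// aggregator and its checkpoint as export.Aggregator values.
	lvagg, ckpt := test.Unslice2(lastvalue.New(2))

	// Record a value, then copy the state into the checkpoint for export.
	_ = lvagg.Update(ctx, metric.NewFloat64Number(123.456), &desc)
	_ = lvagg.SynchronizedCopy(ckpt, &desc)

	fmt.Printf("checkpoint: %v\n", ckpt)
}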
30 changes: 30 additions & 0 deletions exporters/metric/test/test_test.go
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestUnslice(t *testing.T) {
in := make([]NoopAggregator, 2)

a, b := Unslice2(in)

require.Equal(t, a.(*NoopAggregator), &in[0])
require.Equal(t, b.(*NoopAggregator), &in[1])
}