Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Context arguments from Aggregator.Checkpoint and Integrator.Process #803

Merged
merged 7 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
return nil
}

func (*benchFixture) Process(context.Context, export.Record) error {
func (*benchFixture) Process(export.Record) error {
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestStdoutTimestamp(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
lvagg.Checkpoint(ctx, &desc)
lvagg.Checkpoint(&desc)

checkpointSet.Add(&desc, lvagg)

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestStdoutCounterFormat(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
cagg.Checkpoint(fix.ctx, &desc)
cagg.Checkpoint(&desc)

checkpointSet.Add(&desc, cagg, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -163,7 +163,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(fix.ctx, &desc)
lvagg.Checkpoint(&desc)

checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -181,7 +181,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
magg := minmaxsumcount.New(&desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
magg.Checkpoint(fix.ctx, &desc)
magg.Checkpoint(&desc)

checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -204,7 +204,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(float64(i)+0.5), &desc)
}

magg.Checkpoint(fix.ctx, &desc)
magg.Checkpoint(&desc)

checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestStdoutNoData(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)

magg := tc
magg.Checkpoint(fix.ctx, &desc)
magg.Checkpoint(&desc)

checkpointSet.Add(&desc, magg)

Expand All @@ -270,7 +270,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
lvagg.Checkpoint(fix.ctx, &desc)
lvagg.Checkpoint(&desc)

checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))

Expand Down Expand Up @@ -321,7 +321,7 @@ func TestStdoutResource(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(fix.ctx, &desc)
lvagg.Checkpoint(&desc)

checkpointSet.Add(&desc, lvagg, tc.attrs...)

Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.
ctx := context.Background()
// Updates and checkpoint the new aggregator
_ = newAgg.Update(ctx, createNumber(desc, v), desc)
newAgg.Checkpoint(ctx, desc)
newAgg.Checkpoint(desc)

// Try to add this aggregator to the CheckpointSet
agg, added := p.Add(desc, newAgg, labels...)
Expand Down
10 changes: 5 additions & 5 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
assert.EqualError(t, err, aggregator.ErrNoData.Error())

// Checkpoint to set non-zero values
mmsc.Checkpoint(context.Background(), &metric.Descriptor{})
mmsc.Checkpoint(&metric.Descriptor{})
min, max, sum, count, err := minMaxSumCountValues(mmsc)
if assert.NoError(t, err) {
assert.Equal(t, min, metric.NewInt64Number(1))
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
return
}
mmsc.Checkpoint(ctx, &metric.Descriptor{})
mmsc.Checkpoint(&metric.Descriptor{})
for _, test := range tests {
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
metric.WithDescription(test.description),
Expand All @@ -165,7 +165,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
mmsc := minmaxsumcount.New(&desc)
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
mmsc.Checkpoint(context.Background(), &desc)
mmsc.Checkpoint(&desc)
expected := []*metricpb.SummaryDataPoint{
{
Count: 2,
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestSumInt64DataPoints(t *testing.T) {
labels := label.NewSet()
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
s.Checkpoint(&desc)
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
Expand All @@ -275,7 +275,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
labels := label.NewSet()
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
s.Checkpoint(&desc)
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
default:
t.Fatalf("invalid number kind: %v", r.nKind)
}
agg.Checkpoint(ctx, &desc)
agg.Checkpoint(&desc)

equiv := r.resource.Equivalent()
resources[equiv] = r.resource
Expand Down
17 changes: 9 additions & 8 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ type Integrator interface {

// Process is called by the SDK once per internal record,
// passing the export Record (a Descriptor, the corresponding
// Labels, and the checkpointed Aggregator).
//
// The Context argument originates from the controller that
// orchestrates collection.
Process(ctx context.Context, record Record) error
// Labels, and the checkpointed Aggregator). This call has no
// Context argument because it is expected to perform only
// computation. An SDK is not expected to call exporters from
// with Process, use a controller for that (see
// ./controllers/{pull,push}.
Process(record Record) error
}

// AggregationSelector supports selecting the kind of Aggregator to
Expand Down Expand Up @@ -119,9 +120,9 @@ type Aggregator interface {
// accessed using by converting to one a suitable interface
// types in the `aggregator` sub-package.
//
// The Context argument originates from the controller that
// orchestrates collection.
Checkpoint(context.Context, *metric.Descriptor)
// This call has no Context argument because it is expected to
// perform only computation.
Checkpoint(*metric.Descriptor)

// Merge combines the checkpointed state from the argument
// aggregator into this aggregator's checkpointed state.
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Aggregator) Points() ([]metric.Number, error) {

// Checkpoint saves the current state and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, nil
c.lock.Unlock()
Expand Down
17 changes: 5 additions & 12 deletions sdk/metric/aggregator/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package array

import (
"context"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -66,8 +65,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}

ctx := context.Background()
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)

all.Sort()

Expand Down Expand Up @@ -116,8 +114,6 @@ type mergeTest struct {
}

func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
ctx := context.Background()

descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

agg1 := New()
Expand Down Expand Up @@ -145,8 +141,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}

agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)

test.CheckedMerge(t, agg1, agg2, descriptor)

Expand Down Expand Up @@ -213,16 +209,14 @@ func TestArrayErrors(t *testing.T) {
require.Error(t, err)
require.Equal(t, err, aggregator.ErrNoData)

ctx := context.Background()

descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

test.CheckedUpdate(t, agg, metric.Number(0), descriptor)

if profile.NumberKind == metric.Float64NumberKind {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)

count, err := agg.Count()
require.Equal(t, int64(1), count, "NaN value was not counted")
Expand Down Expand Up @@ -275,7 +269,6 @@ func TestArrayFloat64(t *testing.T) {

all := test.NewNumbers(metric.Float64NumberKind)

ctx := context.Background()
agg := New()

for _, f := range fpsf(1) {
Expand All @@ -288,7 +281,7 @@ func TestArrayFloat64(t *testing.T) {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
}

agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)

all.Sort()

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *Aggregator) toNumber(f float64) metric.Number {

// Checkpoint saves the current state and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
replace := sdk.NewDDSketch(c.cfg)

c.lock.Lock()
Expand Down
10 changes: 3 additions & 7 deletions sdk/metric/aggregator/ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package ddsketch

import (
"context"
"fmt"
"testing"

Expand All @@ -31,8 +30,6 @@ type updateTest struct {
}

func (ut *updateTest) run(t *testing.T, profile test.Profile) {
ctx := context.Background()

descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, NewDefaultConfig())

Expand All @@ -47,7 +44,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}

agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)

all.Sort()

Expand Down Expand Up @@ -91,7 +88,6 @@ type mergeTest struct {
}

func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

agg1 := New(descriptor, NewDefaultConfig())
Expand Down Expand Up @@ -122,8 +118,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}

agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)

test.CheckedMerge(t, agg1, agg2, descriptor)

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
c.lock.Unlock()
Expand Down
14 changes: 4 additions & 10 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package histogram_test

import (
"context"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -81,7 +80,6 @@ func TestHistogramPositiveAndNegative(t *testing.T) {

// Validates count, sum and buckets for a given profile and policy
func testHistogram(t *testing.T, profile test.Profile, policy policy) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

agg := histogram.New(descriptor, boundaries)
Expand All @@ -94,7 +92,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}

agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)

all.Sort()

Expand Down Expand Up @@ -137,8 +135,6 @@ func TestHistogramInitial(t *testing.T) {
}

func TestHistogramMerge(t *testing.T) {
ctx := context.Background()

test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

Expand All @@ -158,8 +154,8 @@ func TestHistogramMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}

agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)

test.CheckedMerge(t, agg1, agg2, descriptor)

Expand Down Expand Up @@ -192,13 +188,11 @@ func TestHistogramMerge(t *testing.T) {
}

func TestHistogramNotSet(t *testing.T) {
ctx := context.Background()

test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

agg := histogram.New(descriptor, boundaries)
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)

asum, err := agg.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
}

// Checkpoint atomically saves the current value.
func (g *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
func (g *Aggregator) Checkpoint(*metric.Descriptor) {
g.checkpoint = atomic.LoadPointer(&g.current)
}

Expand Down
Loading