Skip to content

Commit

Permalink
Merge pull request #808 from jmacd/jmacd/agg_refactor
Browse files Browse the repository at this point in the history
Add aggregation.Kind and rename sdk/export/metric/aggregator to aggregation
  • Loading branch information
MrAlias committed Jun 10, 2020
2 parents e53841a + 8b58f4f commit 3b22e73
Show file tree
Hide file tree
Showing 26 changed files with 249 additions and 138 deletions.
18 changes: 9 additions & 9 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
Expand Down Expand Up @@ -216,11 +216,11 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {

desc := c.toDesc(record, labelKeys)

if hist, ok := agg.(aggregator.Histogram); ok {
if hist, ok := agg.(aggregation.Histogram); ok {
if err := c.exportHistogram(ch, hist, numberKind, desc, labels); err != nil {
return fmt.Errorf("exporting histogram: %w", err)
}
} else if dist, ok := agg.(aggregator.Distribution); ok {
} else if dist, ok := agg.(aggregation.Distribution); ok {
// TODO: summaries values are never being resetted.
// As measurements are recorded, new records starts to have less impact on these summaries.
// We should implement an solution that is similar to the Prometheus Clients
Expand All @@ -232,11 +232,11 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
if err := c.exportSummary(ch, dist, numberKind, desc, labels); err != nil {
return fmt.Errorf("exporting summary: %w", err)
}
} else if sum, ok := agg.(aggregator.Sum); ok {
} else if sum, ok := agg.(aggregation.Sum); ok {
if err := c.exportCounter(ch, sum, numberKind, desc, labels); err != nil {
return fmt.Errorf("exporting counter: %w", err)
}
} else if lastValue, ok := agg.(aggregator.LastValue); ok {
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
if err := c.exportLastValue(ch, lastValue, numberKind, desc, labels); err != nil {
return fmt.Errorf("exporting last value: %w", err)
}
Expand All @@ -248,7 +248,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
}

func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregator.LastValue, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregation.LastValue, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
lv, _, err := lvagg.LastValue()
if err != nil {
return fmt.Errorf("error retrieving last value: %w", err)
Expand All @@ -263,7 +263,7 @@ func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregato
return nil
}

func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Sum, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
v, err := sum.Sum()
if err != nil {
return fmt.Errorf("error retrieving counter: %w", err)
Expand All @@ -278,7 +278,7 @@ func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Su
return nil
}

func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.Distribution, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregation.Distribution, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
count, err := dist.Count()
if err != nil {
return fmt.Errorf("error retrieving count: %w", err)
Expand All @@ -305,7 +305,7 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D
return nil
}

func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator.Histogram, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregation.Histogram, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
buckets, err := hist.Histogram()
if err != nil {
return fmt.Errorf("error retrieving histogram: %w", err)
Expand Down
12 changes: 6 additions & 6 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/otel/api/label"

export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
Expand Down Expand Up @@ -98,7 +98,7 @@ func NewRawExporter(config Config) (*Exporter, error) {
} else {
for _, q := range config.Quantiles {
if q < 0 || q > 1 {
return nil, aggregator.ErrInvalidQuantile
return nil, aggregation.ErrInvalidQuantile
}
}
}
Expand Down Expand Up @@ -164,15 +164,15 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)

var expose expoLine

if sum, ok := agg.(aggregator.Sum); ok {
if sum, ok := agg.(aggregation.Sum); ok {
value, err := sum.Sum()
if err != nil {
return err
}
expose.Sum = value.AsInterface(kind)
}

if mmsc, ok := agg.(aggregator.MinMaxSumCount); ok {
if mmsc, ok := agg.(aggregation.MinMaxSumCount); ok {
count, err := mmsc.Count()
if err != nil {
return err
Expand All @@ -191,7 +191,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
}
expose.Min = min.AsInterface(kind)

if dist, ok := agg.(aggregator.Distribution); ok && len(e.config.Quantiles) != 0 {
if dist, ok := agg.(aggregation.Distribution); ok && len(e.config.Quantiles) != 0 {
summary := make([]expoQuantile, len(e.config.Quantiles))
expose.Quantiles = summary

Expand All @@ -208,7 +208,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
}
}
}
} else if lv, ok := agg.(aggregator.LastValue); ok {
} else if lv, ok := agg.(aggregation.LastValue); ok {
value, timestamp, err := lv.LastValue()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.opentelemetry.io/otel/exporters/metric/stdout"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestStdoutInvalidQuantile(t *testing.T) {
Quantiles: []float64{1.1, 0.9},
})
require.Error(t, err, "Invalid quantile error expected")
require.Equal(t, aggregator.ErrInvalidQuantile, err)
require.Equal(t, aggregation.ErrInvalidQuantile, err)
}

func TestStdoutTimestamp(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
Expand Down Expand Up @@ -117,7 +117,7 @@ func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.

func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
for _, r := range p.updates {
if err := f(r); err != nil && !errors.Is(err, aggregator.ErrNoData) {
if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) {
return err
}
}
Expand Down
12 changes: 6 additions & 6 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -230,17 +230,17 @@ func Record(r export.Record) (*metricpb.Metric, error) {
d := r.Descriptor()
l := r.Labels()
switch a := r.Aggregator().(type) {
case aggregator.MinMaxSumCount:
case aggregation.MinMaxSumCount:
return minMaxSumCount(d, l, a)
case aggregator.Sum:
case aggregation.Sum:
return sum(d, l, a)
default:
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
}
}

// sum transforms a Sum Aggregator into an OTLP Metric.
func sum(desc *metric.Descriptor, labels *label.Set, a aggregator.Sum) (*metricpb.Metric, error) {
func sum(desc *metric.Descriptor, labels *label.Set, a aggregation.Sum) (*metricpb.Metric, error) {
sum, err := a.Sum()
if err != nil {
return nil, err
Expand Down Expand Up @@ -275,7 +275,7 @@ func sum(desc *metric.Descriptor, labels *label.Set, a aggregator.Sum) (*metricp

// minMaxSumCountValue returns the values of the MinMaxSumCount Aggregator
// as discret values.
func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum metric.Number, count int64, err error) {
func minMaxSumCountValues(a aggregation.MinMaxSumCount) (min, max, sum metric.Number, count int64, err error) {
if min, err = a.Min(); err != nil {
return
}
Expand All @@ -292,7 +292,7 @@ func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum metric.Num
}

// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregator.MinMaxSumCount) (*metricpb.Metric, error) {
func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
min, max, sum, count, err := minMaxSumCountValues(a)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/unit"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestMinMaxSumCountValue(t *testing.T) {

// Prior to checkpointing ErrNoData should be returned.
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.EqualError(t, err, aggregator.ErrNoData.Error())
assert.EqualError(t, err, aggregation.ErrNoData.Error())

// Checkpoint to set non-zero values
mmsc.Checkpoint(&metric.Descriptor{})
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
mmsc := minmaxsumcount.New(&metric.Descriptor{})
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err)
assert.Equal(t, aggregator.ErrNoData, err)
assert.Equal(t, aggregation.ErrNoData, err)
}

func TestSumMetricDescriptor(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -67,7 +67,7 @@ type checkpointSet struct {

func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
for _, r := range m.records {
if err := fn(r); err != nil && err != aggregator.ErrNoData {
if err := fn(r); err != nil && err != aggregation.ErrNoData {
return err
}
}
Expand Down
Loading

0 comments on commit 3b22e73

Please sign in to comment.