Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Metrics Integrator to Processor #863

Merged
merged 2 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Config struct {
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// using the recommended selector and standard integrator. See the pull.Options.
// using the recommended selector and standard processor. See the pull.Options.
func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) {
if config.Registry == nil {
config.Registry = prometheus.NewRegistry()
Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller

// NewExportPipeline sets up a complete export pipeline with the
// recommended setup, chaining a NewRawExporter into the recommended
// selectors and integrators.
// selectors and processors.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
exporter, err := NewRawExporter(config)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return newAgg, true
}

// ForEach does not use ExportKindSelected: use a real Integrator to
// ForEach does not use ExportKindSelected: use a real Processor to
// test ExportKind functionality.
func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Record) error) error {
for _, r := range p.updates {
Expand Down
6 changes: 3 additions & 3 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -117,8 +117,8 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}

selector := simple.NewWithExactDistribution()
integrator := integrator.New(selector, metricsdk.PassThroughExporter)
pusher := push.New(integrator, exp)
processor := processor.New(selector, metricsdk.PassThroughExporter)
pusher := push.New(processor, exp)
pusher.Start()

ctx := context.Background()
Expand Down
28 changes: 14 additions & 14 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)

// Integrator is responsible for deciding which kind of aggregation to
// Processor is responsible for deciding which kind of aggregation to
// use (via AggregatorSelector), gathering exported results from the
// SDK during collection, and deciding over which dimensions to group
// the exported data.
Expand All @@ -42,9 +42,9 @@ import (
//
// The `Process` method is called during collection in a
// single-threaded context from the SDK, after the aggregator is
// checkpointed, allowing the integrator to build the set of metrics
// checkpointed, allowing the processor to build the set of metrics
// currently being exported.
type Integrator interface {
type Processor interface {
// AggregatorSelector is responsible for selecting the
// concrete type of Aggregator used for a metric in the SDK.
//
Expand Down Expand Up @@ -177,18 +177,18 @@ type Exporter interface {
// The Context comes from the controller that initiated
// collection.
//
// The CheckpointSet interface refers to the Integrator that just
// The CheckpointSet interface refers to the Processor that just
// completed collection.
Export(context.Context, CheckpointSet) error

// ExportKindSelector is an interface used by the Integrator
// ExportKindSelector is an interface used by the Processor
// in deciding whether to compute Delta or Cumulative
// Aggregations when passing Records to this Exporter.
ExportKindSelector
}

// ExportKindSelector is a sub-interface of Exporter used to indicate
// whether the Integrator should compute Delta or Cumulative
// whether the Processor should compute Delta or Cumulative
// Aggregations.
type ExportKindSelector interface {
// ExportKindFor should return the correct ExportKind that
Expand All @@ -198,7 +198,7 @@ type ExportKindSelector interface {
}

// CheckpointSet allows a controller to access a complete checkpoint of
// aggregated metrics from the Integrator. This is passed to the
// aggregated metrics from the Processor. This is passed to the
// Exporter which may then use ForEach to iterate over the collection
// of aggregated metrics.
type CheckpointSet interface {
Expand All @@ -219,9 +219,9 @@ type CheckpointSet interface {

// Locker supports locking the checkpoint set. Collection
// into the checkpoint set cannot take place (in case of a
// stateful integrator) while it is locked.
// stateful processor) while it is locked.
//
// The Integrator attached to the Accumulator MUST be called
// The Processor attached to the Accumulator MUST be called
// with the lock held.
sync.Locker

Expand All @@ -232,7 +232,7 @@ type CheckpointSet interface {
}

// Metadata contains the common elements for exported metric data that
// are shared by the Accumulator->Integrator and Integrator->Exporter
// are shared by the Accumulator->Processor and Processor->Exporter
// steps.
type Metadata struct {
descriptor *metric.Descriptor
Expand All @@ -241,14 +241,14 @@ type Metadata struct {
}

// Accumulation contains the exported data for a single metric instrument
// and label set, as prepared by an Accumulator for the Integrator.
// and label set, as prepared by an Accumulator for the Processor.
type Accumulation struct {
Metadata
aggregator Aggregator
}

// Record contains the exported data for a single metric instrument
// and label set, as prepared by the Integrator for the Exporter.
// and label set, as prepared by the Processor for the Exporter.
// This includes the effective start and end time for the aggregation.
type Record struct {
Metadata
Expand All @@ -274,7 +274,7 @@ func (m Metadata) Resource() *resource.Resource {
}

// NewAccumulation allows Accumulator implementations to construct new
// Accumulations to send to Integrators. The Descriptor, Labels, Resource,
// Accumulations to send to Processors. The Descriptor, Labels, Resource,
// and Aggregator represent aggregate metric events received over a single
// collection period.
func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Accumulation {
Expand All @@ -294,7 +294,7 @@ func (r Accumulation) Aggregator() Aggregator {
return r.aggregator
}

// NewRecord allows Integrator implementations to construct export
// NewRecord allows Processor implementations to construct export
// records. The Descriptor, Labels, and Aggregator represent
// aggregate metric events received over a single collection period.
func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record {
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
)

type benchFixture struct {
Expand Down
32 changes: 16 additions & 16 deletions sdk/metric/controller/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand All @@ -32,11 +32,11 @@ import (
const DefaultCachePeriod time.Duration = 10 * time.Second

// Controller manages access to a *sdk.Accumulator and
// *basic.Integrator. Use Provider() for obtaining Meters. Use
// *basic.Processor. Use Provider() for obtaining Meters. Use
// Foreach() for accessing current records.
type Controller struct {
accumulator *sdk.Accumulator
integrator *integrator.Integrator
processor *processor.Processor
provider *registry.Provider
period time.Duration
lastCollect time.Time
Expand All @@ -53,25 +53,25 @@ func New(aselector export.AggregatorSelector, eselector export.ExportKindSelecto
for _, opt := range options {
opt.Apply(config)
}
integrator := integrator.New(aselector, eselector)
processor := processor.New(aselector, eselector)
accum := sdk.NewAccumulator(
integrator,
processor,
sdk.WithResource(config.Resource),
)
return &Controller{
accumulator: accum,
integrator: integrator,
processor: processor,
provider: registry.NewProvider(accum),
period: config.CachePeriod,
checkpoint: integrator.CheckpointSet(),
checkpoint: processor.CheckpointSet(),
clock: controllerTime.RealClock{},
}
}

// SetClock sets the clock used for caching. For testing purposes.
func (c *Controller) SetClock(clock controllerTime.Clock) {
c.integrator.Lock()
defer c.integrator.Unlock()
c.processor.Lock()
defer c.processor.Unlock()
c.clock = clock
}

Expand All @@ -84,17 +84,17 @@ func (c *Controller) Provider() metric.Provider {
// Foreach gives the caller read-locked access to the current
// export.CheckpointSet.
func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error {
c.integrator.RLock()
defer c.integrator.RUnlock()
c.processor.RLock()
defer c.processor.RUnlock()

return c.checkpoint.ForEach(ks, f)
}

// Collect requests a collection. The collection will be skipped if
// the last collection is aged less than the CachePeriod.
func (c *Controller) Collect(ctx context.Context) error {
c.integrator.Lock()
defer c.integrator.Unlock()
c.processor.Lock()
defer c.processor.Unlock()

if c.period > 0 {
now := c.clock.Now()
Expand All @@ -106,9 +106,9 @@ func (c *Controller) Collect(ctx context.Context) error {
c.lastCollect = now
}

c.integrator.StartCollection()
c.processor.StartCollection()
c.accumulator.Collect(ctx)
err := c.integrator.FinishCollection()
c.checkpoint = c.integrator.CheckpointSet()
err := c.processor.FinishCollection()
c.checkpoint = c.processor.CheckpointSet()
return err
}
2 changes: 1 addition & 1 deletion sdk/metric/controller/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

Expand Down
20 changes: 10 additions & 10 deletions sdk/metric/controller/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
"go.opentelemetry.io/otel/sdk/metric/integrator/basic"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
)

// DefaultPushPeriod is the default time interval between pushes.
Expand All @@ -36,7 +36,7 @@ type Controller struct {
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.Provider
integrator *basic.Integrator
processor *basic.Processor
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
Expand All @@ -60,15 +60,15 @@ func New(selector export.AggregatorSelector, exporter export.Exporter, opts ...O
c.Timeout = c.Period
}

integrator := basic.New(selector, exporter)
processor := basic.New(selector, exporter)
impl := sdk.NewAccumulator(
integrator,
processor,
sdk.WithResource(c.Resource),
)
return &Controller{
provider: registry.NewProvider(impl),
accumulator: impl,
integrator: integrator,
processor: processor,
exporter: exporter,
ch: make(chan struct{}),
period: c.Period,
Expand Down Expand Up @@ -139,16 +139,16 @@ func (c *Controller) tick() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

c.integrator.Lock()
defer c.integrator.Unlock()
c.processor.Lock()
defer c.processor.Unlock()

c.integrator.StartCollection()
c.processor.StartCollection()
c.accumulator.Collect(ctx)
if err := c.integrator.FinishCollection(); err != nil {
if err := c.processor.FinishCollection(); err != nil {
global.Handle(err)
}

if err := c.exporter.Export(ctx, c.integrator.CheckpointSet()); err != nil {
if err := c.exporter.Export(ctx, c.processor.CheckpointSet()); err != nil {
global.Handle(err)
}
}
6 changes: 3 additions & 3 deletions sdk/metric/controller/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
integratorTest "go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/test"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -125,7 +125,7 @@ func (e *testExporter) resetRecords() ([]export.Record, int) {

func TestPushDoubleStop(t *testing.T) {
fix := newFixture(t)
p := push.New(integratorTest.AggregatorSelector(), fix.exporter)
p := push.New(processorTest.AggregatorSelector(), fix.exporter)
p.Start()
p.Stop()
p.Stop()
Expand Down
Loading