Skip to content

Commit

Permalink
Invert loop
Browse files Browse the repository at this point in the history
Small refactor to more easily allow us to provide an `update_resource` action that can chain with other transforms.

This passes the existing tests but I need to add some more coverage before I can be confident that this is a non-breaking change.

One caveat with this is that I'm buffering the whole slice of ResourceMetrics up front. Not sure of the performance implications.
  • Loading branch information
jefferbrecht committed May 5, 2022
1 parent f4831d9 commit 51d51a9
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions processor/metricstransformprocessor/metrics_transform_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,27 @@ func (mtp *metricsTransformProcessor) processMetrics(_ context.Context, md pmetr

out := pmetric.NewMetrics()

nodeSlice := make([]*commonpb.Node, rms.Len())
resourceSlice := make([]*resourcepb.Resource, rms.Len())
metricsSlice := make([]*[]*metricspb.Metric, rms.Len())

for i := 0; i < rms.Len(); i++ {
node, resource, metrics := internaldata.ResourceMetricsToOC(rms.At(i))
n, r, m := internaldata.ResourceMetricsToOC(rms.At(i))
nodeSlice[i] = n
resourceSlice[i] = r
metricsSlice[i] = &m
}

nameToMetricMapping := newMetricNameMapping(metrics)
for _, transform := range mtp.transforms {
for _, transform := range mtp.transforms {
for i := 0; i < rms.Len(); i++ {
node, resource, metrics := nodeSlice[i], resourceSlice[i], metricsSlice[i]
nameToMetricMapping := newMetricNameMapping(*metrics)
matchedMetrics := transform.MetricIncludeFilter.getMatches(nameToMetricMapping)

if transform.Action == Group && len(matchedMetrics) > 0 {
nData := mtp.groupMatchedMetrics(node, resource, matchedMetrics, transform)
groupedMds = append(groupedMds, nData)
metrics = mtp.removeMatchedMetrics(metrics, matchedMetrics)
*metrics = mtp.removeMatchedMetrics(*metrics, matchedMetrics)
}

if transform.Action == Combine && len(matchedMetrics) > 0 {
Expand All @@ -237,25 +247,19 @@ func (mtp *metricsTransformProcessor) processMetrics(_ context.Context, md pmetr
}

combined := mtp.combine(matchedMetrics, transform)
metrics = mtp.removeMatchedMetricsAndAppendCombined(metrics, matchedMetrics, combined)
*metrics = mtp.removeMatchedMetricsAndAppendCombined(*metrics, matchedMetrics, combined)

// set matchedMetrics to the combined metric so that any additional operations are performed on
// the combined metric
matchedMetrics = []*match{{metric: combined}}
}

// TODO: process UpdateResource action
// Invert loops: loop over transforms, then rms
// Each transform updates rms in-place within the function so
// the next transform uses up-to-date values
// Keep semantics of current group action: keep same strategy of saving group results to groupedMDs, apply at very end

for _, match := range matchedMetrics {
metricName := match.metric.MetricDescriptor.Name

if transform.Action == Insert {
match.metric = proto.Clone(match.metric).(*metricspb.Metric)
metrics = append(metrics, match.metric)
*metrics = append(*metrics, match.metric)
}

mtp.update(match, transform)
Expand All @@ -268,8 +272,10 @@ func (mtp *metricsTransformProcessor) processMetrics(_ context.Context, md pmetr
}
}
}
}

internaldata.OCToMetrics(node, resource, metrics).ResourceMetrics().MoveAndAppendTo(out.ResourceMetrics())
for i := 0; i < rms.Len(); i++ {
internaldata.OCToMetrics(nodeSlice[i], resourceSlice[i], *metricsSlice[i]).ResourceMetrics().MoveAndAppendTo(out.ResourceMetrics())
}

for i := range groupedMds {
Expand Down

0 comments on commit 51d51a9

Please sign in to comment.