diff --git a/processor/metricstransformprocessor/metrics_transform_processor.go b/processor/metricstransformprocessor/metrics_transform_processor.go index adba428642d9..368efd35da09 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor.go +++ b/processor/metricstransformprocessor/metrics_transform_processor.go @@ -216,17 +216,27 @@ func (mtp *metricsTransformProcessor) processMetrics(_ context.Context, md pmetr out := pmetric.NewMetrics() + nodeSlice := make([]*commonpb.Node, rms.Len()) + resourceSlice := make([]*resourcepb.Resource, rms.Len()) + metricsSlice := make([]*[]*metricspb.Metric, rms.Len()) + for i := 0; i < rms.Len(); i++ { - node, resource, metrics := internaldata.ResourceMetricsToOC(rms.At(i)) + n, r, m := internaldata.ResourceMetricsToOC(rms.At(i)) + nodeSlice[i] = n + resourceSlice[i] = r + metricsSlice[i] = &m + } - nameToMetricMapping := newMetricNameMapping(metrics) - for _, transform := range mtp.transforms { + for _, transform := range mtp.transforms { + for i := 0; i < rms.Len(); i++ { + node, resource, metrics := nodeSlice[i], resourceSlice[i], metricsSlice[i] + nameToMetricMapping := newMetricNameMapping(*metrics) matchedMetrics := transform.MetricIncludeFilter.getMatches(nameToMetricMapping) if transform.Action == Group && len(matchedMetrics) > 0 { nData := mtp.groupMatchedMetrics(node, resource, matchedMetrics, transform) groupedMds = append(groupedMds, nData) - metrics = mtp.removeMatchedMetrics(metrics, matchedMetrics) + *metrics = mtp.removeMatchedMetrics(*metrics, matchedMetrics) } if transform.Action == Combine && len(matchedMetrics) > 0 { @@ -237,25 +247,19 @@ func (mtp *metricsTransformProcessor) processMetrics(_ context.Context, md pmetr } combined := mtp.combine(matchedMetrics, transform) - metrics = mtp.removeMatchedMetricsAndAppendCombined(metrics, matchedMetrics, combined) + *metrics = mtp.removeMatchedMetricsAndAppendCombined(*metrics, matchedMetrics, combined) // set matchedMetrics to the combined metric so that any additional operations are performed on // the combined metric matchedMetrics = []*match{{metric: combined}} } - // TODO: process UpdateResource action - // Invert loops: loop over transforms, then rms - // Each transform updates rms in-place within the function so - // the next transform uses up-to-date values - // Keep semantics of current group action: keep same strategy of saving group results to groupedMDs, apply at very end - for _, match := range matchedMetrics { metricName := match.metric.MetricDescriptor.Name if transform.Action == Insert { match.metric = proto.Clone(match.metric).(*metricspb.Metric) - metrics = append(metrics, match.metric) + *metrics = append(*metrics, match.metric) } mtp.update(match, transform) @@ -268,8 +272,10 @@ func (mtp *metricsTransformProcessor) processMetrics(_ context.Context, md pmetr } } } + } - internaldata.OCToMetrics(node, resource, metrics).ResourceMetrics().MoveAndAppendTo(out.ResourceMetrics()) + for i := 0; i < rms.Len(); i++ { + internaldata.OCToMetrics(nodeSlice[i], resourceSlice[i], *metricsSlice[i]).ResourceMetrics().MoveAndAppendTo(out.ResourceMetrics()) } for i := range groupedMds {