Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/splunkhec] Add an option to disable log or profiling data #9065

Merged
merged 1 commit into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- `prometheusremotewriteexporter`: Translate resource attributes to the target info metric (#8493)
- `podmanreceiver`: Add API timeout configuration option (#9014)
- `cmd/mdatagen`: Add `sem_conv_version` field to metadata.yaml that is used to set metrics SchemaURL (#9010)
- `splunkheceporter`: Add an option to disable log or profiling data (#9065)

### 🛑 Breaking changes 🛑

Expand Down
6 changes: 5 additions & 1 deletion exporter/splunkhecexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ The following configuration options can also be configured:
- `max_content_length_logs` (default: 2097152): Maximum log data size in bytes per HTTP post limited to 2097152 bytes (2 MiB).
- `max_content_length_metrics` (default: 2097152): Maximum metric data size in bytes per HTTP post limited to 2097152 bytes (2 MiB).
- `splunk_app_name` (default: "OpenTelemetry Collector Contrib") App name is used to track telemetry information for Splunk App's using HEC by App name.
- `splunk_app_version` (default: Current OpenTelemetry Collector Contrib Build Version): App version is used to track telemetry information for Splunk App's using HEC by App version.
- `splunk_app_version` (default: Current OpenTelemetry Collector Contrib Build Version): App version is used to track telemetry information for Splunk App's using HEC by App version.
- `log_data_enabled` (default: true): Specifies whether the log data is exported. Set it to `false` if you want the log
data to be dropped instead. Applicable in the `logs` pipeline only.
- `profiling_data_enabled` (default: true): Specifies whether the profiling data is exported. Set it to `false` if
you want the profiling data to be dropped instead. Applicable in the `logs` pipeline only.
- `hec_metadata_to_otel_attrs/source` (default = 'com.splunk.source'): Specifies the mapping of a specific unified model attribute value to the standard source field of a HEC event.
- `hec_metadata_to_otel_attrs/sourcetype` (default = 'com.splunk.sourcetype'): Specifies the mapping of a specific unified model attribute value to the standard sourcetype field of a HEC event.
- `hec_metadata_to_otel_attrs/index` (default = 'com.splunk.index'): Specifies the mapping of a specific unified model attribute value to the standard index field of a HEC event.
Expand Down
32 changes: 26 additions & 6 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,40 +258,56 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
var permanentErrors []error

var rls = ld.ResourceLogs()
var droppedProfilingDataRecords, droppedLogRecords int
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).ScopeLogs()
for j := 0; j < ills.Len(); j++ {
var err error
var newPermanentErrors []error

if isProfilingData(ills.At(j)) {
if !c.config.ProfilingDataEnabled {
droppedProfilingDataRecords += ills.At(j).LogRecords().Len()
continue
}
profilingBufState.resource, profilingBufState.library = i, j
newPermanentErrors, err = c.pushLogRecords(ctx, rls, &profilingBufState, profilingHeaders, send)
} else {
if !c.config.LogDataEnabled {
droppedLogRecords += ills.At(j).LogRecords().Len()
continue
}
bufState.resource, bufState.library = i, j
newPermanentErrors, err = c.pushLogRecords(ctx, rls, &bufState, nil, send)
}

if err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
return consumererror.NewLogs(err, *c.subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
}

permanentErrors = append(permanentErrors, newPermanentErrors...)
}
}

if droppedProfilingDataRecords != 0 {
c.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records", droppedProfilingDataRecords))
}
if droppedLogRecords != 0 {
c.logger.Debug("Log data is not allowed", zap.Int("dropped_records", droppedLogRecords))
}

// There's some leftover unsent non-profiling data
if bufState.buf.Len() > 0 {
if err := send(ctx, bufState.buf, nil); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
return consumererror.NewLogs(err, *c.subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
}
}

// There's some leftover unsent profiling data
if profilingBufState.buf.Len() > 0 {
if err := send(ctx, profilingBufState.buf, profilingHeaders); err != nil {
// Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above.
return consumererror.NewLogs(err, *subLogs(&ld, nil, profilingBufState.bufFront))
return consumererror.NewLogs(err, *c.subLogs(&ld, nil, profilingBufState.bufFront))
}
}

Expand Down Expand Up @@ -600,14 +616,18 @@ func (c *client) postEvents(ctx context.Context, events io.Reader, headers map[s

// subLogs returns a subset of `ld` starting from `profilingBufFront` for profiling data
// plus starting from `bufFront` for non-profiling data. Both can be nil, in which case they are ignored
func subLogs(ld *pdata.Logs, bufFront *index, profilingBufFront *index) *pdata.Logs {
func (c *client) subLogs(ld *pdata.Logs, bufFront *index, profilingBufFront *index) *pdata.Logs {
if ld == nil {
return ld
}

subset := pdata.NewLogs()
subLogsByType(ld, bufFront, &subset, false)
subLogsByType(ld, profilingBufFront, &subset, true)
if c.config.LogDataEnabled {
subLogsByType(ld, bufFront, &subset, false)
}
if c.config.ProfilingDataEnabled {
subLogsByType(ld, profilingBufFront, &subset, true)
}

return &subset
}
Expand Down
Loading