diff --git a/CHANGELOG.md b/CHANGELOG.md
index f98e86d234ee..4823123c5714 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 - `prometheusremotewriteexporter`: Translate resource attributes to the target info metric (#8493)
 - `podmanreceiver`: Add API timeout configuration option (#9014)
 - `cmd/mdatagen`: Add `sem_conv_version` field to metadata.yaml that is used to set metrics SchemaURL (#9010)
+- `splunkhecexporter`: Add an option to disable sending log or profiling data (#9065)
 
 ### 🛑 Breaking changes 🛑
 
diff --git a/exporter/splunkhecexporter/README.md b/exporter/splunkhecexporter/README.md
index 76208b743e38..1a5fdd13bc3b 100644
--- a/exporter/splunkhecexporter/README.md
+++ b/exporter/splunkhecexporter/README.md
@@ -28,7 +28,11 @@ The following configuration options can also be configured:
 - `max_content_length_logs` (default: 2097152): Maximum log data size in bytes per HTTP post limited to 2097152 bytes (2 MiB).
 - `max_content_length_metrics` (default: 2097152): Maximum metric data size in bytes per HTTP post limited to 2097152 bytes (2 MiB).
 - `splunk_app_name` (default: "OpenTelemetry Collector Contrib") App name is used to track telemetry information for Splunk App's using HEC by App name.
-- `splunk_app_version` (default: Current OpenTelemetry Collector Contrib Build Version): App version is used to track telemetry information for Splunk App's using HEC by App version.
+- `splunk_app_version` (default: Current OpenTelemetry Collector Contrib Build Version): App version is used to track telemetry information for Splunk Apps using HEC by App version.
+- `log_data_enabled` (default: true): Specifies whether log data is exported. Set it to `false` if you want the log
+  data to be dropped instead. Applicable in the `logs` pipeline only.
+- `profiling_data_enabled` (default: true): Specifies whether profiling data is exported. Set it to `false` if
+  you want the profiling data to be dropped instead. Applicable in the `logs` pipeline only.
 - `hec_metadata_to_otel_attrs/source` (default = 'com.splunk.source'): Specifies the mapping of a specific unified model attribute value to the standard source field of a HEC event.
 - `hec_metadata_to_otel_attrs/sourcetype` (default = 'com.splunk.sourcetype'): Specifies the mapping of a specific unified model attribute value to the standard sourcetype field of a HEC event.
 - `hec_metadata_to_otel_attrs/index` (default = 'com.splunk.index'): Specifies the mapping of a specific unified model attribute value to the standard index field of a HEC event.
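A minimal sketch of how the new flags are meant to be used in a collector configuration. The `token` and `endpoint` values are placeholders, and only `log_data_enabled` and `profiling_data_enabled` are introduced by this change:

```yaml
exporters:
  splunk_hec:
    token: "00000000-0000-0000-0000-000000000000"      # placeholder
    endpoint: "https://splunk:8088/services/collector" # placeholder
    # Keep profiling data, drop regular log records (logs pipeline only).
    log_data_enabled: false
    profiling_data_enabled: true
```

The two kinds of records are told apart by instrumentation scope (the tests below use the `otel.profiling` library name), which is why the flags only take effect in `logs` pipelines.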
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index 8d5a1886aff6..516abcdc79fb 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -258,6 +258,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
 	var permanentErrors []error
 
 	var rls = ld.ResourceLogs()
+	var droppedProfilingDataRecords, droppedLogRecords int
 	for i := 0; i < rls.Len(); i++ {
 		ills := rls.At(i).ScopeLogs()
 		for j := 0; j < ills.Len(); j++ {
@@ -265,25 +266,40 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
 			var newPermanentErrors []error
 
 			if isProfilingData(ills.At(j)) {
+				if !c.config.ProfilingDataEnabled {
+					droppedProfilingDataRecords += ills.At(j).LogRecords().Len()
+					continue
+				}
 				profilingBufState.resource, profilingBufState.library = i, j
 				newPermanentErrors, err = c.pushLogRecords(ctx, rls, &profilingBufState, profilingHeaders, send)
 			} else {
+				if !c.config.LogDataEnabled {
+					droppedLogRecords += ills.At(j).LogRecords().Len()
+					continue
+				}
 				bufState.resource, bufState.library = i, j
 				newPermanentErrors, err = c.pushLogRecords(ctx, rls, &bufState, nil, send)
 			}
 
 			if err != nil {
-				return consumererror.NewLogs(err, *subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
+				return consumererror.NewLogs(err, *c.subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
 			}
 			permanentErrors = append(permanentErrors, newPermanentErrors...)
 		}
 	}
 
+	if droppedProfilingDataRecords != 0 {
+		c.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records", droppedProfilingDataRecords))
+	}
+	if droppedLogRecords != 0 {
+		c.logger.Debug("Log data is not allowed", zap.Int("dropped_records", droppedLogRecords))
+	}
+
 	// There's some leftover unsent non-profiling data
 	if bufState.buf.Len() > 0 {
 		if err := send(ctx, bufState.buf, nil); err != nil {
-			return consumererror.NewLogs(err, *subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
+			return consumererror.NewLogs(err, *c.subLogs(&ld, bufState.bufFront, profilingBufState.bufFront))
 		}
 	}
 
@@ -291,7 +307,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
 	if profilingBufState.buf.Len() > 0 {
 		if err := send(ctx, profilingBufState.buf, profilingHeaders); err != nil {
 			// Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above.
-			return consumererror.NewLogs(err, *subLogs(&ld, nil, profilingBufState.bufFront))
+			return consumererror.NewLogs(err, *c.subLogs(&ld, nil, profilingBufState.bufFront))
 		}
 	}
 
@@ -600,14 +616,18 @@ func (c *client) postEvents(ctx context.Context, events io.Reader, headers map[s
 // subLogs returns a subset of `ld` starting from `profilingBufFront` for profiling data
 // plus starting from `bufFront` for non-profiling data. Both can be nil, in which case they are ignored
-func subLogs(ld *pdata.Logs, bufFront *index, profilingBufFront *index) *pdata.Logs {
+func (c *client) subLogs(ld *pdata.Logs, bufFront *index, profilingBufFront *index) *pdata.Logs {
 	if ld == nil {
 		return ld
 	}
 
 	subset := pdata.NewLogs()
-	subLogsByType(ld, bufFront, &subset, false)
-	subLogsByType(ld, profilingBufFront, &subset, true)
+	if c.config.LogDataEnabled {
+		subLogsByType(ld, bufFront, &subset, false)
+	}
+	if c.config.ProfilingDataEnabled {
+		subLogsByType(ld, profilingBufFront, &subset, true)
+	}
 
 	return &subset
 }
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index 48fb508ad91b..972ca264dba1 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -162,9 +162,14 @@ func createLogDataWithCustomLibraries(numResources int, libraries []string, numR
 	return logs
 }
 
+type receivedRequest struct {
+	body    []byte
+	headers http.Header
+}
+
 type CapturingData struct {
 	testing          *testing.T
-	receivedRequest  chan []byte
+	receivedRequest  chan receivedRequest
 	statusCode       int
 	checkCompression bool
 }
@@ -182,12 +187,12 @@ func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		panic(err)
 	}
 	go func() {
-		c.receivedRequest <- body
+		c.receivedRequest <- receivedRequest{body, r.Header}
 	}()
 	w.WriteHeader(c.statusCode)
 }
 
-func runMetricsExport(cfg *Config, metrics pdata.Metrics, t *testing.T) ([][]byte, error) {
+func runMetricsExport(cfg *Config, metrics pdata.Metrics, t *testing.T) ([]receivedRequest, error) {
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		panic(err)
 	}
@@ -197,8 +202,8 @@ func runMetricsExport(cfg *Config, metrics pdata.Metrics, t *testing.T) ([][]byt
 	cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
 	cfg.Token = "1234-1234"
 
-	receivedRequest := make(chan []byte)
-	capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
+	rr := make(chan receivedRequest)
+	capture := CapturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression}
 	s := &http.Server{
 		Handler: &capture,
 	}
@@ -214,10 +219,10 @@ func runMetricsExport(cfg *Config, metrics pdata.Metrics, t *testing.T) ([][]byt
 	err = exporter.ConsumeMetrics(context.Background(), metrics)
 	assert.NoError(t, err)
 
-	var requests [][]byte
+	var requests []receivedRequest
 	for {
 		select {
-		case request := <-receivedRequest:
+		case request := <-rr:
 			requests = append(requests, request)
 		case <-time.After(1 * time.Second):
 			if len(requests) == 0 {
@@ -228,7 +233,7 @@
 	}
 }
 
-func runTraceExport(testConfig *Config, traces pdata.Traces, t *testing.T) ([][]byte, error) {
+func runTraceExport(testConfig *Config, traces pdata.Traces, t *testing.T) ([]receivedRequest, error) {
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		panic(err)
 	}
@@ -241,8 +246,8 @@ func runTraceExport(testConfig *Config, traces pdata.Traces, t *testing.T) ([][]
 	cfg.MaxContentLengthTraces = testConfig.MaxContentLengthTraces
 	cfg.Token = "1234-1234"
 
-	receivedRequest := make(chan []byte)
-	capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
+	rr := make(chan receivedRequest)
+	capture := CapturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression}
 	s := &http.Server{
 		Handler: &capture,
 	}
@@ -258,10 +263,10 @@ func runTraceExport(testConfig *Config, traces pdata.Traces, t *testing.T) ([][]
 	err = exporter.ConsumeTraces(context.Background(), traces)
 	assert.NoError(t, err)
 
-	var requests [][]byte
+	var requests []receivedRequest
 	for {
 		select {
-		case request := <-receivedRequest:
+		case request := <-rr:
 			requests = append(requests, request)
 		case <-time.After(1 * time.Second):
 			if len(requests) == 0 {
@@ -272,7 +277,7 @@
 	}
 }
 
-func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([][]byte, error) {
+func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]receivedRequest, error) {
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		panic(err)
 	}
@@ -281,8 +286,8 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([][]byte, error) {
 	cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
 	cfg.Token = "1234-1234"
 
-	receivedRequest := make(chan []byte)
-	capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
+	rr := make(chan receivedRequest)
+	capture := CapturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression}
 	s := &http.Server{
 		Handler: &capture,
 	}
@@ -299,10 +304,10 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) {
 	err = exporter.ConsumeLogs(context.Background(), ld)
 	assert.NoError(t, err)
 
-	var requests [][]byte
+	var requests []receivedRequest
 	for {
 		select {
-		case request := <-receivedRequest:
+		case request := <-rr:
 			requests = append(requests, request)
 		case <-time.After(1 * time.Second):
 			if len(requests) == 0 {
@@ -425,10 +430,10 @@ func TestReceiveTracesBatches(t *testing.T) {
 			for i := 0; i < test.want.numBatches; i++ {
 				require.NotZero(t, got[i])
 				if test.want.compressed {
-					validateCompressedContains(t, test.want.batches[i], got[i])
+					validateCompressedContains(t, test.want.batches[i], got[i].body)
 				} else {
 					for _, expected := range test.want.batches[i] {
-						assert.Contains(t, string(got[i]), expected)
+						assert.Contains(t, string(got[i].body), expected)
 					}
 				}
 			}
@@ -548,10 +553,10 @@ func TestReceiveLogs(t *testing.T) {
 			for i := 0; i < test.want.numBatches; i++ {
 				require.NotZero(t, got[i])
 				if test.want.compressed {
-					validateCompressedContains(t, test.want.batches[i], got[i])
+					validateCompressedContains(t, test.want.batches[i], got[i].body)
 				} else {
 					for _, expected := range test.want.batches[i] {
-						assert.Contains(t, string(got[i]), expected)
+						assert.Contains(t, string(got[i].body), expected)
 					}
 				}
 			}
@@ -566,7 +571,7 @@ func TestReceiveMetrics(t *testing.T) {
 	actual, err := runMetricsExport(cfg, md, t)
 	assert.Len(t, actual, 1)
 	assert.NoError(t, err)
-	msg := string(actual[0])
+	msg := string(actual[0].body)
 	assert.Contains(t, msg, "\"event\":\"metric\"")
 	assert.Contains(t, msg, "\"time\":1.001")
 	assert.Contains(t, msg, "\"time\":2.002")
@@ -681,10 +686,10 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			for i := 0; i < test.want.numBatches; i++ {
 				require.NotZero(t, got[i])
 				if test.want.compressed {
-					validateCompressedContains(t, test.want.batches[i], got[i])
+					validateCompressedContains(t, test.want.batches[i], got[i].body)
 				} else {
 					for _, expected := range test.want.batches[i] {
-						assert.Contains(t, string(got[i]), expected)
+						assert.Contains(t, string(got[i].body), expected)
 					}
 				}
 			}
@@ -700,8 +705,8 @@ func TestReceiveMetricsWithCompression(t *testing.T) {
 }
 
 func TestErrorReceived(t *testing.T) {
-	receivedRequest := make(chan []byte)
-	capture := CapturingData{receivedRequest: receivedRequest, statusCode: 500}
+	rr := make(chan receivedRequest)
+	capture := CapturingData{receivedRequest: rr, statusCode: 500}
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		panic(err)
 	}
@@ -734,7 +739,7 @@ func TestErrorReceived(t *testing.T) {
 	err = exporter.ConsumeTraces(context.Background(), td)
 
 	select {
-	case <-receivedRequest:
+	case <-rr:
 	case <-time.After(5 * time.Second):
 		t.Fatal("Should have received request")
 	}
@@ -907,7 +912,7 @@ func Test_pushLogData_InvalidLog(t *testing.T) {
 		zippers: sync.Pool{New: func() interface{} {
 			return gzip.NewWriter(nil)
 		}},
-		config: &Config{},
+		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
 	}
@@ -918,7 +923,7 @@ func Test_pushLogData_InvalidLog(t *testing.T) {
 
 	err := c.pushLogData(context.Background(), logs)
 
-	assert.Contains(t, err.Error(), "Permanent error: dropped log event: &{ unknown +Inf map[]}, error: splunk.Event.Event: unsupported value: +Inf")
+	assert.Error(t, err, "Permanent error: dropped log event: &{ unknown +Inf map[]}, error: splunk.Event.Event: unsupported value: +Inf")
 }
 
 func Test_pushLogData_PostError(t *testing.T) {
@@ -1140,13 +1145,67 @@ func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
 	}
 }
 
+func TestAllowedLogDataTypes(t *testing.T) {
+	tests := []struct {
+		name                 string
+		allowProfilingData   bool
+		allowLogData         bool
+		wantProfilingRecords int
+		wantLogRecords       int
+	}{
+		{
+			name:               "both_allowed",
+			allowProfilingData: true,
+			allowLogData:       true,
+		},
+		{
+			name:               "logs_allowed",
+			allowProfilingData: false,
+			allowLogData:       true,
+		},
+		{
+			name:               "profiling_allowed",
+			allowProfilingData: true,
+			allowLogData:       false,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			logs := createLogDataWithCustomLibraries(1, []string{"otel.logs", "otel.profiling"}, []int{1, 1})
+			cfg := NewFactory().CreateDefaultConfig().(*Config)
+			cfg.LogDataEnabled = test.allowLogData
+			cfg.ProfilingDataEnabled = test.allowProfilingData
+
+			requests, err := runLogExport(cfg, logs, t)
+			assert.NoError(t, err)
+
+			seenLogs := false
+			seenProfiling := false
+			for _, r := range requests {
+				if r.headers.Get(libraryHeaderName) == profilingLibraryName {
+					seenProfiling = true
+				} else {
+					seenLogs = true
+				}
+			}
+			assert.Equal(t, test.allowLogData, seenLogs)
+			assert.Equal(t, test.allowProfilingData, seenProfiling)
+		})
+	}
+}
+
 func TestSubLogs(t *testing.T) {
 	// Creating 12 logs (2 resources x 2 libraries x 3 records)
 	logs := createLogData(2, 2, 3)
 
+	c := client{
+		config: NewFactory().CreateDefaultConfig().(*Config),
+	}
+
 	// Logs subset from leftmost index (resource 0, library 0, record 0).
 	_0_0_0 := &index{resource: 0, library: 0, record: 0} //revive:disable-line:var-naming
-	got := subLogs(&logs, _0_0_0, nil)
+	got := c.subLogs(&logs, _0_0_0, nil)
 
 	// Number of logs in subset should equal original logs.
 	assert.Equal(t, logs.LogRecordCount(), got.LogRecordCount())
@@ -1160,7 +1219,7 @@ func TestSubLogs(t *testing.T) {
 	// Logs subset from some mid index (resource 0, library 1, log 2).
 	_0_1_2 := &index{resource: 0, library: 1, record: 2} //revive:disable-line:var-naming
-	got = subLogs(&logs, _0_1_2, nil)
+	got = c.subLogs(&logs, _0_1_2, nil)
 
 	assert.Equal(t, 7, got.LogRecordCount())
@@ -1173,7 +1232,7 @@ func TestSubLogs(t *testing.T) {
 	// Logs subset from rightmost index (resource 1, library 1, log 2).
 	_1_1_2 := &index{resource: 1, library: 1, record: 2} //revive:disable-line:var-naming
-	got = subLogs(&logs, _1_1_2, nil)
+	got = c.subLogs(&logs, _1_1_2, nil)
 
 	// Number of logs in subset should be 1.
 	assert.Equal(t, 1, got.LogRecordCount())
@@ -1187,7 +1246,7 @@ func TestSubLogs(t *testing.T) {
 	slice := &index{resource: 1, library: 0, record: 5}
 	profSlice := &index{resource: 0, library: 1, record: 8}
 
-	got = subLogs(&logs, slice, profSlice)
+	got = c.subLogs(&logs, slice, profSlice)
 
 	assert.Equal(t, 5+2+10, got.LogRecordCount())
 	assert.Equal(t, "otel.logs", got.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Name())
diff --git a/exporter/splunkhecexporter/config.go b/exporter/splunkhecexporter/config.go
index df8be937ebfb..710e9f1f720f 100644
--- a/exporter/splunkhecexporter/config.go
+++ b/exporter/splunkhecexporter/config.go
@@ -52,6 +52,12 @@ type Config struct {
 	exporterhelper.QueueSettings `mapstructure:"sending_queue"`
 	exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
 
+	// LogDataEnabled can be used to disable sending logs by the exporter.
+	LogDataEnabled bool `mapstructure:"log_data_enabled"`
+
+	// ProfilingDataEnabled can be used to disable sending profiling data by the exporter.
+	ProfilingDataEnabled bool `mapstructure:"profiling_data_enabled"`
+
 	// HEC Token is the authentication token provided by Splunk: https://docs.splunk.com/Documentation/Splunk/latest/Data/UsetheHTTPEventCollector.
 	Token string `mapstructure:"token"`
@@ -155,5 +161,8 @@ func (cfg *Config) Validate() error {
 	if err := cfg.QueueSettings.Validate(); err != nil {
 		return fmt.Errorf("sending_queue settings has invalid configuration: %w", err)
 	}
+	if !cfg.LogDataEnabled && !cfg.ProfilingDataEnabled {
+		return errors.New(`either "log_data_enabled" or "profiling_data_enabled" has to be true`)
+	}
 	return nil
 }
diff --git a/exporter/splunkhecexporter/config_test.go b/exporter/splunkhecexporter/config_test.go
index e9aee9d8d6ee..9d1b3e34129b 100644
--- a/exporter/splunkhecexporter/config_test.go
+++ b/exporter/splunkhecexporter/config_test.go
@@ -61,6 +61,8 @@ func TestLoadConfig(t *testing.T) {
 		Index:                   "metrics",
 		SplunkAppName:           "OpenTelemetry-Collector Splunk Exporter",
 		SplunkAppVersion:        "v0.0.1",
+		LogDataEnabled:          true,
+		ProfilingDataEnabled:    true,
 		MaxConnections:          100,
 		MaxContentLengthLogs:    2 * 1024 * 1024,
 		MaxContentLengthMetrics: 2 * 1024 * 1024,
diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go
index 3682406d8f4c..f03bab0e6ec3 100644
--- a/exporter/splunkhecexporter/factory.go
+++ b/exporter/splunkhecexporter/factory.go
@@ -60,7 +60,9 @@ func NewFactory() component.ExporterFactory {
 
 func createDefaultConfig() config.Exporter {
 	return &Config{
-		ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
+		LogDataEnabled:       true,
+		ProfilingDataEnabled: true,
+		ExporterSettings:     config.NewExporterSettings(config.NewComponentID(typeStr)),
 		TimeoutSettings: exporterhelper.TimeoutSettings{
 			Timeout: defaultHTTPTimeout,
 		},
diff --git a/exporter/splunkhecexporter/testdata/config.yaml b/exporter/splunkhecexporter/testdata/config.yaml
index 7346e86bdca5..491fd7e2faca 100644
--- a/exporter/splunkhecexporter/testdata/config.yaml
+++ b/exporter/splunkhecexporter/testdata/config.yaml
@@ -14,6 +14,8 @@ exporters:
     source: "otel"
     sourcetype: "otel"
    index: "metrics"
+    log_data_enabled: true
+    profiling_data_enabled: true
     tls:
       insecure_skip_verify: false
       ca_file: ""
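To illustrate the `Validate()` rule added in `config.go` above: a configuration that disables both data types is now rejected at collector startup instead of silently dropping everything. A sketch, again with placeholder `token`/`endpoint` values:

```yaml
exporters:
  splunk_hec:
    token: "00000000-0000-0000-0000-000000000000"      # placeholder
    endpoint: "https://splunk:8088/services/collector" # placeholder
    # Invalid combination: Validate() returns
    # `either "log_data_enabled" or "profiling_data_enabled" has to be true`.
    log_data_enabled: false
    profiling_data_enabled: false
```

Failing fast here seems like the right design: with both flags false, an exporter in a `logs` pipeline would accept data and drop all of it.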