diff --git a/packages/mysql/data_stream/slowlog/elasticsearch/ingest_pipeline/default.json b/packages/mysql/data_stream/slowlog/elasticsearch/ingest_pipeline/default.json index 204f7d6d5092..b272b9d9fb47 100644 --- a/packages/mysql/data_stream/slowlog/elasticsearch/ingest_pipeline/default.json +++ b/packages/mysql/data_stream/slowlog/elasticsearch/ingest_pipeline/default.json @@ -32,6 +32,30 @@ "ignore_missing": true } }, + { + "script": { + "lang": "painless", + "if": "ctx?.mysql != null", + "source": "for (field in params.fields) { if (ctx.mysql[field] instanceof List) { def vals = ctx.mysql[field]; vals = vals.stream().distinct().sorted().collect(Collectors.toList()); if (vals.size() == 1) { ctx.mysql[field] = vals[0] } else { ctx.mysql[field] = vals } } }", + "params": { + "fields": [ + "thread_id" + ] + } + } + }, + { + "script": { + "lang": "painless", + "if": "ctx?.mysql?.slowlog != null", + "source": "for (field in params.fields) { if (ctx.mysql.slowlog[field] instanceof List) { def vals = ctx.mysql.slowlog[field]; vals = vals.stream().distinct().sorted().collect(Collectors.toList()); if (vals.size() == 1) { ctx.mysql.slowlog[field] = vals[0] } else { ctx.mysql.slowlog[field] = vals } } }", + "params": { + "fields": [ + "schema" + ] + } + } + }, { "remove": { "field": "message"