From 473f29cd0113f1e9386258c4251ff5ecbec30d99 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Tue, 21 May 2024 00:21:55 +0530 Subject: [PATCH 1/4] fix for cluster aggregation performance --- lib/cluster.js | 68 +++++++++++++++++++++++++++++----------- lib/metricAggregators.js | 19 ++++++++--- test/aggregatorsTest.js | 17 +++++----- test/clusterTest.js | 27 ++++++++++------ 4 files changed, 92 insertions(+), 39 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 5cb707ed..996aff1b 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -9,8 +9,11 @@ */ const Registry = require('./registry'); -const { Grouper } = require('./util'); +const { Grouper, hashObject } = require('./util'); const { aggregators } = require('./metricAggregators'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); // We need to lazy-load the 'cluster' module as some application servers - // namely Passenger - crash when it is imported. let cluster = () => { @@ -117,6 +120,7 @@ class AggregatorRegistry extends Registry { // Aggregate gathered metrics. metricsByName.forEach(metrics => { + metrics.workerSize = metricsArr.length; const aggregatorName = metrics[0].aggregator; const aggregatorFn = aggregators[aggregatorName]; if (typeof aggregatorFn !== 'function') { @@ -175,19 +179,29 @@ function addListeners() { request.done(new Error(message.error)); return; } - - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; - - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); - - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); - } + + fs.readFile(message.filename, 'utf8', (err, data) => { + if(err) { + request.done(err); + return; + } else { + const metrics = JSON.parse(data); + metrics.forEach(registry => request.responses.push(registry)); + fs.unlink(message.filename, (err) => { + if(err) console.error(`Error deleting file ${message.filename}:`, err) + }); + request.pending--; + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics(); + request.done(null, promString); + } + } + }); } }); } @@ -198,10 +212,28 @@ function addListeners() { if (message.type === GET_METRICS_REQ) { Promise.all(registries.map(r => r.getMetricsAsJSON())) .then(metrics => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, + metrics.forEach((registry, i) => { + registry.forEach(value => { + const hash = hashObject(value); + const key = `${value.metricName}_${hash}`; + value["hash"] = key; + }); + }); + const filename = path.join(os.tmpdir(), `metrics-${process.pid}}.json`); + fs.writeFile(filename, JSON.stringify(metrics), (err) => { + if(err) { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: err.message, + }); + } else { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + filename, + }); + } }); }) .catch(error => { diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index c010d467..198d1691 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -1,6 +1,6 @@ 'use strict'; -const { Grouper, hashObject } = require('./util'); +const metricMap = new Map(); /** * Returns a new function that applies the `aggregatorFn` to the values. @@ -18,11 +18,22 @@ function AggregatorFactory(aggregatorFn) { aggregator: metrics[0].aggregator, }; // Gather metrics by metricName and labels. - const byLabels = new Grouper(); + if(!metricMap.get(metrics[0].name)) { + metricMap.set(metrics[0].name, new Map()); + } + let byLabels = metricMap.get(metrics[0].name); metrics.forEach(metric => { metric.values.forEach(value => { - const key = hashObject(value.labels); - byLabels.add(`${value.metricName}_${key}`, value); + let valuesArray = byLabels.get(value.hash); + if(!valuesArray) { + byLabels.set(value.hash, [value]); + } else { + if(valuesArray.length < metrics.workerSize) { + valuesArray.push(value); + } else { + byLabels.set(value.hash, [value]); + } + } }); }); // Apply aggregator function to gathered metrics. diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 0010a656..67704db6 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -8,8 +8,8 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 1 }, - { labels: ['label1'], value: 2 }, + { labels: [], value: 1, hash: 'h1' }, + { labels: ['label1'], value: 2, hash: 'h2'}, ], }, { @@ -17,12 +17,14 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 3 }, - { labels: ['label1'], value: 4 }, + { labels: [], value: 3, hash: 'h1'}, + { labels: ['label1'], value: 4, hash: 'h2'}, ], }, ]; + metrics.workerSize = 2; + describe('sum', () => { it('properly sums values', () => { const result = aggregators.sum(metrics); @@ -102,21 +104,22 @@ describe('aggregators', () => { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 1, metricName: 'abc' }], + values: [{ labels: [], value: 1, metricName: 'abc', hash: 'h1' }], }, { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 3, metricName: 'abc' }], + values: [{ labels: [], value: 3, metricName: 'abc', hash: 'h1' }], }, { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 5, metricName: 'def' }], + values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2'}], }, ]; + metrics2.workerSize = 2; const result = aggregators.sum(metrics2); expect(result.values).toEqual([ { value: 4, labels: [], metricName: 'abc' }, diff --git a/test/clusterTest.js b/test/clusterTest.js index d1a58314..81217a38 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -3,6 +3,7 @@ const cluster = require('cluster'); const process = require('process'); const Registry = require('../lib/cluster'); +const { hash } = require('crypto'); describe.each([ ['Prometheus', Registry.PROMETHEUS_CONTENT_TYPE], @@ -61,11 +62,13 @@ describe.each([ labels: { le: 0.1, code: '300' }, value: 0, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="0.1",code="300"}', }, { labels: { le: 10, code: '300' }, value: 1.6486727018068046, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="10",code="300"}', }, ], aggregator: 'sum', @@ -75,9 +78,9 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.47, labels: { method: 'get', code: 200 } }, - { value: 0.64, labels: {} }, - { value: 23, labels: { method: 'post', code: '300' } }, + { value: 0.47, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { value: 0.64, labels: {}, hash: 'test_gauge{}' }, + { value: 23, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, ], aggregator: 'sum', }, @@ -85,14 +88,14 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075832, labels: {} }], + values: [{ value: 1502075832, labels: {}, hash: 'process_start_time_seconds{}' }], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.009, labels: {} }], + values: [{ value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], aggregator: 'average', }, { @@ -103,6 +106,7 @@ describe.each([ { value: 1, labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}', }, ], aggregator: 'first', @@ -118,11 +122,13 @@ describe.each([ labels: { le: 0.1, code: '300' }, value: 0.235151, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="0.1",code="300"}', }, { labels: { le: 10, code: '300' }, value: 1.192591, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="10",code="300"}', }, ], aggregator: 'sum', @@ -132,9 +138,9 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.02, labels: { method: 'get', code: 200 } }, - { value: 0.24, labels: {} }, - { value: 51, labels: { method: 'post', code: '300' } }, + { value: 0.02, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { value: 0.24, labels: {}, hash: 'test_gauge{}' }, + { value: 51, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, ], aggregator: 'sum', }, @@ -142,14 +148,14 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075849, labels: {} }], + values: [{ value: 1502075849, labels: {}, hash: 'process_start_time_seconds{}' }], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.008, labels: {} }], + values: [{ value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], aggregator: 'average', }, { @@ -160,6 +166,7 @@ describe.each([ { value: 1, labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}', }, ], aggregator: 'first', From cbe29f3c4229e253e82ac9163da1070791dae824 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Thu, 23 May 2024 14:44:24 +0530 Subject: [PATCH 2/4] added change log and handling concurrency --- CHANGELOG.md | 4 ++++ lib/cluster.js | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96fcb5cb..5b6f62b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Breaking ### Changed + - Changes for cluster mode + - Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics + - Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers + - Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) ### Added diff --git a/lib/cluster.js b/lib/cluster.js index 996aff1b..b0576ada 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -219,7 +219,8 @@ function addListeners() { value["hash"] = key; }); }); - const filename = path.join(os.tmpdir(), `metrics-${process.pid}}.json`); + // adding request id in file path to handle concurrency + const filename = path.join(os.tmpdir(), `metrics-${process.pid}-${message.requestId}.json`); fs.writeFile(filename, JSON.stringify(metrics), (err) => { if(err) { process.send({ From 78791149a76edb160ba73caf225dba8e9ebab5da Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Fri, 24 May 2024 16:33:09 +0530 Subject: [PATCH 3/4] linting and prettier fixes --- CHANGELOG.md | 9 ++++---- lib/cluster.js | 22 +++++++++--------- lib/metricAggregators.js | 10 ++++----- test/aggregatorsTest.js | 8 +++---- test/clusterTest.js | 48 +++++++++++++++++++++++++++++++++------- 5 files changed, 66 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b6f62b9..739b7032 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,11 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Breaking ### Changed - - Changes for cluster mode - - Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics - - Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers - - Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) + +- Changes for cluster mode +- Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics +- Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers +- Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) ### Added diff --git a/lib/cluster.js b/lib/cluster.js index b0576ada..68923e6d 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -179,23 +179,22 @@ function addListeners() { request.done(new Error(message.error)); return; } - fs.readFile(message.filename, 'utf8', (err, data) => { - if(err) { + if (err) { request.done(err); return; } else { const metrics = JSON.parse(data); metrics.forEach(registry => request.responses.push(registry)); - fs.unlink(message.filename, (err) => { - if(err) console.error(`Error deleting file ${message.filename}:`, err) + fs.unlink(message.filename, e => { + if (e) + console.error(`Error deleting file ${message.filename}:`, e); }); request.pending--; if (request.pending === 0) { // finalize requests.delete(message.requestId); clearTimeout(request.errorTimeout); - const registry = AggregatorRegistry.aggregate(request.responses); const promString = registry.metrics(); request.done(null, promString); @@ -212,17 +211,20 @@ function addListeners() { if (message.type === GET_METRICS_REQ) { Promise.all(registries.map(r => r.getMetricsAsJSON())) .then(metrics => { - metrics.forEach((registry, i) => { + metrics.forEach(registry => { registry.forEach(value => { const hash = hashObject(value); const key = `${value.metricName}_${hash}`; - value["hash"] = key; + value.hash = key; }); }); // adding request id in file path to handle concurrency - const filename = path.join(os.tmpdir(), `metrics-${process.pid}-${message.requestId}.json`); - fs.writeFile(filename, JSON.stringify(metrics), (err) => { - if(err) { + const filename = path.join( + os.tmpdir(), + `metrics-${process.pid}-${message.requestId}.json`, + ); + fs.writeFile(filename, JSON.stringify(metrics), err => { + if (err) { process.send({ type: GET_METRICS_RES, requestId: message.requestId, diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index 198d1691..73ec77c2 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -18,17 +18,17 @@ function AggregatorFactory(aggregatorFn) { aggregator: metrics[0].aggregator, }; // Gather metrics by metricName and labels. - if(!metricMap.get(metrics[0].name)) { + if (!metricMap.get(metrics[0].name)) { metricMap.set(metrics[0].name, new Map()); } - let byLabels = metricMap.get(metrics[0].name); + const byLabels = metricMap.get(metrics[0].name); metrics.forEach(metric => { metric.values.forEach(value => { - let valuesArray = byLabels.get(value.hash); - if(!valuesArray) { + const valuesArray = byLabels.get(value.hash); + if (!valuesArray) { byLabels.set(value.hash, [value]); } else { - if(valuesArray.length < metrics.workerSize) { + if (valuesArray.length < metrics.workerSize) { valuesArray.push(value); } else { byLabels.set(value.hash, [value]); diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 67704db6..85677af0 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -9,7 +9,7 @@ describe('aggregators', () => { type: 'does not matter', values: [ { labels: [], value: 1, hash: 'h1' }, - { labels: ['label1'], value: 2, hash: 'h2'}, + { labels: ['label1'], value: 2, hash: 'h2' }, ], }, { @@ -17,8 +17,8 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 3, hash: 'h1'}, - { labels: ['label1'], value: 4, hash: 'h2'}, + { labels: [], value: 3, hash: 'h1' }, + { labels: ['label1'], value: 4, hash: 'h2' }, ], }, ]; @@ -116,7 +116,7 @@ describe('aggregators', () => { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2'}], + values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }], }, ]; metrics2.workerSize = 2; diff --git a/test/clusterTest.js b/test/clusterTest.js index 81217a38..8596264a 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -78,9 +78,17 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.47, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { + value: 0.47, + labels: { method: 'get', code: 200 }, + hash: 'test_gauge{method="get",code="200"}', + }, { value: 0.64, labels: {}, hash: 'test_gauge{}' }, - { value: 23, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, + { + value: 23, + labels: { method: 'post', code: '300' }, + hash: 'test_gauge{method="post",code="300"}', + }, ], aggregator: 'sum', }, @@ -88,14 +96,22 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075832, labels: {}, hash: 'process_start_time_seconds{}' }], + values: [ + { + value: 1502075832, + labels: {}, + hash: 'process_start_time_seconds{}', + }, + ], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], + values: [ + { value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }, + ], aggregator: 'average', }, { @@ -138,9 +154,17 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.02, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { + value: 0.02, + labels: { method: 'get', code: 200 }, + hash: 'test_gauge{method="get",code="200"}', + }, { value: 0.24, labels: {}, hash: 'test_gauge{}' }, - { value: 51, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, + { + value: 51, + labels: { method: 'post', code: '300' }, + hash: 'test_gauge{method="post",code="300"}', + }, ], aggregator: 'sum', }, @@ -148,14 +172,22 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075849, labels: {}, hash: 'process_start_time_seconds{}' }], + values: [ + { + value: 1502075849, + labels: {}, + hash: 'process_start_time_seconds{}', + }, + ], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], + values: [ + { value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }, + ], aggregator: 'average', }, { From 49da0d97969cf8a048d6821319cc6a014fb38dab Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Sat, 1 Jun 2024 02:39:10 +0530 Subject: [PATCH 4/4] making values array empty in each scrape --- lib/cluster.js | 1 - lib/metricAggregators.js | 7 ++----- test/aggregatorsTest.js | 3 --- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 68923e6d..299e1eae 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -120,7 +120,6 @@ class AggregatorRegistry extends Registry { // Aggregate gathered metrics. metricsByName.forEach(metrics => { - metrics.workerSize = metricsArr.length; const aggregatorName = metrics[0].aggregator; const aggregatorFn = aggregators[aggregatorName]; if (typeof aggregatorFn !== 'function') { diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index 73ec77c2..5f43b554 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -28,11 +28,7 @@ function AggregatorFactory(aggregatorFn) { if (!valuesArray) { byLabels.set(value.hash, [value]); } else { - if (valuesArray.length < metrics.workerSize) { - valuesArray.push(value); - } else { - byLabels.set(value.hash, [value]); - } + valuesArray.push(value); } }); }); @@ -48,6 +44,7 @@ function AggregatorFactory(aggregatorFn) { } // NB: Timestamps are omitted. result.values.push(valObj); + values.length = 0; }); return result; }; diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 85677af0..f0570fb1 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -23,8 +23,6 @@ describe('aggregators', () => { }, ]; - metrics.workerSize = 2; - describe('sum', () => { it('properly sums values', () => { const result = aggregators.sum(metrics); @@ -119,7 +117,6 @@ describe('aggregators', () => { values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }], }, ]; - metrics2.workerSize = 2; const result = aggregators.sum(metrics2); expect(result.values).toEqual([ { value: 4, labels: [], metricName: 'abc' },