Revert "Futures for parallel _mget calls (#50)"
This reverts commit 45999e3.
rockdaboot committed Mar 29, 2022
1 parent 68bcf97 commit 294dd16
Showing 3 changed files with 71 additions and 104 deletions.
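
For context, the change being reverted batched the stacktrace lookups into several `_mget` requests and resolved them as futures. Below is a minimal, self-contained sketch of that pattern, condensed from the `parallelMget` helper removed in the diff that follows; the `fetchStackTraces` wrapper and the `Math.ceil` chunking are illustrative assumptions and not part of the original code.

```typescript
import type { ElasticsearchClient } from 'kibana/server';

type StackTraceID = string;

// Chunk the stacktrace IDs and issue one _mget per chunk, collecting the
// in-flight promises so the caller can resolve them together.
function parallelMget(
  nQueries: number,
  stackTraceIDs: StackTraceID[],
  chunkSize: number,
  client: ElasticsearchClient
) {
  return (): Array<Promise<any>> => {
    const futures: Array<Promise<any>> = [];
    for (let i = 0; i < nQueries; i++) {
      const chunk = stackTraceIDs.slice(chunkSize * i, chunkSize * (i + 1));
      // Each mget() starts immediately; nothing is awaited here.
      futures.push(
        client.mget({
          index: 'profiling-stacktraces',
          ids: [...chunk],
          _source_includes: ['FrameID', 'Type'],
        })
      );
    }
    return futures;
  };
}

// Hypothetical caller: resolve all chunked queries concurrently and surface failures.
async function fetchStackTraces(ids: StackTraceID[], client: ElasticsearchClient) {
  const nQueries = 4;
  // ceil so the last chunk also picks up any remainder of the ID list.
  const chunkSize = Math.ceil(ids.length / nQueries);
  return Promise.all(parallelMget(nQueries, ids, chunkSize, client)()).catch((err) => {
    throw new Error('Failed to get stacktraces from _mget: ' + err.message);
  });
}
```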
39 changes: 0 additions & 39 deletions src/plugins/profiling/server/routes/search_flamechart.test.ts
@@ -10,9 +10,7 @@ import {
getSampledTraceEventsIndex,
extractFileIDFromFrameID,
DownsampledEventsIndex,
parallelMget,
} from './search_flamechart';
import { ElasticsearchClient } from 'kibana/server';

describe('Using down-sampled indexes', () => {
test('getSampledTraceEventsIndex', () => {
@@ -88,40 +86,3 @@ describe('Extract FileID from FrameID', () => {
}
});
});

describe('Calling mget from events to stacktraces', () => {
test('parallel queries to ES are resolved as promises', async () => {
const numberOfFrames = 4;
const mock = mockClient(numberOfFrames) as unknown as ElasticsearchClient;
const futures = parallelMget(4, Array.from(['a', 'b', 'c', 'd']), 1, mock);
const results = futures();
expect(mock.mget).toBeCalledTimes(4);
expect(results.length).toEqual(4);
Promise.all(results).then((all) => {
all.forEach((a) => {
expect(a.body.docs[0].found).toBe(true);
expect(a.body.docs[0]._source.FrameID.length).toEqual(numberOfFrames);
});
});
});
});

const mockClient = (frames: number) => {
const mockEsQueryMgetResult = (): Promise<any> => {
const framesArray = [...Array(frames).keys()].map((i) => i);
return new Promise((resolve) => {
return resolve({
body: {
docs: [{ found: true, _source: { FrameID: framesArray } }],
// testing
hits: {
hits: [{ fields: { FrameID: framesArray } }],
},
},
});
});
};
return {
mget: jest.fn().mockResolvedValue(mockEsQueryMgetResult()),
};
};
134 changes: 71 additions & 63 deletions src/plugins/profiling/server/routes/search_flamechart.ts
@@ -168,31 +168,6 @@ function getNumberOfUniqueStacktracesWithoutLeafNode(
return stackTracesNoLeaf.size;
}

export function parallelMget(
nQueries: number,
stackTraceIDs: StackTraceID[],
chunkSize: number,
client: ElasticsearchClient
) {
return (): Array<Promise<any>> => {
const futures: Array<Promise<any>> = [];
for (let i = 0; i < nQueries; i++) {
const func = async () => {
const chunk = stackTraceIDs.slice(chunkSize * i, chunkSize * (i + 1));
return client.mget({
index: 'profiling-stacktraces',
ids: [...chunk],
_source_includes: ['FrameID', 'Type'],
});
};

// Build and send the queries asynchronously.
futures.push(func());
}
return futures;
};
}

async function queryFlameGraph(
logger: Logger,
client: ElasticsearchClient,
@@ -296,54 +271,87 @@ async function queryFlameGraph(
logger.info('unique downsampled stacktraces: ' + stackTraceEvents.size);
}

// profiling-stacktraces is configured with 16 shards
const nQueries = 16;
const chunkSize = Math.floor(stackTraceEvents.size / nQueries);
const stackTraceIDs = [...stackTraceEvents.keys()];
const nQueries = 4;
const results = new Array(nQueries);

// Create a lookup map StackTraceID -> StackTrace.
const stackTraces = new Map<StackTraceID, StackTrace>();
await logExecutionLatency(
logger,
'mget query for ' + stackTraceEvents.size + ' stacktraces',
async () => {
return await Promise.all(parallelMget(nQueries, stackTraceIDs, chunkSize, client)())
.then((results) => {
results.map((result) => {
if (testing) {
for (const trace of result.body.hits.hits) {
const frameIDs = trace.fields.FrameID as string[];
const fileIDs = extractFileIDArrayFromFrameIDArray(frameIDs);
stackTraces.set(trace._id, {
FileID: fileIDs,
FrameID: frameIDs,
Type: trace.fields.Type,
});
}
} else {
for (const trace of result.body.docs) {
// Sometimes we don't find the trace.
// This is due to ES delays writing (data is not immediately seen after write).
// Also, ES doesn't know about transactions.
if (trace.found) {
const frameIDs = trace._source.FrameID as string[];
const fileIDs = extractFileIDArrayFromFrameIDArray(frameIDs);
stackTraces.set(trace._id, {
FileID: fileIDs,
FrameID: frameIDs,
Type: trace._source.Type,
});
}
}
}
const promises = new Array(nQueries);
const chunkSize = Math.floor(stackTraceEvents.size / nQueries);
const stackTraceIDs = [...stackTraceEvents.keys()];

logger.info('A');

for (let i = 0; i < nQueries; i++) {
const func = async () => {
const chunk = stackTraceIDs.slice(chunkSize * i, chunkSize * (i + 1));
return client.mget({
index: 'profiling-stacktraces',
ids: [...chunk],
_source_includes: ['FrameID', 'Type'],
});
})
.catch((err) => {
logger.error('Failed to get stacktraces from _mget: ' + err.message);
};

// Build and send the queries asynchronously.
promises[i] = func();
}

logger.info('B');

/* for (let i = 0; i < nQueries; i++) {
await Promise.any(promises).then((res) => {
results[i] = res;
logger.info('Got result ' + res.body.docs.length);
});
}*/

/* await Promise.all(promises).then((res) => {
results.push(res);
logger.info('Got result');
logger.info(`Results: ` + res);
});
*/
for (let i = 0; i < nQueries; i++) {
results[i] = await promises[i];
}
}
);

logger.info('results len ' + results.length);

// Create a lookup map StackTraceID -> StackTrace.
const stackTraces = new Map<StackTraceID, StackTrace>();
for (let i = 0; i < nQueries; i++) {
if (testing) {
for (const trace of results[i].body.hits.hits) {
const frameIDs = trace.fields.FrameID as string[];
const fileIDs = extractFileIDArrayFromFrameIDArray(frameIDs);
stackTraces.set(trace._id, {
FileID: fileIDs,
FrameID: frameIDs,
Type: trace.fields.Type,
});
}
} else {
for (const trace of results[i].body.docs) {
// Sometimes we don't find the trace.
// This is due to ES delays writing (data is not immediately seen after write).
// Also, ES doesn't know about transactions.
if (trace.found) {
const frameIDs = trace._source.FrameID as string[];
const fileIDs = extractFileIDArrayFromFrameIDArray(frameIDs);
stackTraces.set(trace._id, {
FileID: fileIDs,
FrameID: frameIDs,
Type: trace._source.Type,
});
}
}
}
}

if (stackTraces.size < stackTraceEvents.size) {
logger.info(
'failed to find ' +
2 changes: 0 additions & 2 deletions src/plugins/profiling/server/routes/search_topn.test.ts
@@ -90,7 +90,6 @@ describe('TopN data from Elasticsearch', () => {
'123',
'456',
'789',
200,
'field',
kibanaResponseFactory
);
@@ -113,7 +112,6 @@ (
'123',
'456',
'789',
200,
'StackTraceID',
kibanaResponseFactory
);
