Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] fix removing ingest pipelines from elasticsearch #75092

Merged
merged 2 commits into from
Aug 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

export { installPipelines } from './install';

export { deletePipelines, deletePipeline } from './remove';
export { deletePreviousPipelines, deletePipeline } from './remove';
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,43 @@ import { SavedObjectsClientContract } from 'src/core/server';
import { appContextService } from '../../../';
import { CallESAsCurrentUser, ElasticsearchAssetType } from '../../../../types';
import { getInstallation } from '../../packages/get';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common';
import { PACKAGES_SAVED_OBJECT_TYPE, EsAssetReference } from '../../../../../common';

export const deletePipelines = async (
export const deletePreviousPipelines = async (
callCluster: CallESAsCurrentUser,
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
pkgVersion: string
previousPkgVersion: string
) => {
const logger = appContextService.getLogger();
const previousPipelinesPattern = `*-${pkgName}.*-${pkgVersion}`;

const installation = await getInstallation({ savedObjectsClient, pkgName });
if (!installation) return;
const installedEsAssets = installation.installed_es;
const installedPipelines = installedEsAssets.filter(
({ type, id }) =>
type === ElasticsearchAssetType.ingestPipeline && id.includes(previousPkgVersion)
);
const deletePipelinePromises = installedPipelines.map(({ type, id }) => {
return deletePipeline(callCluster, id);
});
try {
await deletePipeline(callCluster, previousPipelinesPattern);
await Promise.all(deletePipelinePromises);
} catch (e) {
logger.error(e);
}
try {
await deletePipelineRefs(savedObjectsClient, pkgName, pkgVersion);
await deletePipelineRefs(savedObjectsClient, installedEsAssets, pkgName, previousPkgVersion);
} catch (e) {
logger.error(e);
}
};

export const deletePipelineRefs = async (
savedObjectsClient: SavedObjectsClientContract,
installedEsAssets: EsAssetReference[],
pkgName: string,
pkgVersion: string
) => {
const installation = await getInstallation({ savedObjectsClient, pkgName });
if (!installation) return;
const installedEsAssets = installation.installed_es;
const filteredAssets = installedEsAssets.filter(({ type, id }) => {
if (type !== ElasticsearchAssetType.ingestPipeline) return true;
if (!id.includes(pkgVersion)) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import * as Registry from '../registry';
import { getInstallation, getInstallationObject, isRequiredPackage } from './index';
import { installTemplates } from '../elasticsearch/template/install';
import { generateESIndexPatterns } from '../elasticsearch/template/template';
import { installPipelines, deletePipelines } from '../elasticsearch/ingest_pipeline/';
import { installPipelines, deletePreviousPipelines } from '../elasticsearch/ingest_pipeline/';
import { installILMPolicy } from '../elasticsearch/ilm/install';
import {
installKibanaAssets,
Expand Down Expand Up @@ -183,7 +183,7 @@ export async function installPackage({

// if this is an update, delete the previous version's pipelines
if (installedPkg && !reinstall) {
await deletePipelines(
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ export default function (providerContext: FtrProviderContext) {
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}`,
});
expect(res.statusCode).equal(200);
const resPipeline1 = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`,
});
expect(resPipeline1.statusCode).equal(200);
const resPipeline2 = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`,
});
expect(resPipeline2.statusCode).equal(200);
});
it('should have installed the template components', async function () {
const res = await es.transport.request({
Expand Down Expand Up @@ -147,6 +157,14 @@ export default function (providerContext: FtrProviderContext) {
id: 'logs-all_assets.test_logs-0.1.0',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs-0.1.0-pipeline1',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs-0.1.0-pipeline2',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs',
type: 'index_template',
Expand Down Expand Up @@ -207,6 +225,26 @@ export default function (providerContext: FtrProviderContext) {
}
);
expect(res.statusCode).equal(404);
const resPipeline1 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`,
},
{
ignore: [404],
}
);
expect(resPipeline1.statusCode).equal(404);
const resPipeline2 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`,
},
{
ignore: [404],
}
);
expect(resPipeline2.statusCode).equal(404);
});
it('should have uninstalled the kibana assets', async function () {
let resDashboard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,49 @@ export default function (providerContext: FtrProviderContext) {
},
});
});
it('should have installed the new versionized pipeline', async function () {
it('should have installed the new versionized pipelines', async function () {
const res = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgUpdateVersion}`,
});
expect(res.statusCode).equal(200);
const resPipeline1 = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgUpdateVersion}-pipeline1`,
});
expect(resPipeline1.statusCode).equal(200);
});
it('should have removed the old versionized pipelines', async function () {
let res;
try {
res = await es.transport.request({
const res = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}`,
});
} catch (err) {
res = err;
}
},
{
ignore: [404],
}
);
expect(res.statusCode).equal(404);
const resPipeline1 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`,
},
{
ignore: [404],
}
);
expect(resPipeline1.statusCode).equal(404);
const resPipeline2 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`,
},
{
ignore: [404],
}
);
expect(resPipeline2.statusCode).equal(404);
});
it('should have updated the template components', async function () {
const res = await es.transport.request({
Expand Down Expand Up @@ -272,6 +297,10 @@ export default function (providerContext: FtrProviderContext) {
id: 'logs-all_assets.test_logs-0.2.0',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs-0.2.0-pipeline1',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs',
type: 'index_template',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
description: Pipeline test
processors:
- remove:
field: messag
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
description: Pipeline test
processors:
- remove:
field: messag
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
description: Pipeline test
processors:
- remove:
field: messag
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"