diff --git a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/index.ts b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/index.ts index 6450f7303dd885..4f2c7c4c339f12 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/index.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/index.ts @@ -6,4 +6,4 @@ export { installPipelines } from './install'; -export { deletePipelines, deletePipeline } from './remove'; +export { deletePreviousPipelines, deletePipeline } from './remove'; diff --git a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/remove.ts b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/remove.ts index 8be3a1beab3927..836b53b5a9225c 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/remove.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/ingest_pipeline/remove.ts @@ -8,24 +8,32 @@ import { SavedObjectsClientContract } from 'src/core/server'; import { appContextService } from '../../../'; import { CallESAsCurrentUser, ElasticsearchAssetType } from '../../../../types'; import { getInstallation } from '../../packages/get'; -import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common'; +import { PACKAGES_SAVED_OBJECT_TYPE, EsAssetReference } from '../../../../../common'; -export const deletePipelines = async ( +export const deletePreviousPipelines = async ( callCluster: CallESAsCurrentUser, savedObjectsClient: SavedObjectsClientContract, pkgName: string, - pkgVersion: string + previousPkgVersion: string ) => { const logger = appContextService.getLogger(); - const previousPipelinesPattern = `*-${pkgName}.*-${pkgVersion}`; - + const installation = await getInstallation({ savedObjectsClient, pkgName }); + if (!installation) return; + const installedEsAssets = installation.installed_es; + const installedPipelines = installedEsAssets.filter( + ({ type, id }) => + type === ElasticsearchAssetType.ingestPipeline && id.includes(previousPkgVersion) + ); + const deletePipelinePromises = installedPipelines.map(({ type, id }) => { + return deletePipeline(callCluster, id); + }); try { - await deletePipeline(callCluster, previousPipelinesPattern); + await Promise.all(deletePipelinePromises); } catch (e) { logger.error(e); } try { - await deletePipelineRefs(savedObjectsClient, pkgName, pkgVersion); + await deletePipelineRefs(savedObjectsClient, installedEsAssets, pkgName, previousPkgVersion); } catch (e) { logger.error(e); } @@ -33,12 +41,10 @@ export const deletePipelines = async ( export const deletePipelineRefs = async ( savedObjectsClient: SavedObjectsClientContract, + installedEsAssets: EsAssetReference[], pkgName: string, pkgVersion: string ) => { - const installation = await getInstallation({ savedObjectsClient, pkgName }); - if (!installation) return; - const installedEsAssets = installation.installed_es; const filteredAssets = installedEsAssets.filter(({ type, id }) => { if (type !== ElasticsearchAssetType.ingestPipeline) return true; if (!id.includes(pkgVersion)) return true; diff --git a/x-pack/plugins/ingest_manager/server/services/epm/packages/install.ts b/x-pack/plugins/ingest_manager/server/services/epm/packages/install.ts index 0911aaf248e7a3..6bc461845f1244 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/packages/install.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/packages/install.ts @@ -22,7 +22,7 @@ import * as Registry from '../registry'; import { getInstallation, getInstallationObject, isRequiredPackage } from './index'; import { installTemplates } from '../elasticsearch/template/install'; import { generateESIndexPatterns } from '../elasticsearch/template/template'; -import { installPipelines, deletePipelines } from '../elasticsearch/ingest_pipeline/'; +import { installPipelines, deletePreviousPipelines } from '../elasticsearch/ingest_pipeline/'; import { installILMPolicy } from '../elasticsearch/ilm/install'; import { installKibanaAssets, @@ -183,7 +183,7 @@ export async function installPackage({ // if this is an update, delete the previous version's pipelines if (installedPkg && !reinstall) { - await deletePipelines( + await deletePreviousPipelines( callCluster, savedObjectsClient, pkgName, diff --git a/x-pack/test/ingest_manager_api_integration/apis/epm/install_remove_assets.ts b/x-pack/test/ingest_manager_api_integration/apis/epm/install_remove_assets.ts index 7fb8b0a2b17084..d94f7adeab7a91 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/epm/install_remove_assets.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/epm/install_remove_assets.ts @@ -61,6 +61,16 @@ export default function (providerContext: FtrProviderContext) { path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}`, }); expect(res.statusCode).equal(200); + const resPipeline1 = await es.transport.request({ + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`, + }); + expect(resPipeline1.statusCode).equal(200); + const resPipeline2 = await es.transport.request({ + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`, + }); + expect(resPipeline2.statusCode).equal(200); }); it('should have installed the template components', async function () { const res = await es.transport.request({ @@ -147,6 +157,14 @@ export default function (providerContext: FtrProviderContext) { id: 'logs-all_assets.test_logs-0.1.0', type: 'ingest_pipeline', }, + { + id: 'logs-all_assets.test_logs-0.1.0-pipeline1', + type: 'ingest_pipeline', + }, + { + id: 'logs-all_assets.test_logs-0.1.0-pipeline2', + type: 'ingest_pipeline', + }, { id: 'logs-all_assets.test_logs', type: 'index_template', @@ -207,6 +225,26 @@ export default function (providerContext: FtrProviderContext) { } ); expect(res.statusCode).equal(404); + const resPipeline1 = await es.transport.request( + { + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`, + }, + { + ignore: [404], + } + ); + expect(resPipeline1.statusCode).equal(404); + const resPipeline2 = await es.transport.request( + { + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`, + }, + { + ignore: [404], + } + ); + expect(resPipeline2.statusCode).equal(404); }); it('should have uninstalled the kibana assets', async function () { let resDashboard; diff --git a/x-pack/test/ingest_manager_api_integration/apis/epm/update_assets.ts b/x-pack/test/ingest_manager_api_integration/apis/epm/update_assets.ts index 59ad7a9744ae1f..8ad6fe12dcd43f 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/epm/update_assets.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/epm/update_assets.ts @@ -154,24 +154,49 @@ export default function (providerContext: FtrProviderContext) { }, }); }); - it('should have installed the new versionized pipeline', async function () { + it('should have installed the new versionized pipelines', async function () { const res = await es.transport.request({ method: 'GET', path: `/_ingest/pipeline/${logsTemplateName}-${pkgUpdateVersion}`, }); expect(res.statusCode).equal(200); + const resPipeline1 = await es.transport.request({ + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgUpdateVersion}-pipeline1`, + }); + expect(resPipeline1.statusCode).equal(200); }); it('should have removed the old versionized pipelines', async function () { - let res; - try { - res = await es.transport.request({ + const res = await es.transport.request( + { method: 'GET', path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}`, - }); - } catch (err) { - res = err; - } + }, + { + ignore: [404], + } + ); expect(res.statusCode).equal(404); + const resPipeline1 = await es.transport.request( + { + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`, + }, + { + ignore: [404], + } + ); + expect(resPipeline1.statusCode).equal(404); + const resPipeline2 = await es.transport.request( + { + method: 'GET', + path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`, + }, + { + ignore: [404], + } + ); + expect(resPipeline2.statusCode).equal(404); }); it('should have updated the template components', async function () { const res = await es.transport.request({ @@ -272,6 +297,10 @@ export default function (providerContext: FtrProviderContext) { id: 'logs-all_assets.test_logs-0.2.0', type: 'ingest_pipeline', }, + { + id: 'logs-all_assets.test_logs-0.2.0-pipeline1', + type: 'ingest_pipeline', + }, { id: 'logs-all_assets.test_logs', type: 'index_template', diff --git a/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.1.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline1.yml b/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.1.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline1.yml new file mode 100644 index 00000000000000..c2471c56ee22a3 --- /dev/null +++ b/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.1.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline1.yml @@ -0,0 +1,9 @@ +--- +description: Pipeline test +processors: +- remove: + field: messag +on_failure: + - set: + field: error.message + value: "{{ _ingest.on_failure_message }}" \ No newline at end of file diff --git a/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.1.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline2.yml b/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.1.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline2.yml new file mode 100644 index 00000000000000..c2471c56ee22a3 --- /dev/null +++ b/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.1.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline2.yml @@ -0,0 +1,9 @@ +--- +description: Pipeline test +processors: +- remove: + field: messag +on_failure: + - set: + field: error.message + value: "{{ _ingest.on_failure_message }}" \ No newline at end of file diff --git a/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.2.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline1.yml b/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.2.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline1.yml new file mode 100644 index 00000000000000..c2471c56ee22a3 --- /dev/null +++ b/x-pack/test/ingest_manager_api_integration/apis/fixtures/test_packages/all_assets/0.2.0/dataset/test_logs/elasticsearch/ingest_pipeline/pipeline1.yml @@ -0,0 +1,9 @@ +--- +description: Pipeline test +processors: +- remove: + field: messag +on_failure: + - set: + field: error.message + value: "{{ _ingest.on_failure_message }}" \ No newline at end of file