From 2759c2ec38208272fc41a69c0764c50571bc25cb Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Fri, 20 May 2022 16:55:48 +0100 Subject: [PATCH 01/19] init taskManager bulk --- .../alerting/server/routes/bulk_edit_rules.ts | 9 ++++ .../server/rules_client/rules_client.ts | 33 +++++++++++- x-pack/plugins/task_manager/server/plugin.ts | 3 +- .../task_manager/server/task_scheduling.ts | 54 +++++++++++++++++-- 4 files changed, 94 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts index 6588a46e1d9141..4925ee12b2bd0d 100644 --- a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts +++ b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts @@ -12,6 +12,10 @@ import { ILicenseState, RuleTypeDisabledError } from '../lib'; import { verifyAccessAndContext, rewriteRule, handleDisabledApiKeysError } from './lib'; import { AlertingRequestHandlerContext, INTERNAL_BASE_ALERTING_API_PATH } from '../types'; +const scheduleSchema = schema.object({ + interval: schema.string(), +}); + const ruleActionSchema = schema.object({ group: schema.string(), id: schema.string(), @@ -34,6 +38,11 @@ const operationsSchema = schema.arrayOf( field: schema.literal('actions'), value: schema.arrayOf(ruleActionSchema), }), + schema.object({ + operation: schema.literal('set'), + field: schema.literal('schedule'), + value: scheduleSchema, + }), ]), { minSize: 1 } ); diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index 4e248412eae156..10040d36596918 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -211,7 +211,7 @@ export interface FindOptions extends IndexType { filter?: string; } -export type BulkEditFields = keyof Pick; +export type BulkEditFields = keyof Pick; export type BulkEditOperation = | { @@ -223,6 +223,11 @@ export type BulkEditOperation = operation: 'add' | 'set'; field: Extract; value: NormalizedAlertAction[]; + } + | { + operation: 'set'; + field: Extract; + value: Rule['schedule']; }; // schedule, throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented @@ -1494,6 +1499,32 @@ export class RulesClient { ); }); + // update schedules only if schedule operation is present + const scheduleOperation = options.operations.find( + (op): op is Extract }> => + op.field === 'schedule' + ); + + if (scheduleOperation?.value) { + const taskIds = updatedRules.reduce((acc, rule) => { + if (rule.scheduledTaskId) { + acc.push(rule.scheduledTaskId); + } + return acc; + }, []); + + try { + await this.taskManager.bulkUpdateSchedules(taskIds, scheduleOperation.value); + } catch (error) { + this.auditLogger?.log( + ruleAuditEvent({ + action: RuleAuditAction.BULK_EDIT, + error, + }) + ); + } + } + return { rules: updatedRules, errors, total }; } diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 3694f01227178e..0a7e25230b6d5d 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -48,7 +48,7 @@ export interface TaskManagerSetupContract { export type TaskManagerStartContract = Pick< TaskScheduling, - 'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' + 'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules' > & Pick & { removeIfExists: TaskStore['remove']; @@ -238,6 +238,7 @@ export class TaskManagerPlugin schedule: (...args) => taskScheduling.schedule(...args), ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args), runNow: (...args) => taskScheduling.runNow(...args), + bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args), ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task), supportsEphemeralTasks: () => this.config.ephemeral_tasks.enabled, }; diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index a38e2d23fccec1..a1653012b7adbe 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -6,15 +6,16 @@ */ import { filter, take } from 'rxjs/operators'; - +import pMap from 'p-map'; import { pipe } from 'fp-ts/lib/pipeable'; import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option'; import uuid from 'uuid'; -import { pick } from 'lodash'; +import { pick, chunk } from 'lodash'; import { merge, Subject } from 'rxjs'; import agent from 'elastic-apm-node'; import { Logger } from '@kbn/core/server'; +import { mustBeAllOf } from './queries/query_clauses'; import { asOk, either, map, mapErr, promiseResult, isErr } from './lib/result_type'; import { isTaskRunEvent, @@ -28,6 +29,7 @@ import { TaskClaimErrorType, } from './task_events'; import { Middleware } from './lib/middleware'; +import { parseIntervalAsMillisecond } from './lib/intervals'; import { ConcreteTaskInstance, TaskInstanceWithId, @@ -36,8 +38,9 @@ import { TaskLifecycleResult, TaskStatus, EphemeralTask, + IntervalSchedule, } from './task'; -import { TaskStore } from './task_store'; +import { TaskStore, BulkUpdateResult } from './task_store'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle'; import { TaskTypeDictionary } from './task_type_dictionary'; @@ -111,6 +114,51 @@ export class TaskScheduling { }); } + /** + * Bulk updates schedules for tasks by ids. + * + * @param taskIs - list of task ids + * @param schedule - new schedule + * @returns {Promise} + */ + public async bulkUpdateSchedules( + taskIds: string[], + schedule: IntervalSchedule + ): Promise { + const tasks = await pMap(chunk(taskIds, 100), async (taskIdsChunk) => + this.store.fetch({ + query: mustBeAllOf( + { + terms: { + _id: taskIdsChunk.map((taskId) => `task:${taskId}`), + }, + }, + { + term: { + 'task.status': 'idle', + }, + } + ), + size: 100, + }) + ); + + const updatedTasks = tasks + .flatMap(({ docs }) => docs) + .map((task) => { + const oldInterval = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); + + const newRunAtInMs = Math.max( + Date.now(), + task.runAt.getTime() - oldInterval + parseIntervalAsMillisecond(schedule.interval) + ); + + return { ...task, schedule, runAt: new Date(newRunAtInMs) }; + }); + + return this.store.bulkUpdate(updatedTasks); + } + /** * Run task. * From d24c2f30b4b7f57b1b2af70519111e9495018cf0 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 23 May 2022 12:48:21 +0100 Subject: [PATCH 02/19] updates and fixes --- x-pack/plugins/task_manager/server/mocks.ts | 1 + .../task_manager/server/task_scheduling.ts | 37 ++++++++++--------- .../group1/tests/alerting/bulk_edit.ts | 2 +- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 2db8cdd6268c79..2870111ebafefe 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -27,6 +27,7 @@ const createStartMock = () => { ensureScheduled: jest.fn(), removeIfExists: jest.fn(), supportsEphemeralTasks: jest.fn(), + bulkUpdateSchedules: jest.fn(), }; return mock; }; diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index a1653012b7adbe..2e27ff723912fd 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -125,32 +125,35 @@ export class TaskScheduling { taskIds: string[], schedule: IntervalSchedule ): Promise { - const tasks = await pMap(chunk(taskIds, 100), async (taskIdsChunk) => - this.store.fetch({ - query: mustBeAllOf( - { - terms: { - _id: taskIdsChunk.map((taskId) => `task:${taskId}`), - }, - }, - { - term: { - 'task.status': 'idle', + const tasks = await pMap( + chunk(taskIds, 100), + async (taskIdsChunk) => + this.store.fetch({ + query: mustBeAllOf( + { + terms: { + _id: taskIdsChunk.map((taskId) => `task:${taskId}`), + }, }, - } - ), - size: 100, - }) + { + term: { + 'task.status': 'idle', + }, + } + ), + size: 100, + }), + { concurrency: 10 } ); const updatedTasks = tasks .flatMap(({ docs }) => docs) .map((task) => { - const oldInterval = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); + const oldIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); const newRunAtInMs = Math.max( Date.now(), - task.runAt.getTime() - oldInterval + parseIntervalAsMillisecond(schedule.interval) + task.runAt.getTime() - oldIntervalInMs + parseIntervalAsMillisecond(schedule.interval) ); return { ...task, schedule, runAt: new Date(newRunAtInMs) }; diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts index 6eeafe84724992..89c14d55ce41c2 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts @@ -437,7 +437,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; From 6efc5fb9a8383e3ee2725319822032aa8237276d Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 23 May 2022 14:28:23 +0100 Subject: [PATCH 03/19] fix the rest of tests --- .../security_and_spaces/group1/tests/alerting/bulk_edit.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts index 89c14d55ce41c2..35ea847f70ef45 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts @@ -482,7 +482,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; @@ -520,7 +520,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; From 7e1e7c77c379638b30f94f62676649bc577854a1 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Thu, 26 May 2022 17:14:22 +0100 Subject: [PATCH 04/19] add unit tests --- .../server/task_scheduling.test.ts | 94 +++++++++++++++++++ .../task_manager/server/task_scheduling.ts | 4 +- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index 6fe368d495adea..db7db2df52ec47 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -27,6 +27,9 @@ import { TaskRunResult } from './task_running'; import { mockLogger } from './test_utils'; import { TaskTypeDictionary } from './task_type_dictionary'; import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock'; +import { mustBeAllOf } from './queries/query_clauses'; + +const ONE_HOUR_IN_MS = 60 * 60 * 1000; jest.mock('uuid', () => ({ v4: () => 'v4uuid', @@ -134,6 +137,97 @@ describe('TaskScheduling', () => { }); }); + describe('bulkUpdateSchedules', () => { + const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; + + beforeEach(() => {}); + + test('should search for tasks by ids and idle status', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkUpdateSchedules([id], { interval: '1h' }); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1); + expect(mockTaskStore.fetch).toHaveBeenCalledWith({ + query: mustBeAllOf( + { + terms: { + _id: [`task:${id}`], + }, + }, + { + term: { + 'task.status': 'idle', + }, + } + ), + size: 100, + }); + }); + + test('should split search on chunks when input ids array too large', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkUpdateSchedules(Array.from({ length: 1250 }), { interval: '1h' }); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13); + }); + + test('should postpone task run if new interval is greater than previous', async () => { + // task set to be run in one 1hr from now + const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '5h' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload).toHaveLength(1); + expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' }); + // if we update rule with schedule of '5h' and prev interval was 3h, task will be run in 2 hours later + expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe(2 * ONE_HOUR_IN_MS); + }); + + test('should set task run sooner if new interval is lesser than previous', async () => { + // task set to be run in one 2hrs from now + const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '2h' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' }); + // if we update rule with schedule of '2h' and prev interval was 3h, task will be run in 1 hour sooner + expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe(ONE_HOUR_IN_MS); + }); + + test('should set task run to now if time that passed from last run is greater than new interval', async () => { + // task set to be run in one 1hr from now + const runInOneHr = new Date(Date.now() + ONE_HOUR_IN_MS); + const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '30m' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '30m' }); + + // if time that passed from last rule task is greater than new interval, task should be set to run at now time + expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now()); + }); + }); describe('runNow', () => { test('resolves when the task run succeeds', () => { const events$ = new Subject(); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 2e27ff723912fd..98ec77aeb6fb61 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -117,8 +117,8 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. * - * @param taskIs - list of task ids - * @param schedule - new schedule + * @param taskIss string[] - list of task ids + * @param schedule IntervalSchedule - new schedule * @returns {Promise} */ public async bulkUpdateSchedules( From 815854b08994e3a6233fb5d27e8759b76ad2a523 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 30 May 2022 20:10:37 +0100 Subject: [PATCH 05/19] tests!!! --- x-pack/plugins/task_manager/server/index.ts | 2 +- .../server/task_scheduling.test.ts | 48 ++++++++++--- .../task_manager/server/task_scheduling.ts | 25 +++++-- .../sample_task_plugin/server/init_routes.ts | 25 +++++++ .../task_manager/task_management.ts | 68 ++++++++++++++++++- 5 files changed, 152 insertions(+), 16 deletions(-) diff --git a/x-pack/plugins/task_manager/server/index.ts b/x-pack/plugins/task_manager/server/index.ts index f6cb3a6e6b1d57..88a27d040b1769 100644 --- a/x-pack/plugins/task_manager/server/index.ts +++ b/x-pack/plugins/task_manager/server/index.ts @@ -30,7 +30,7 @@ export { throwUnrecoverableError, isEphemeralTaskRejectedDueToCapacityError, } from './task_running'; -export type { RunNowResult } from './task_scheduling'; +export type { RunNowResult, BulkUpdateSchedulesResult } from './task_scheduling'; export { getOldestIdleActionTask } from './queries/oldest_idle_action_task'; export { IdleTaskWithExpiredRunAt, diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index db7db2df52ec47..fbc271c0ed4ee8 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -7,6 +7,7 @@ import { Subject } from 'rxjs'; import { none, some } from 'fp-ts/lib/Option'; +import moment from 'moment'; import { asTaskMarkRunningEvent, @@ -29,8 +30,6 @@ import { TaskTypeDictionary } from './task_type_dictionary'; import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock'; import { mustBeAllOf } from './queries/query_clauses'; -const ONE_HOUR_IN_MS = 60 * 60 * 1000; - jest.mock('uuid', () => ({ v4: () => 'v4uuid', })); @@ -139,8 +138,11 @@ describe('TaskScheduling', () => { describe('bulkUpdateSchedules', () => { const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; - - beforeEach(() => {}); + beforeEach(() => { + mockTaskStore.bulkUpdate.mockImplementation(() => + Promise.resolve([{ tag: 'ok', value: mockTask() }]) + ); + }); test('should search for tasks by ids and idle status', async () => { mockTaskStore.fetch.mockResolvedValue({ docs: [] }); @@ -175,9 +177,31 @@ describe('TaskScheduling', () => { expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13); }); + test('should transform response into correct format', async () => { + const successfulTask = mockTask({ id: 'task-1', schedule: { interval: '1h' } }); + const failedTask = mockTask({ id: 'task-2', schedule: { interval: '1h' } }); + mockTaskStore.bulkUpdate.mockImplementation(() => + Promise.resolve([ + { tag: 'ok', value: successfulTask }, + { tag: 'err', error: { entity: failedTask, error: new Error('fail') } }, + ]) + ); + mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + const result = await taskScheduling.bulkUpdateSchedules([successfulTask.id, failedTask.id], { + interval: '1h', + }); + + expect(result).toEqual({ + tasks: [successfulTask], + errors: [{ task: failedTask, error: new Error('fail') }], + }); + }); + test('should postpone task run if new interval is greater than previous', async () => { - // task set to be run in one 1hr from now - const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + // task set to be run in 2 hrs from now + const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); @@ -190,12 +214,14 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload).toHaveLength(1); expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' }); // if we update rule with schedule of '5h' and prev interval was 3h, task will be run in 2 hours later - expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe(2 * ONE_HOUR_IN_MS); + expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe( + moment.duration(2, 'hours').asMilliseconds() + ); }); test('should set task run sooner if new interval is lesser than previous', async () => { // task set to be run in one 2hrs from now - const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); @@ -207,12 +233,14 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' }); // if we update rule with schedule of '2h' and prev interval was 3h, task will be run in 1 hour sooner - expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe(ONE_HOUR_IN_MS); + expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe( + moment.duration(1, 'hour').asMilliseconds() + ); }); test('should set task run to now if time that passed from last run is greater than new interval', async () => { // task set to be run in one 1hr from now - const runInOneHr = new Date(Date.now() + ONE_HOUR_IN_MS); + const runInOneHr = new Date(Date.now() + moment.duration(1, 'hour').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr }); mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 98ec77aeb6fb61..2241a35a6813b2 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -40,7 +40,7 @@ import { EphemeralTask, IntervalSchedule, } from './task'; -import { TaskStore, BulkUpdateResult } from './task_store'; +import { TaskStore } from './task_store'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle'; import { TaskTypeDictionary } from './task_type_dictionary'; @@ -59,6 +59,10 @@ export interface TaskSchedulingOpts { taskManagerId: string; } +export interface BulkUpdateSchedulesResult { + tasks: ConcreteTaskInstance[]; + errors: Array<{ task: ConcreteTaskInstance; error: Error }>; +} export interface RunNowResult { id: ConcreteTaskInstance['id']; state?: ConcreteTaskInstance['state']; @@ -117,14 +121,14 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. * - * @param taskIss string[] - list of task ids + * @param taskIds string[] - list of task ids * @param schedule IntervalSchedule - new schedule * @returns {Promise} */ public async bulkUpdateSchedules( taskIds: string[], schedule: IntervalSchedule - ): Promise { + ): Promise { const tasks = await pMap( chunk(taskIds, 100), async (taskIdsChunk) => @@ -159,7 +163,20 @@ export class TaskScheduling { return { ...task, schedule, runAt: new Date(newRunAtInMs) }; }); - return this.store.bulkUpdate(updatedTasks); + const result: BulkUpdateSchedulesResult = { + tasks: [], + errors: [], + }; + + (await this.store.bulkUpdate(updatedTasks)).forEach((task) => { + if (task.tag === 'ok') { + result.tasks.push(task.value); + } else { + result.errors.push({ error: task.error.error, task: task.error.entity }); + } + }); + + return result; } /** diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts index 2d98a58d29d7c1..539cef69d92eb6 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts @@ -111,6 +111,31 @@ export function initRoutes( } ); + router.post( + { + path: `/api/sample_tasks/bulk_update_schedules`, + validate: { + body: schema.object({ + taskIds: schema.arrayOf(schema.string()), + schedule: schema.object({ interval: schema.string() }), + }), + }, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ) { + const { taskIds, schedule } = req.body; + try { + const taskManager = await taskManagerStart; + return res.ok({ body: await taskManager.bulkUpdateSchedules(taskIds, schedule) }); + } catch (err) { + return res.ok({ body: { taskIds, error: `${err}` } }); + } + } + ); + router.post( { path: `/api/sample_tasks/ephemeral_run_now`, diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index 51fb161ac0d6a9..d155b76fc0101c 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -5,12 +5,13 @@ * 2.0. */ +import moment from 'moment'; import { random, times } from 'lodash'; import expect from '@kbn/expect'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json'; import { DEFAULT_MAX_WORKERS, DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config'; -import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; +import { ConcreteTaskInstance, BulkUpdateSchedulesResult } from '@kbn/task-manager-plugin/server'; import { FtrProviderContext } from '../../ftr_provider_context'; const { @@ -177,6 +178,15 @@ export default function ({ getService }: FtrProviderContext) { .then((response) => response.body); } + function bulkUpdateSchedules(taskIds: string[], schedule: { interval: string }) { + return supertest + .post('/api/sample_tasks/bulk_update_schedules') + .set('kbn-xsrf', 'xxx') + .send({ taskIds, schedule }) + .expect(200) + .then((response: { body: BulkUpdateSchedulesResult }) => response.body); + } + // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 // function runEphemeralTaskNow(task: { // taskType: string; @@ -899,6 +909,62 @@ export default function ({ getService }: FtrProviderContext) { }); }); + it('should bulk updates schedules for multiple tasks', async () => { + const initialTime = Date.now(); + const tasks = await Promise.all([ + scheduleTask({ + taskType: 'sampleTask', + schedule: { interval: '1h' }, + params: {}, + }), + + scheduleTask({ + taskType: 'sampleTask', + schedule: { interval: '5m' }, + params: {}, + }), + ]); + + const taskIds = tasks.map(({ id }) => id); + + await retry.try(async () => { + // ensure each task has ran at least once and been rescheduled for future run + for (const task of tasks) { + const { state } = await currentTask<{ count: number }>(task.id); + expect(state.count).to.be(1); + } + + // first task to be scheduled in 1h + expect(Date.parse((await currentTask(tasks[0].id)).runAt) - initialTime).to.be.greaterThan( + moment.duration(1, 'hour').asMilliseconds() + ); + + // second task to be scheduled in 5m + expect(Date.parse((await currentTask(tasks[1].id)).runAt) - initialTime).to.be.greaterThan( + moment.duration(5, 'minutes').asMilliseconds() + ); + }); + + await retry.try(async () => { + const updates = await bulkUpdateSchedules(taskIds, { interval: '3h' }); + + expect(updates.tasks.length).to.be(2); + expect(updates.errors.length).to.be(0); + }); + + await retry.try(async () => { + const updatedTasks = (await currentTasks()).docs; + + updatedTasks.forEach((task) => { + expect(task.schedule).to.eql({ interval: '3h' }); + // should be scheduled to run in 3 hours + expect(Date.parse(task.runAt) - initialTime).to.be.greaterThan( + moment.duration(3, 'hours').asMilliseconds() + ); + }); + }); + }); + // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 // it('should return the resulting task state when asked to run an ephemeral task now', async () => { // const ephemeralTask = await runEphemeralTaskNow({ From 79fc692a4588018920b759015e0a4d50f9d9e817 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 30 May 2022 20:15:42 +0100 Subject: [PATCH 06/19] refactor it --- .../task_manager/server/task_scheduling.ts | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 2241a35a6813b2..6f67d47124cd5d 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -163,20 +163,18 @@ export class TaskScheduling { return { ...task, schedule, runAt: new Date(newRunAtInMs) }; }); - const result: BulkUpdateSchedulesResult = { - tasks: [], - errors: [], - }; - - (await this.store.bulkUpdate(updatedTasks)).forEach((task) => { - if (task.tag === 'ok') { - result.tasks.push(task.value); - } else { - result.errors.push({ error: task.error.error, task: task.error.entity }); - } - }); + return (await this.store.bulkUpdate(updatedTasks)).reduce( + (acc, task) => { + if (task.tag === 'ok') { + acc.tasks.push(task.value); + } else { + acc.errors.push({ error: task.error.error, task: task.error.entity }); + } - return result; + return acc; + }, + { tasks: [], errors: [] } + ); } /** From e9d44266d68eb447f5d2bd383a21c06cd680beb9 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Tue, 31 May 2022 10:54:06 +0100 Subject: [PATCH 07/19] add test to rukes_client --- .../server/rules_client/rules_client.ts | 13 ++-- .../rules_client/tests/bulk_edit.test.ts | 77 +++++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index 9869601dee932b..6ebb675ab633a7 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -230,12 +230,7 @@ export type BulkEditOperation = value: Rule['schedule']; }; -// schedule, throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented -// | { -// operation: 'set'; -// field: Extract; -// value: Rule['schedule']; -// } +// throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented // | { // operation: 'set'; // field: Extract; @@ -1501,8 +1496,10 @@ export class RulesClient { // update schedules only if schedule operation is present const scheduleOperation = options.operations.find( - (op): op is Extract }> => - op.field === 'schedule' + ( + operation + ): operation is Extract }> => + operation.field === 'schedule' ); if (scheduleOperation?.value) { diff --git a/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts index e878fd3f79e176..fe5f934ce4ab38 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts @@ -899,4 +899,81 @@ describe('bulkEdit()', () => { ); }); }); + + describe('task manager', () => { + test('should call task manager method bulkUpdateSchedules if operation set new schedules', async () => { + unsecuredSavedObjectsClient.bulkUpdate.mockResolvedValue({ + saved_objects: [ + { + id: '1', + type: 'alert', + attributes: { + enabled: true, + tags: ['foo'], + alertTypeId: 'myType', + schedule: { interval: '1m' }, + consumer: 'myApp', + scheduledTaskId: 'task-123', + params: { index: ['test-index-*'] }, + throttle: null, + notifyWhen: null, + actions: [], + }, + references: [], + version: '123', + }, + ], + }); + + await rulesClient.bulkEdit({ + operations: [ + { + field: 'schedule', + operation: 'set', + value: { interval: '10m' }, + }, + ], + }); + + expect(taskManager.bulkUpdateSchedules).toHaveBeenCalledWith(['task-123'], { + interval: '10m', + }); + }); + + test('should not call task manager method bulkUpdateSchedules if operation is not set schedule', async () => { + unsecuredSavedObjectsClient.bulkUpdate.mockResolvedValue({ + saved_objects: [ + { + id: '1', + type: 'alert', + attributes: { + enabled: true, + tags: ['foo'], + alertTypeId: 'myType', + schedule: { interval: '1m' }, + consumer: 'myApp', + params: { index: ['test-index-*'] }, + throttle: null, + notifyWhen: null, + actions: [], + }, + references: [], + version: '123', + }, + ], + }); + + await rulesClient.bulkEdit({ + operations: [ + { + field: 'tags', + operation: 'set', + value: ['test-tag'], + }, + ], + }); + + expect(taskManager.bulkUpdateSchedules).not.toHaveBeenCalled(); + }); + }); }); From 0b96f460d54adee56ce313cca7ef75dace08127b Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Tue, 31 May 2022 12:09:42 +0100 Subject: [PATCH 08/19] tests, more tests --- .../spaces_only/tests/alerting/bulk_edit.ts | 137 ++++++++++++++---- 1 file changed, 112 insertions(+), 25 deletions(-) diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts index 3150925e2e49eb..aa1f6bfff25886 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts @@ -16,7 +16,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const supertest = getService('supertest'); // FLAKY: https://github.com/elastic/kibana/issues/132195 - describe.skip('bulkEdit', () => { + describe('bulkEdit', () => { const objectRemover = new ObjectRemover(supertest); after(() => objectRemover.removeAll()); @@ -25,7 +25,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: ['default'] })); + .send(getTestRuleData({ enabled: false, tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -71,7 +71,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: [`multiple-rules-edit`] })) + .send(getTestRuleData({ enabled: false, tags: [`multiple-rules-edit`] })) .expect(200) ) ) @@ -119,11 +119,11 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { }); }); - it(`shouldn't bulk edit rule from another space`, async () => { + it('should bulk edit rule with schedule operation', async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: ['default'] })); + .send(getTestRuleData({ enabled: false, schedule: { interval: '10m' } })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -131,27 +131,42 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ids: [createdRule.id], operations: [ { - operation: 'add', - field: 'tags', - value: ['tag-1'], + operation: 'set', + field: 'schedule', + value: { interval: '1h' }, }, ], }; - await supertest - .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload) - .expect(200, { rules: [], errors: [], total: 0 }); + .send(payload); + + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].schedule).to.eql({ interval: '1h' }); + + const { body: updatedRule } = await supertest + .get(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rule/${createdRule.id}`) + .set('kbn-xsrf', 'foo'); + + expect(updatedRule.schedule).to.eql({ interval: '1h' }); + + // Ensure AAD isn't broken + await checkAAD({ + supertest, + spaceId: Spaces.space1.id, + type: 'alert', + id: createdRule.id, + }); }); - it('should return mapped params after bulk edit', async () => { + it(`shouldn't bulk edit rule from another space`, async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send( - getTestRuleData({ tags: ['default'], params: { risk_score: 40, severity: 'medium' } }) - ); + .send(getTestRuleData({ enabled: false, tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -166,17 +181,89 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ], }; - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + await supertest + .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload); + .send(payload) + .expect(200, { rules: [], errors: [], total: 0 }); + }); - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ - risk_score: 40, - severity: '40-medium', + // for test purpose only, will be removed + for (let i = 0; i < 100; i++) { + it(`should return mapped params after bulk edit #${i}`, async () => { + const { body: createdRule } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + enabled: false, + tags: ['default'], + params: { risk_score: 40, severity: 'medium' }, + }) + ); + + objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); + + const payload = { + ids: [createdRule.id], + operations: [ + { + operation: 'add', + field: 'tags', + value: ['tag-1'], + }, + ], + }; + + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + .set('kbn-xsrf', 'foo') + .send(payload); + + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ + risk_score: 40, + severity: '40-medium', + }); }); - }); + + it(`should return mapped params after bulk edit ENABLED #${i}`, async () => { + const { body: createdRule } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + tags: ['default'], + params: { risk_score: 40, severity: 'medium' }, + }) + ); + + objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); + + const payload = { + ids: [createdRule.id], + operations: [ + { + operation: 'add', + field: 'tags', + value: ['tag-1'], + }, + ], + }; + + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + .set('kbn-xsrf', 'foo') + .send(payload); + + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ + risk_score: 40, + severity: '40-medium', + }); + }); + } }); } From 9f1ccf71bf636e85a8e3b18004e88bad8f21e13b Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Tue, 31 May 2022 17:05:41 +0100 Subject: [PATCH 09/19] README, docs --- x-pack/plugins/task_manager/README.md | 30 +++++++++++++++++++ .../task_manager/server/task_scheduling.ts | 10 +++++++ .../spaces_only/tests/alerting/bulk_edit.ts | 2 +- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index 350a53c660bc71..f345b5647219ea 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -328,6 +328,9 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's runNow: (taskId: string) => { // ... }, + bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => { + // ... + }, ensureScheduled: (taskInstance: TaskInstanceWithId, options?: any) => { // ... }, @@ -415,6 +418,33 @@ export class Plugin { } ``` +#### bulkUpdateSchedules +Using `bulkUpdatesSchedules` you can instruct TaskManger to update interval of tasks that are in `idle` status. +When interval updated, new `runAt` will be computed and task will be updated with that value + +```js +export class Plugin { + constructor() { + } + + public setup(core: CoreSetup, plugins: { taskManager }) { + } + + public start(core: CoreStart, plugins: { taskManager }) { + try { + const bulkUpdateResults = await taskManager.bulkUpdateSchedule( + ['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'], + { interval: '10m' }, + ); + // If no error is thrown, the bulkUpdateSchedule has completed successfully. + // But some updates of some tasks can be failed, due to OCC 409 conflict for example + } catch(err: Error) { + // if error is caught, means the whole method requested has failed and tasks weren't updated + } + } +} +``` + #### more options More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 6f67d47124cd5d..e4a6f8c7c12fb9 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -59,8 +59,18 @@ export interface TaskSchedulingOpts { taskManagerId: string; } +/** + * return type of TaskScheduling.bulkUpdateSchedules method + */ export interface BulkUpdateSchedulesResult { + /** + * list of successfully updated tasks + */ tasks: ConcreteTaskInstance[]; + + /** + * list of failed tasks and error caused failure + */ errors: Array<{ task: ConcreteTaskInstance; error: Error }>; } export interface RunNowResult { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts index aa1f6bfff25886..91fb33362e47d5 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts @@ -189,7 +189,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { }); // for test purpose only, will be removed - for (let i = 0; i < 100; i++) { + for (let i = 0; i < 200; i++) { it(`should return mapped params after bulk edit #${i}`, async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) From edc78900895cb9618b9638b6b8ae69c12b722da3 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 1 Jun 2022 09:05:06 +0100 Subject: [PATCH 10/19] skip again --- .../spaces_only/tests/alerting/bulk_edit.ts | 137 ++++-------------- 1 file changed, 25 insertions(+), 112 deletions(-) diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts index 91fb33362e47d5..3150925e2e49eb 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts @@ -16,7 +16,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const supertest = getService('supertest'); // FLAKY: https://github.com/elastic/kibana/issues/132195 - describe('bulkEdit', () => { + describe.skip('bulkEdit', () => { const objectRemover = new ObjectRemover(supertest); after(() => objectRemover.removeAll()); @@ -25,7 +25,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, tags: ['default'] })); + .send(getTestRuleData({ tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -71,7 +71,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, tags: [`multiple-rules-edit`] })) + .send(getTestRuleData({ tags: [`multiple-rules-edit`] })) .expect(200) ) ) @@ -119,11 +119,11 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { }); }); - it('should bulk edit rule with schedule operation', async () => { + it(`shouldn't bulk edit rule from another space`, async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, schedule: { interval: '10m' } })); + .send(getTestRuleData({ tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -131,42 +131,27 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ids: [createdRule.id], operations: [ { - operation: 'set', - field: 'schedule', - value: { interval: '1h' }, + operation: 'add', + field: 'tags', + value: ['tag-1'], }, ], }; - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + await supertest + .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload); - - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].schedule).to.eql({ interval: '1h' }); - - const { body: updatedRule } = await supertest - .get(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rule/${createdRule.id}`) - .set('kbn-xsrf', 'foo'); - - expect(updatedRule.schedule).to.eql({ interval: '1h' }); - - // Ensure AAD isn't broken - await checkAAD({ - supertest, - spaceId: Spaces.space1.id, - type: 'alert', - id: createdRule.id, - }); + .send(payload) + .expect(200, { rules: [], errors: [], total: 0 }); }); - it(`shouldn't bulk edit rule from another space`, async () => { + it('should return mapped params after bulk edit', async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, tags: ['default'] })); + .send( + getTestRuleData({ tags: ['default'], params: { risk_score: 40, severity: 'medium' } }) + ); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -181,89 +166,17 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ], }; - await supertest - .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload) - .expect(200, { rules: [], errors: [], total: 0 }); - }); - - // for test purpose only, will be removed - for (let i = 0; i < 200; i++) { - it(`should return mapped params after bulk edit #${i}`, async () => { - const { body: createdRule } = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) - .set('kbn-xsrf', 'foo') - .send( - getTestRuleData({ - enabled: false, - tags: ['default'], - params: { risk_score: 40, severity: 'medium' }, - }) - ); - - objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); - - const payload = { - ids: [createdRule.id], - operations: [ - { - operation: 'add', - field: 'tags', - value: ['tag-1'], - }, - ], - }; - - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) - .set('kbn-xsrf', 'foo') - .send(payload); - - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ - risk_score: 40, - severity: '40-medium', - }); - }); - - it(`should return mapped params after bulk edit ENABLED #${i}`, async () => { - const { body: createdRule } = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) - .set('kbn-xsrf', 'foo') - .send( - getTestRuleData({ - tags: ['default'], - params: { risk_score: 40, severity: 'medium' }, - }) - ); - - objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); - - const payload = { - ids: [createdRule.id], - operations: [ - { - operation: 'add', - field: 'tags', - value: ['tag-1'], - }, - ], - }; - - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) - .set('kbn-xsrf', 'foo') - .send(payload); + .send(payload); - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ - risk_score: 40, - severity: '40-medium', - }); + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ + risk_score: 40, + severity: '40-medium', }); - } + }); }); } From d20f2fd5093593fdb5831cdfa26e3cf3dbf1bcc7 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 1 Jun 2022 09:07:18 +0100 Subject: [PATCH 11/19] add rest of ops --- .../server/rules_client/rules_client.ts | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index 6ebb675ab633a7..fc4c900e21f149 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -211,7 +211,10 @@ export interface FindOptions extends IndexType { filter?: string; } -export type BulkEditFields = keyof Pick; +export type BulkEditFields = keyof Pick< + Rule, + 'actions' | 'tags' | 'schedule' | 'throttle' | 'notifyWhen' +>; export type BulkEditOperation = | { @@ -228,20 +231,18 @@ export type BulkEditOperation = operation: 'set'; field: Extract; value: Rule['schedule']; + } + | { + operation: 'set'; + field: Extract; + value: Rule['throttle']; + } + | { + operation: 'set'; + field: Extract; + value: Rule['notifyWhen']; }; -// throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented -// | { -// operation: 'set'; -// field: Extract; -// value: Rule['throttle']; -// } -// | { -// operation: 'set'; -// field: Extract; -// value: Rule['notifyWhen']; -// }; - type RuleParamsModifier = (params: Params) => Promise; export interface BulkEditOptionsFilter { From f2bc69607f24f4d4898469ef3b961209301b00b1 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 1 Jun 2022 16:51:00 +0100 Subject: [PATCH 12/19] tests --- .../alerting/server/routes/bulk_edit_rules.ts | 16 ++++++++++++++ .../group1/tests/alerting/bulk_edit.ts | 21 +++++++------------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts index 4925ee12b2bd0d..a56acd15140117 100644 --- a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts +++ b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts @@ -43,6 +43,22 @@ const operationsSchema = schema.arrayOf( field: schema.literal('schedule'), value: scheduleSchema, }), + schema.object({ + operation: schema.literal('set'), + field: schema.literal('throttle'), + value: schema.nullable(schema.string()), + }), + schema.object({ + operation: schema.literal('set'), + field: schema.literal('notifyWhen'), + value: schema.nullable( + schema.oneOf([ + schema.literal('onActionGroupChange'), + schema.literal('onActiveAlert'), + schema.literal('onThrottleInterval'), + ]) + ), + }), ]), { minSize: 1 } ); diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts index 35ea847f70ef45..6ae46cfd8d7bfd 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts @@ -437,7 +437,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; @@ -446,21 +446,14 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { } }); - it('should handle bulk edit of rules when operation field is invalid', async () => { - const { body: createdRule } = await supertest - .post(`${getUrlPrefix(space.id)}/api/alerting/rule`) - .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: ['foo'] })) - .expect(200); - objectRemover.add(space.id, createdRule.id, 'rule', 'alerting'); - + it('should handle bulk edit of rules when operation value type is incorrect', async () => { const payload = { - ids: [createdRule.id], + filter: '', operations: [ { operation: 'add', - field: 'test', - value: ['test'], + field: 'tags', + value: 'not an array', }, ], }; @@ -482,7 +475,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.value]: could not parse array value from json input\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; @@ -520,7 +513,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; From 0b44ef184fed3bb48dc783a889d032c2788f4e26 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 6 Jun 2022 12:00:39 +0100 Subject: [PATCH 13/19] comments updates --- .../plugins/task_manager/server/task_scheduling.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index fbc271c0ed4ee8..79773e68955166 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -213,7 +213,7 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload).toHaveLength(1); expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' }); - // if we update rule with schedule of '5h' and prev interval was 3h, task will be run in 2 hours later + // if tasks updated with schedule interval of '5h' and previous interval was 3h, task will be scheduled to run in 2 hours later expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe( moment.duration(2, 'hours').asMilliseconds() ); @@ -232,14 +232,14 @@ describe('TaskScheduling', () => { const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' }); - // if we update rule with schedule of '2h' and prev interval was 3h, task will be run in 1 hour sooner + // if tasks updated with schedule interval of '2h' and previous interval was 3h, task will be scheduled to run in 1 hour sooner expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe( moment.duration(1, 'hour').asMilliseconds() ); }); test('should set task run to now if time that passed from last run is greater than new interval', async () => { - // task set to be run in one 1hr from now + // task set to be run in one 1hr from now. With interval of '2h', it means last run happened 1 hour ago const runInOneHr = new Date(Date.now() + moment.duration(1, 'hour').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr }); @@ -252,7 +252,7 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '30m' }); - // if time that passed from last rule task is greater than new interval, task should be set to run at now time + // if time that passed from last task run is greater than new interval, task should be set to run at now time expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now()); }); }); From cb5411a71d298fa37754d9e0a6fd1052e2e9d055 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 6 Jun 2022 12:09:11 +0100 Subject: [PATCH 14/19] JSDoc --- x-pack/plugins/task_manager/server/task_scheduling.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index e4a6f8c7c12fb9..1fddc44c9e4b55 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -69,7 +69,7 @@ export interface BulkUpdateSchedulesResult { tasks: ConcreteTaskInstance[]; /** - * list of failed tasks and error caused failure + * list of failed tasks and errors caused failure */ errors: Array<{ task: ConcreteTaskInstance; error: Error }>; } @@ -131,9 +131,9 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. * - * @param taskIds string[] - list of task ids - * @param schedule IntervalSchedule - new schedule - * @returns {Promise} + * @param {string[]} taskIds - list of task ids + * @param {IntervalSchedule} schedule - new schedule + * @returns {Promise} */ public async bulkUpdateSchedules( taskIds: string[], From bdeadf66d451e6c0097b83f1468e51a4747fada0 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 6 Jun 2022 15:01:19 +0100 Subject: [PATCH 15/19] few perf improvements --- .../task_manager/server/task_scheduling.test.ts | 13 +++++++++++++ .../plugins/task_manager/server/task_scheduling.ts | 14 +++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index 79773e68955166..371266fc872fff 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -199,6 +199,19 @@ describe('TaskScheduling', () => { }); }); + test('should not update task if new interval is equal to previous', async () => { + const task = mockTask({ id, schedule: { interval: '3h' } }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '3h' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload).toHaveLength(0); + }); + test('should postpone task run if new interval is greater than previous', async () => { // task set to be run in 2 hrs from now const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds()); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 1fddc44c9e4b55..02a6f55bdc5ef3 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -162,16 +162,24 @@ export class TaskScheduling { const updatedTasks = tasks .flatMap(({ docs }) => docs) - .map((task) => { + .reduce((acc, task) => { + // if task schedule interval is the same, no need to update it + if (task.schedule?.interval === schedule.interval) { + return acc; + } + const oldIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); + // computing new runAt using formula: + // newRunAt = oldRunAt - oldInterval + newInterval const newRunAtInMs = Math.max( Date.now(), task.runAt.getTime() - oldIntervalInMs + parseIntervalAsMillisecond(schedule.interval) ); - return { ...task, schedule, runAt: new Date(newRunAtInMs) }; - }); + acc.push({ ...task, schedule, runAt: new Date(newRunAtInMs) }); + return acc; + }, []); return (await this.store.bulkUpdate(updatedTasks)).reduce( (acc, task) => { From 64e7adba1acefc3501114eed784ec47ae317e8a6 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 8 Jun 2022 17:28:47 +0100 Subject: [PATCH 16/19] CR: replace auditLogger with logger.error --- .../alerting/server/rules_client/rules_client.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index fc4c900e21f149..fe61148b6350d6 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -1513,12 +1513,14 @@ export class RulesClient { try { await this.taskManager.bulkUpdateSchedules(taskIds, scheduleOperation.value); + this.logger.debug( + `Successfully updated schedules for underlying tasks: ${taskIds.join(', ')}` + ); } catch (error) { - this.auditLogger?.log( - ruleAuditEvent({ - action: RuleAuditAction.BULK_EDIT, - error, - }) + this.logger.error( + `Failure to update schedules for underlying tasks: ${taskIds.join( + ', ' + )}. TaskManager bulkUpdateSchedules failed with Error: ${error.message}` ); } } From fc2f1194b777975ebd01f5ffb6f51dade040018f Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 13 Jun 2022 11:15:17 +0100 Subject: [PATCH 17/19] CR: minor suggestions addressed --- x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts | 8 ++------ x-pack/plugins/task_manager/README.md | 9 +++++++-- x-pack/plugins/task_manager/server/task_scheduling.ts | 2 ++ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts index a56acd15140117..1fe3aca7fd9eac 100644 --- a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts +++ b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts @@ -8,14 +8,10 @@ import { schema } from '@kbn/config-schema'; import { IRouter } from '@kbn/core/server'; -import { ILicenseState, RuleTypeDisabledError } from '../lib'; +import { ILicenseState, RuleTypeDisabledError, validateDurationSchema } from '../lib'; import { verifyAccessAndContext, rewriteRule, handleDisabledApiKeysError } from './lib'; import { AlertingRequestHandlerContext, INTERNAL_BASE_ALERTING_API_PATH } from '../types'; -const scheduleSchema = schema.object({ - interval: schema.string(), -}); - const ruleActionSchema = schema.object({ group: schema.string(), id: schema.string(), @@ -41,7 +37,7 @@ const operationsSchema = schema.arrayOf( schema.object({ operation: schema.literal('set'), field: schema.literal('schedule'), - value: scheduleSchema, + value: schema.string({ validate: validateDurationSchema }), }), schema.object({ operation: schema.literal('set'), diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index f345b5647219ea..a3f739d58e4ebc 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -419,9 +419,14 @@ export class Plugin { ``` #### bulkUpdateSchedules -Using `bulkUpdatesSchedules` you can instruct TaskManger to update interval of tasks that are in `idle` status. -When interval updated, new `runAt` will be computed and task will be updated with that value +Using `bulkUpdatesSchedules` you can instruct TaskManger to update interval of tasks that are in `idle` status +(for the tasks which have `running` status, `schedule` and `runAt` will be recalculated after task run finishes). +When interval updated, new `runAt` will be computed and task will be updated with that value, using formula +``` +newRunAt = oldRunAt - oldInterval + newInterval +``` +Example: ```js export class Plugin { constructor() { diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 02a6f55bdc5ef3..31662ee8bce641 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -130,6 +130,8 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. + * Only tasks with `idle` status will be updated, as for the tasks which have `running` status, + * `schedule` and `runAt` will be recalculated after task run finishes * * @param {string[]} taskIds - list of task ids * @param {IntervalSchedule} schedule - new schedule From 0e2d63afbc03cb8ade78363384c4c66a4ba6f22a Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 13 Jun 2022 12:07:15 +0100 Subject: [PATCH 18/19] CR: fix tests --- x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts index 1fe3aca7fd9eac..42946027a555e0 100644 --- a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts +++ b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts @@ -37,7 +37,7 @@ const operationsSchema = schema.arrayOf( schema.object({ operation: schema.literal('set'), field: schema.literal('schedule'), - value: schema.string({ validate: validateDurationSchema }), + value: schema.object({ interval: schema.string({ validate: validateDurationSchema }) }), }), schema.object({ operation: schema.literal('set'), From 52b794af1f0ccd156a6df7113f8a59dc8e70b8a9 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 13 Jun 2022 12:44:04 +0100 Subject: [PATCH 19/19] CR: add functional test for task in running status --- .../task_manager/task_management.ts | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index d155b76fc0101c..a649cec15d6cd9 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -909,7 +909,7 @@ export default function ({ getService }: FtrProviderContext) { }); }); - it('should bulk updates schedules for multiple tasks', async () => { + it('should bulk update schedules for multiple tasks', async () => { const initialTime = Date.now(); const tasks = await Promise.all([ scheduleTask({ @@ -965,6 +965,45 @@ export default function ({ getService }: FtrProviderContext) { }); }); + it('should not bulk update schedules for task in running status', async () => { + // this task should be in running status for 60s until it will be time outed + const longRunningTask = await scheduleTask({ + taskType: 'sampleRecurringTaskWhichHangs', + schedule: { interval: '1h' }, + params: {}, + }); + + runTaskNow({ id: longRunningTask.id }); + + let scheduledRunAt: string; + // ensure task is running and store scheduled runAt + await retry.try(async () => { + const task = await currentTask(longRunningTask.id); + + expect(task.status).to.be('running'); + + scheduledRunAt = task.runAt; + }); + + await retry.try(async () => { + const updates = await bulkUpdateSchedules([longRunningTask.id], { interval: '3h' }); + + // length should be 0, as task in running status won't be updated + expect(updates.tasks.length).to.be(0); + expect(updates.errors.length).to.be(0); + }); + + // ensure task wasn't updated + await retry.try(async () => { + const task = await currentTask(longRunningTask.id); + + // interval shouldn't be changed + expect(task.schedule).to.eql({ interval: '1h' }); + + // scheduledRunAt shouldn't be changed + expect(task.runAt).to.eql(scheduledRunAt); + }); + }); // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 // it('should return the resulting task state when asked to run an ephemeral task now', async () => { // const ephemeralTask = await runEphemeralTaskNow({