From 08363abfad1af9c37fc2b5cd852223b50ab58681 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Wed, 5 Feb 2025 16:20:38 -0500 Subject: [PATCH] Fixes Failing test: Jest Integration Tests.x-pack/platform/plugins/shared/task_manager/server/integration_tests - capacity based claiming should claim tasks to full capacity (#201681) Resolves https://github.com/elastic/kibana/issues/205949, https://github.com/elastic/kibana/issues/191117 ## Summary Trying to fix flaky integration test by performing a bulk create for the test tasks instead of creating one by one. After making this change, was able to run the integration test ~100 times without failure. --------- Co-authored-by: Elastic Machine (cherry picked from commit 7f28ae63e36d73ec471df7109909b1249f7edafd) # Conflicts: # x-pack/platform/plugins/shared/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts --- .../server/integration_tests/lib/index.ts | 2 +- .../integration_tests/lib/inject_task.ts | 24 +++++++++++++++++++ ...sk_manager_capacity_based_claiming.test.ts | 21 +++++++++------- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/index.ts b/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/index.ts index ab7f23c98e06f..deac3430efa94 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/index.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/index.ts @@ -5,6 +5,6 @@ * 2.0. */ -export { injectTask } from './inject_task'; +export { injectTask, injectTaskBulk } from './inject_task'; export { setupTestServers } from './setup_test_servers'; export { retry } from './retry'; diff --git a/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/inject_task.ts b/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/inject_task.ts index 8427d5c51e323..bef8e2d461663 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/inject_task.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/integration_tests/lib/inject_task.ts @@ -33,3 +33,27 @@ export async function injectTask( }, }); } + +export async function injectTaskBulk(esClient: ElasticsearchClient, tasks: ConcreteTaskInstance[]) { + const bulkRequest = []; + for (const task of tasks) { + bulkRequest.push({ create: { _id: `task:${task.id}` } }); + bulkRequest.push({ + references: [], + type: 'task', + updated_at: new Date().toISOString(), + task: { + ...task, + state: JSON.stringify(task.state), + params: JSON.stringify(task.params), + runAt: task.runAt.toISOString(), + scheduledAt: task.scheduledAt.toISOString(), + partition: murmurhash.v3(task.id) % MAX_PARTITIONS, + }, + }); + } + await esClient.bulk({ + index: '.kibana_task_manager', + body: bulkRequest, + }); +} diff --git a/x-pack/platform/plugins/shared/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts b/x-pack/platform/plugins/shared/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts index be83234e488e2..2c8e6fc43122f 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts @@ -12,7 +12,7 @@ import { times } from 'lodash'; import { TaskCost, TaskStatus } from '../task'; import type { TaskClaimingOpts } from '../queries/task_claiming'; import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin'; -import { injectTask, setupTestServers, retry } from './lib'; +import { injectTaskBulk, setupTestServers, retry } from './lib'; import { CreateMonitoringStatsOpts } from '../monitoring'; import { filter, map } from 'rxjs'; import { isTaskManagerWorkerUtilizationStatEvent } from '../task_events'; @@ -94,8 +94,7 @@ jest.mock('../queries/task_claiming', () => { const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start'); -// FLAKY: https://github.com/elastic/kibana/issues/205949 -describe.skip('capacity based claiming', () => { +describe('capacity based claiming', () => { const taskIdsToRemove: string[] = []; let esServer: TestElasticsearchUtils; let kibanaServer: TestKibanaUtils; @@ -170,9 +169,10 @@ describe.skip('capacity based claiming', () => { times(10, () => ids.push(uuidV4())); const now = new Date(); - const runAt = new Date(now.valueOf() + 5000); + const runAt = new Date(now.valueOf() + 6000); + const tasks = []; for (const id of ids) { - await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + tasks.push({ id, taskType: '_normalCostType', params: {}, @@ -190,6 +190,8 @@ describe.skip('capacity based claiming', () => { taskIdsToRemove.push(id); } + await injectTaskBulk(kibanaServer.coreStart.elasticsearch.client.asInternalUser, tasks); + await retry(async () => { expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(10); }); @@ -235,8 +237,9 @@ describe.skip('capacity based claiming', () => { const ids: string[] = []; times(6, () => ids.push(uuidV4())); const runAt1 = new Date(now.valueOf() - 5); + const tasks = []; for (const id of ids) { - await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + tasks.push({ id, taskType: '_normalCostType', params: {}, @@ -257,7 +260,7 @@ describe.skip('capacity based claiming', () => { // inject 1 XL cost task that will put us over the max cost capacity of 20 const xlid = uuidV4(); const runAt2 = now; - await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + tasks.push({ id: xlid, taskType: '_xlCostType', params: {}, @@ -277,7 +280,7 @@ describe.skip('capacity based claiming', () => { // inject one more normal cost task const runAt3 = new Date(now.valueOf() + 5); const lastid = uuidV4(); - await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + tasks.push({ id: lastid, taskType: '_normalCostType', params: {}, @@ -294,6 +297,8 @@ describe.skip('capacity based claiming', () => { }); taskIdsToRemove.push(lastid); + await injectTaskBulk(kibanaServer.coreStart.elasticsearch.client.asInternalUser, tasks); + // retry until all tasks have been run await retry(async () => { expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(7);