Skip to content

Commit

Permalink
Fixes Failing test: Jest Integration Tests.x-pack/platform/plugins/sh…
Browse files Browse the repository at this point in the history
…ared/task_manager/server/integration_tests - capacity based claiming should claim tasks to full capacity (elastic#201681)

Resolves elastic#205949,
elastic#191117

## Summary

Trying to fix flaky integration test by performing a bulk create for the
test tasks instead of creating one by one. After making this change, was
able to run the integration test ~100 times without failure.

---------

Co-authored-by: Elastic Machine <[email protected]>
(cherry picked from commit 7f28ae6)

# Conflicts:
#	x-pack/platform/plugins/shared/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts
  • Loading branch information
ymao1 committed Feb 5, 2025
1 parent fb535b2 commit 08363ab
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
* 2.0.
*/

export { injectTask } from './inject_task';
export { injectTask, injectTaskBulk } from './inject_task';
export { setupTestServers } from './setup_test_servers';
export { retry } from './retry';
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,27 @@ export async function injectTask(
},
});
}

export async function injectTaskBulk(esClient: ElasticsearchClient, tasks: ConcreteTaskInstance[]) {
const bulkRequest = [];
for (const task of tasks) {
bulkRequest.push({ create: { _id: `task:${task.id}` } });
bulkRequest.push({
references: [],
type: 'task',
updated_at: new Date().toISOString(),
task: {
...task,
state: JSON.stringify(task.state),
params: JSON.stringify(task.params),
runAt: task.runAt.toISOString(),
scheduledAt: task.scheduledAt.toISOString(),
partition: murmurhash.v3(task.id) % MAX_PARTITIONS,
},
});
}
await esClient.bulk({
index: '.kibana_task_manager',
body: bulkRequest,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { times } from 'lodash';
import { TaskCost, TaskStatus } from '../task';
import type { TaskClaimingOpts } from '../queries/task_claiming';
import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin';
import { injectTask, setupTestServers, retry } from './lib';
import { injectTaskBulk, setupTestServers, retry } from './lib';
import { CreateMonitoringStatsOpts } from '../monitoring';
import { filter, map } from 'rxjs';
import { isTaskManagerWorkerUtilizationStatEvent } from '../task_events';
Expand Down Expand Up @@ -94,8 +94,7 @@ jest.mock('../queries/task_claiming', () => {

const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');

// FLAKY: https://github.com/elastic/kibana/issues/205949
describe.skip('capacity based claiming', () => {
describe('capacity based claiming', () => {
const taskIdsToRemove: string[] = [];
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
Expand Down Expand Up @@ -170,9 +169,10 @@ describe.skip('capacity based claiming', () => {
times(10, () => ids.push(uuidV4()));

const now = new Date();
const runAt = new Date(now.valueOf() + 5000);
const runAt = new Date(now.valueOf() + 6000);
const tasks = [];
for (const id of ids) {
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id,
taskType: '_normalCostType',
params: {},
Expand All @@ -190,6 +190,8 @@ describe.skip('capacity based claiming', () => {
taskIdsToRemove.push(id);
}

await injectTaskBulk(kibanaServer.coreStart.elasticsearch.client.asInternalUser, tasks);

await retry(async () => {
expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(10);
});
Expand Down Expand Up @@ -235,8 +237,9 @@ describe.skip('capacity based claiming', () => {
const ids: string[] = [];
times(6, () => ids.push(uuidV4()));
const runAt1 = new Date(now.valueOf() - 5);
const tasks = [];
for (const id of ids) {
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id,
taskType: '_normalCostType',
params: {},
Expand All @@ -257,7 +260,7 @@ describe.skip('capacity based claiming', () => {
// inject 1 XL cost task that will put us over the max cost capacity of 20
const xlid = uuidV4();
const runAt2 = now;
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id: xlid,
taskType: '_xlCostType',
params: {},
Expand All @@ -277,7 +280,7 @@ describe.skip('capacity based claiming', () => {
// inject one more normal cost task
const runAt3 = new Date(now.valueOf() + 5);
const lastid = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id: lastid,
taskType: '_normalCostType',
params: {},
Expand All @@ -294,6 +297,8 @@ describe.skip('capacity based claiming', () => {
});
taskIdsToRemove.push(lastid);

await injectTaskBulk(kibanaServer.coreStart.elasticsearch.client.asInternalUser, tasks);

// retry until all tasks have been run
await retry(async () => {
expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(7);
Expand Down

0 comments on commit 08363ab

Please sign in to comment.