Skip to content

Commit

Permalink
[Fleet] Improving bulk actions for more than 10k agents (#134565)
Browse files Browse the repository at this point in the history
* changed getAllAgentsByKuery to query all agents with pit and search_after

* added internal api to test pit query

* changed reassign to work on batches of 10k

* unenroll in batches

* upgrade in batches

* fixed upgrade

* added tests

* cleanup

* revert changes in getAllAgentsByKuery

* renamed perPage to batchSize in bulk actions

* fixed test

* try catch around close pit

Co-authored-by: Kibana Machine <[email protected]>
(cherry picked from commit 2732f26)
  • Loading branch information
juliaElastic committed Jun 24, 2022
1 parent 9e8cb3d commit bf5be86
Show file tree
Hide file tree
Showing 15 changed files with 515 additions and 86 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/common/types/models/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export interface Agent extends AgentBase {
access_api_key?: string;
status?: AgentStatus;
packages: string[];
sort?: Array<number | string | null>;
}

export interface AgentSOAttributes extends AgentBase {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/server/routes/agent/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export const postBulkAgentsReassignHandler: RequestHandler<
const results = await AgentService.reassignAgents(
soClient,
esClient,
agentOptions,
{ ...agentOptions, batchSize: request.body.batchSize },
request.body.policy_id
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export const postBulkAgentsUnenrollHandler: RequestHandler<
...agentOptions,
revoke: request.body?.revoke,
force: request.body?.force,
batchSize: request.body?.batchSize,
});
const body = results.items.reduce<PostBulkAgentUnenrollResponse>((acc, so) => {
acc[so.id] = {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/fleet/server/routes/agent/upgrade_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export const postBulkAgentsUpgradeHandler: RequestHandler<
force,
rollout_duration_seconds: upgradeDurationSeconds,
start_time: startTime,
batchSize,
} = request.body;
const kibanaVersion = appContextService.getKibanaVersion();
try {
Expand All @@ -122,6 +123,7 @@ export const postBulkAgentsUpgradeHandler: RequestHandler<
force,
upgradeDurationSeconds,
startTime,
batchSize,
};
const results = await AgentService.sendUpgradeAgentsActions(soClient, esClient, upgradeOptions);
const body = results.items.reduce<PostBulkAgentUpgradeResponse>((acc, so) => {
Expand Down
121 changes: 102 additions & 19 deletions x-pack/plugins/fleet/server/services/agents/crud.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { ElasticsearchClient } from '@kbn/core/server';

import type { Agent } from '../../types';

import { getAgentsByKuery } from './crud';
import { errorsToResults, getAgentsByKuery, processAgentsInBatches } from './crud';

jest.mock('../../../common', () => ({
...jest.requireActual('../../../common'),
Expand All @@ -19,26 +19,28 @@ jest.mock('../../../common', () => ({
describe('Agents CRUD test', () => {
let esClientMock: ElasticsearchClient;
let searchMock: jest.Mock;
describe('getAgentsByKuery', () => {
beforeEach(() => {
searchMock = jest.fn();
esClientMock = {
search: searchMock,
} as unknown as ElasticsearchClient;
});

function getEsResponse(ids: string[], total: number) {
return {
hits: {
total,
hits: ids.map((id: string) => ({
_id: id,
_source: {},
})),
},
};
}
beforeEach(() => {
searchMock = jest.fn();
esClientMock = {
search: searchMock,
openPointInTime: jest.fn().mockResolvedValue({ id: '1' }),
closePointInTime: jest.fn(),
} as unknown as ElasticsearchClient;
});

function getEsResponse(ids: string[], total: number) {
return {
hits: {
total,
hits: ids.map((id: string) => ({
_id: id,
_source: {},
})),
},
};
}
describe('getAgentsByKuery', () => {
it('should return upgradeable on first page', async () => {
searchMock
.mockImplementationOnce(() => Promise.resolve(getEsResponse(['1', '2', '3', '4', '5'], 7)))
Expand Down Expand Up @@ -192,4 +194,85 @@ describe('Agents CRUD test', () => {
});
});
});

describe('processAgentsInBatches', () => {
const mockProcessAgents = (agents: Agent[]) =>
Promise.resolve({ items: agents.map((agent) => ({ id: agent.id, success: true })) });
it('should return results for multiple batches', async () => {
searchMock
.mockImplementationOnce(() => Promise.resolve(getEsResponse(['1', '2'], 3)))
.mockImplementationOnce(() => Promise.resolve(getEsResponse(['3'], 3)));

const response = await processAgentsInBatches(
esClientMock,
{
kuery: 'active:true',
batchSize: 2,
showInactive: false,
},
mockProcessAgents
);
expect(response).toEqual({
items: [
{ id: '1', success: true },
{ id: '2', success: true },
{ id: '3', success: true },
],
});
});

it('should return results for one batch', async () => {
searchMock.mockImplementationOnce(() => Promise.resolve(getEsResponse(['1', '2', '3'], 3)));

const response = await processAgentsInBatches(
esClientMock,
{
kuery: 'active:true',
showInactive: false,
},
mockProcessAgents
);
expect(response).toEqual({
items: [
{ id: '1', success: true },
{ id: '2', success: true },
{ id: '3', success: true },
],
});
});
});

describe('errorsToResults', () => {
it('should transform errors to results', () => {
const results = errorsToResults([{ id: '1' } as Agent, { id: '2' } as Agent], {
'1': new Error('error'),
});
expect(results).toEqual([
{ id: '1', success: false, error: new Error('error') },
{ id: '2', success: true },
]);
});

it('should transform errors to results with skip success', () => {
const results = errorsToResults(
[{ id: '1' } as Agent, { id: '2' } as Agent],
{ '1': new Error('error') },
undefined,
true
);
expect(results).toEqual([{ id: '1', success: false, error: new Error('error') }]);
});

it('should transform errors to results preserve order', () => {
const results = errorsToResults(
[{ id: '1' } as Agent, { id: '2' } as Agent],
{ '1': new Error('error') },
['2', '1']
);
expect(results).toEqual([
{ id: '2', success: true },
{ id: '1', success: false, error: new Error('error') },
]);
});
});
});
186 changes: 186 additions & 0 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import Boom from '@hapi/boom';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { SortResults } from '@elastic/elasticsearch/lib/api/types';
import type { SavedObjectsClientContract, ElasticsearchClient } from '@kbn/core/server';

import type { KueryNode } from '@kbn/es-query';
Expand Down Expand Up @@ -68,6 +69,7 @@ export type GetAgentsOptions =
| {
kuery: string;
showInactive?: boolean;
perPage?: number;
};

export async function getAgents(esClient: ElasticsearchClient, options: GetAgentsOptions) {
Expand All @@ -90,6 +92,113 @@ export async function getAgents(esClient: ElasticsearchClient, options: GetAgent
return agents;
}

export async function getAgentsByKueryPit(
esClient: ElasticsearchClient,
options: ListWithKuery & {
showInactive: boolean;
pitId: string;
searchAfter?: SortResults;
}
): Promise<{
agents: Agent[];
total: number;
page: number;
perPage: number;
}> {
const {
page = 1,
perPage = 20,
sortField = 'enrolled_at',
sortOrder = 'desc',
kuery,
showInactive = false,
showUpgradeable,
searchAfter,
} = options;
const { pitId } = options;
const filters = [];

if (kuery && kuery !== '') {
filters.push(kuery);
}

if (showInactive === false) {
filters.push(ACTIVE_AGENT_CONDITION);
}

const kueryNode = _joinFilters(filters);
const body = kueryNode ? { query: toElasticsearchQuery(kueryNode) } : {};

const queryAgents = async (from: number, size: number) => {
return esClient.search<FleetServerAgent, {}>({
from,
size,
track_total_hits: true,
rest_total_hits_as_int: true,
body: {
...body,
sort: [{ [sortField]: { order: sortOrder } }],
},
pit: {
id: pitId,
keep_alive: '1m',
},
...(searchAfter ? { search_after: searchAfter, from: 0 } : {}),
});
};

const res = await queryAgents((page - 1) * perPage, perPage);

let agents = res.hits.hits.map(searchHitToAgent);
let total = res.hits.total as number;

// filtering for a range on the version string will not work,
// nor does filtering on a flattened field (local_metadata), so filter here
if (showUpgradeable) {
// query all agents, then filter upgradeable, and return the requested page and correct total
// if there are more than SO_SEARCH_LIMIT agents, the logic falls back to same as before
if (total < SO_SEARCH_LIMIT) {
const response = await queryAgents(0, SO_SEARCH_LIMIT);
agents = response.hits.hits
.map(searchHitToAgent)
.filter((agent) => isAgentUpgradeable(agent, appContextService.getKibanaVersion()));
total = agents.length;
const start = (page - 1) * perPage;
agents = agents.slice(start, start + perPage);
} else {
agents = agents.filter((agent) =>
isAgentUpgradeable(agent, appContextService.getKibanaVersion())
);
}
}

return {
agents,
total,
page,
perPage,
};
}

export async function openAgentsPointInTime(esClient: ElasticsearchClient): Promise<string> {
const pitKeepAlive = '10m';
const pitRes = await esClient.openPointInTime({
index: AGENTS_INDEX,
keep_alive: pitKeepAlive,
});
return pitRes.id;
}

export async function closeAgentsPointInTime(esClient: ElasticsearchClient, pitId: string) {
try {
await esClient.closePointInTime({ id: pitId });
} catch (error) {
appContextService
.getLogger()
.warn(`Error closing point in time with id: ${pitId}. Error: ${error.message}`);
}
}

export async function getAgentsByKuery(
esClient: ElasticsearchClient,
options: ListWithKuery & {
Expand Down Expand Up @@ -168,6 +277,83 @@ export async function getAgentsByKuery(
};
}

export async function processAgentsInBatches(
esClient: ElasticsearchClient,
options: Omit<ListWithKuery, 'page' | 'perPage'> & {
showInactive: boolean;
batchSize?: number;
},
processAgents: (
agents: Agent[],
includeSuccess: boolean
) => Promise<{ items: BulkActionResult[] }>
): Promise<{ items: BulkActionResult[] }> {
const pitId = await openAgentsPointInTime(esClient);

const perPage = options.batchSize ?? SO_SEARCH_LIMIT;

const res = await getAgentsByKueryPit(esClient, {
...options,
page: 1,
perPage,
pitId,
});

let currentAgents = res.agents;
// include successful agents if total agents does not exceed 10k
const skipSuccess = res.total > SO_SEARCH_LIMIT;

let results = await processAgents(currentAgents, skipSuccess);
let allAgentsProcessed = currentAgents.length;

while (allAgentsProcessed < res.total) {
const lastAgent = currentAgents[currentAgents.length - 1];
const nextPage = await getAgentsByKueryPit(esClient, {
...options,
page: 1,
perPage,
pitId,
searchAfter: lastAgent.sort!,
});
currentAgents = nextPage.agents;
const currentResults = await processAgents(currentAgents, skipSuccess);
results = { items: results.items.concat(currentResults.items) };
allAgentsProcessed += currentAgents.length;
}

await closeAgentsPointInTime(esClient, pitId);

return results;
}

export function errorsToResults(
agents: Agent[],
errors: Record<Agent['id'], Error>,
agentIds?: string[],
skipSuccess?: boolean
): BulkActionResult[] {
if (!skipSuccess) {
const givenOrder = agentIds ? agentIds : agents.map((agent) => agent.id);
return givenOrder.map((agentId) => {
const hasError = agentId in errors;
const result: BulkActionResult = {
id: agentId,
success: !hasError,
};
if (hasError) {
result.error = errors[agentId];
}
return result;
});
} else {
return Object.entries(errors).map(([agentId, error]) => ({
id: agentId,
success: false,
error,
}));
}
}

export async function getAllAgentsByKuery(
esClient: ElasticsearchClient,
options: Omit<ListWithKuery, 'page' | 'perPage'> & {
Expand Down
Loading

0 comments on commit bf5be86

Please sign in to comment.