Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streams] Refactoring streams routes #206526

Merged
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The simulateProcessorRoute registration has gone lost, might be good to move it to the management folder, wdyt?
Screenshot 2025-01-14 at 17 39 31

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops... I should have moved this back to "Draft". I just added those while you were reviewing it.

Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,20 @@
* 2.0.
*/

import { dashboardRoutes } from './dashboards/route';
import { esqlRoutes } from './esql/route';
import { deleteStreamRoute } from './streams/delete';
import { streamDetailRoute } from './streams/details';
import { disableStreamsRoute } from './streams/disable';
import { editStreamRoute } from './streams/edit';
import { enableStreamsRoute } from './streams/enable';
import { forkStreamsRoute } from './streams/fork';
import { listStreamsRoute } from './streams/list';
import { readStreamRoute } from './streams/read';
import { resyncStreamsRoute } from './streams/resync';
import { sampleStreamRoute } from './streams/sample';
import { schemaFieldsSimulationRoute } from './streams/schema/fields_simulation';
import { unmappedFieldsRoute } from './streams/schema/unmapped_fields';
import { simulateProcessorRoute } from './streams/processing/simulate';
import { streamsStatusRoutes } from './streams/settings';
import { dashboardRoutes } from './dashboards/route';
import { crudRoutes } from './streams/crud/route';
import { enablementRoutes } from './streams/enablement/route';
import { managementRoutes } from './streams/management/route';
import { schemaRoutes } from './streams/schema/route';

export const streamsRouteRepository = {
...enableStreamsRoute,
...resyncStreamsRoute,
...forkStreamsRoute,
...readStreamRoute,
...editStreamRoute,
...deleteStreamRoute,
...listStreamsRoute,
...streamsStatusRoutes,
...esqlRoutes,
...disableStreamsRoute,
...dashboardRoutes,
...sampleStreamRoute,
...streamDetailRoute,
...unmappedFieldsRoute,
...simulateProcessorRoute,
...schemaFieldsSimulationRoute,
...crudRoutes,
...enablementRoutes,
...managementRoutes,
...schemaRoutes,
};

export type StreamsRouteRepository = typeof streamsRouteRepository;
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,135 @@ import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { badRequest, internal, notFound } from '@hapi/boom';
import {
getParentId,
isRootStream,
isWiredStream,
isWiredStreamConfig,
isWiredReadStream,
isRootStream,
streamConfigDefinitionSchema,
StreamDefinition,
WiredStreamConfigDefinition,
WiredStreamDefinition,
ListStreamsResponse,
FieldDefinitionConfig,
ReadStreamDefinition,
getParentId,
WiredReadStreamDefinition,
} from '@kbn/streams-schema';
import { isEqual } from 'lodash';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
} from '../../../lib/streams/errors';
import { createServerRoute } from '../../create_server_route';
import {
syncStream,
readStream,
listStreams,
readAncestors,
checkStreamExists,
validateAncestorFields,
validateDescendantFields,
} from '../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { MalformedChildren } from '../../lib/streams/errors/malformed_children';
import { AssetClient } from '../../lib/streams/assets/asset_client';
import { validateCondition } from '../../lib/streams/helpers/condition_fields';
deleteStreamObjects,
deleteUnmanagedStreamObjects,
} from '../../../lib/streams/stream_crud';
import { MalformedChildren } from '../../../lib/streams/errors/malformed_children';
import { validateCondition } from '../../../lib/streams/helpers/condition_fields';
import { AssetClient } from '../../../lib/streams/assets/asset_client';

export const readStreamRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<ReadStreamDefinition> => {
try {
const { scopedClusterClient, assetClient } = await getScopedClients({ request });
const streamEntity = await readStream({
scopedClusterClient,
id: params.path.id,
});
const dashboards = await assetClient.getAssetIds({
entityId: streamEntity.name,
entityType: 'stream',
assetType: 'dashboard',
});

if (!isWiredReadStream(streamEntity)) {
return {
...streamEntity,
dashboards,
inherited_fields: {},
};
}

const { ancestors } = await readAncestors({
name: streamEntity.name,
scopedClusterClient,
});

const body: WiredReadStreamDefinition = {
...streamEntity,
dashboards,
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.stream.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
// TODO: replace this with a proper type
}, {} as Record<string, FieldDefinitionConfig & { from: string }>),
};

return body;
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}

throw internal(e);
}
},
});

export const listStreamsRoute = createServerRoute({
endpoint: 'GET /api/streams',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({}),
handler: async ({ request, getScopedClients }): Promise<ListStreamsResponse> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
return await listStreams({ scopedClusterClient });
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}

throw internal(e);
}
},
});

export const editStreamRoute = createServerRoute({
endpoint: 'PUT /api/streams/{id}',
Expand Down Expand Up @@ -80,18 +180,10 @@ export const editStreamRoute = createServerRoute({
})) as WiredStreamDefinition;

if (isRootStream(streamDefinition)) {
await validateRootStreamChanges(
scopedClusterClient,
currentStreamDefinition,
streamDefinition
);
await validateRootStreamChanges(currentStreamDefinition, streamDefinition);
}

await validateStreamChildren(
scopedClusterClient,
currentStreamDefinition,
params.body.ingest.routing
);
await validateStreamChildren(currentStreamDefinition, params.body.ingest.routing);

if (isWiredStreamConfig(params.body)) {
await validateAncestorFields(
Expand Down Expand Up @@ -151,7 +243,7 @@ export const editStreamRoute = createServerRoute({
});

if (parentId) {
parentDefinition = await updateParentStream(
parentDefinition = await updateParentStreamAfterEdit(
scopedClusterClient,
assetClient,
parentId,
Expand Down Expand Up @@ -180,7 +272,7 @@ export const editStreamRoute = createServerRoute({
},
});

async function updateParentStream(
async function updateParentStreamAfterEdit(
scopedClusterClient: IScopedClusterClient,
assetClient: AssetClient,
parentId: string,
Expand Down Expand Up @@ -210,7 +302,6 @@ async function updateParentStream(
}

async function validateStreamChildren(
scopedClusterClient: IScopedClusterClient,
currentStreamDefinition: WiredStreamDefinition,
children: WiredStreamConfigDefinition['ingest']['routing']
) {
Expand Down Expand Up @@ -238,7 +329,6 @@ async function validateStreamChildren(
* Changes to routing rules are allowed.
*/
async function validateRootStreamChanges(
scopedClusterClient: IScopedClusterClient,
currentStreamDefinition: WiredStreamDefinition,
nextStreamDefinition: WiredStreamDefinition
) {
Expand All @@ -260,3 +350,127 @@ async function validateRootStreamChanges(
throw new RootStreamImmutabilityException('Root stream processing rules cannot be changed');
}
}

export const deleteStreamRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
}),
handler: async ({
params,
logger,
request,
getScopedClients,
}): Promise<{ acknowledged: true }> => {
try {
const { scopedClusterClient, assetClient } = await getScopedClients({ request });

const parentId = getParentId(params.path.id);
if (parentId) {
// need to update parent first to cut off documents streaming down
await updateParentStreamAfterDelete(
scopedClusterClient,
assetClient,
params.path.id,
parentId,
logger
);
}

await deleteStream(scopedClusterClient, assetClient, params.path.id, logger);

return { acknowledged: true };
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}

if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
) {
throw badRequest(e);
}

throw internal(e);
}
},
});

export async function deleteStream(
scopedClusterClient: IScopedClusterClient,
assetClient: AssetClient,
id: string,
logger: Logger
) {
try {
const definition = await readStream({ scopedClusterClient, id });
if (!isWiredReadStream(definition)) {
await deleteUnmanagedStreamObjects({ scopedClusterClient, id, logger, assetClient });
return;
}

const parentId = getParentId(id);
if (!parentId) {
throw new MalformedStreamId('Cannot delete root stream');
}

// need to update parent first to cut off documents streaming down
await updateParentStreamAfterDelete(scopedClusterClient, assetClient, id, parentId, logger);
for (const child of definition.stream.ingest.routing) {
await deleteStream(scopedClusterClient, assetClient, child.name, logger);
}
await deleteStreamObjects({ scopedClusterClient, id, logger, assetClient });
} catch (e) {
if (e instanceof DefinitionNotFound) {
logger.debug(`Stream definition for ${id} not found.`);
} else {
throw e;
}
}
}

async function updateParentStreamAfterDelete(
scopedClusterClient: IScopedClusterClient,
assetClient: AssetClient,
id: string,
parentId: string,
logger: Logger
) {
const parentDefinition = await readStream({
scopedClusterClient,
id: parentId,
});

parentDefinition.stream.ingest.routing = parentDefinition.stream.ingest.routing.filter(
(child) => child.name !== id
);

await syncStream({
scopedClusterClient,
assetClient,
definition: parentDefinition,
logger,
});
return parentDefinition;
}

export const crudRoutes = {
...readStreamRoute,
...listStreamsRoute,
...editStreamRoute,
...deleteStreamRoute,
};
Loading