Skip to content

Commit

Permalink
fix(frontend): retrieve archived logs from correct location (#11010)
Browse files Browse the repository at this point in the history
* fix(frontend): retrieve archived logs from correct location

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>

* Add namespace tag handling and validation

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>

* Remove whitespace from keyFormat

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>

* Update frontend unit tests

Signed-off-by: droctothorpe <[email protected]>

* Remove superfluous log statements

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: quinnovator <[email protected]>

* Add link to keyFormat in manifests

Signed-off-by: droctothorpe <[email protected]>

* Fix workflow parsing for log artifact

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: quinnovator <[email protected]>

* Fix unit test

Signed-off-by: droctothorpe <[email protected]>

---------

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>
Co-authored-by: quinnovator <[email protected]>
  • Loading branch information
4 people authored Aug 15, 2024
1 parent 000ef60 commit 2e6e634
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 101 deletions.
11 changes: 7 additions & 4 deletions frontend/server/configs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
ARGO_ARCHIVE_ARTIFACTORY = 'minio',
/** Bucket to retrive logs from */
ARGO_ARCHIVE_BUCKETNAME = 'mlpipeline',
/** Prefix to logs. */
ARGO_ARCHIVE_PREFIX = 'logs',
/** This should match the keyFormat specified in the Argo workflow-controller-configmap.
* It's set here in the manifests:
* https://github.com/kubeflow/pipelines/blob/7b7918ebf8c30e6ceec99283ef20dbc02fdf6a42/manifests/kustomize/third-party/argo/base/workflow-controller-configmap-patch.yaml#L28
*/
ARGO_KEYFORMAT = 'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
/** Should use server API for log streaming? */
STREAM_LOGS_FROM_SERVER_API = 'false',
/** The main container name of a pod where logs are retrieved */
Expand Down Expand Up @@ -127,7 +130,7 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
archiveArtifactory: ARGO_ARCHIVE_ARTIFACTORY,
archiveBucketName: ARGO_ARCHIVE_BUCKETNAME,
archiveLogs: asBool(ARGO_ARCHIVE_LOGS),
archivePrefix: ARGO_ARCHIVE_PREFIX,
keyFormat: ARGO_KEYFORMAT,
},
pod: {
logContainerName: POD_LOG_CONTAINER_NAME,
Expand Down Expand Up @@ -253,7 +256,7 @@ export interface ArgoConfigs {
archiveLogs: boolean;
archiveArtifactory: string;
archiveBucketName: string;
archivePrefix: string;
keyFormat: string;
}
export interface ServerConfigs {
basePath: string;
Expand Down
13 changes: 7 additions & 6 deletions frontend/server/handlers/pod-logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ export function getPodLogsHandler(
},
podLogContainerName: string,
): Handler {
const { archiveLogs, archiveArtifactory, archiveBucketName, archivePrefix = '' } = argoOptions;
const { archiveLogs, archiveArtifactory, archiveBucketName, keyFormat } = argoOptions;

// get pod log from the provided bucket and prefix.
// get pod log from the provided bucket and keyFormat.
const getPodLogsStreamFromArchive = toGetPodLogsStream(
createPodLogsMinioRequestConfig(
archiveArtifactory === 'minio' ? artifactsOptions.minio : artifactsOptions.aws,
archiveBucketName,
archivePrefix,
keyFormat,
),
);

// get the pod log stream (with fallbacks).
const getPodLogsStream = composePodLogsStreamHandler(
(podName: string, namespace?: string) => {
return getPodLogsStreamFromK8s(podName, namespace, podLogContainerName);
(podName: string, createdAt: string, namespace?: string) => {
return getPodLogsStreamFromK8s(podName, createdAt, namespace, podLogContainerName);
},
// if archive logs flag is set, then final attempt will try to retrieve the artifacts
// from the bucket and prefix provided in the config. Otherwise, only attempts
Expand All @@ -69,13 +69,14 @@ export function getPodLogsHandler(
return;
}
const podName = decodeURIComponent(req.query.podname);
const createdAt = decodeURIComponent(req.query.createdat);

// This is optional.
// Note decodeURIComponent(undefined) === 'undefined', so I cannot pass the argument directly.
const podNamespace = decodeURIComponent(req.query.podnamespace || '') || undefined;

try {
const stream = await getPodLogsStream(podName, podNamespace);
const stream = await getPodLogsStream(podName, createdAt, podNamespace);
stream.on('error', err => {
if (
err?.message &&
Expand Down
93 changes: 56 additions & 37 deletions frontend/server/workflow-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,49 @@ describe('workflow-helper', () => {
describe('composePodLogsStreamHandler', () => {
it('returns the stream from the default handler if there is no errors.', async () => {
const defaultStream = new PassThrough();
const defaultHandler = jest.fn((_podName: string, _namespace?: string) =>
const defaultHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.resolve(defaultStream),
);
const stream = await composePodLogsStreamHandler(defaultHandler)('podName', 'namespace');
expect(defaultHandler).toBeCalledWith('podName', 'namespace');
const stream = await composePodLogsStreamHandler(defaultHandler)(
'podName',
'2024-08-13',
'namespace',
);
expect(defaultHandler).toBeCalledWith('podName', '2024-08-13', 'namespace');
expect(stream).toBe(defaultStream);
});

it('returns the stream from the fallback handler if there is any error.', async () => {
const fallbackStream = new PassThrough();
const defaultHandler = jest.fn((_podName: string, _namespace?: string) =>
const defaultHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.reject('unknown error'),
);
const fallbackHandler = jest.fn((_podName: string, _namespace?: string) =>
const fallbackHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.resolve(fallbackStream),
);
const stream = await composePodLogsStreamHandler(defaultHandler, fallbackHandler)(
'podName',
'2024-08-13',
'namespace',
);
expect(defaultHandler).toBeCalledWith('podName', 'namespace');
expect(fallbackHandler).toBeCalledWith('podName', 'namespace');
expect(defaultHandler).toBeCalledWith('podName', '2024-08-13', 'namespace');
expect(fallbackHandler).toBeCalledWith('podName', '2024-08-13', 'namespace');
expect(stream).toBe(fallbackStream);
});

it('throws error if both handler and fallback fails.', async () => {
const defaultHandler = jest.fn((_podName: string, _namespace?: string) =>
const defaultHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.reject('unknown error for default'),
);
const fallbackHandler = jest.fn((_podName: string, _namespace?: string) =>
const fallbackHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.reject('unknown error for fallback'),
);
await expect(
composePodLogsStreamHandler(defaultHandler, fallbackHandler)('podName', 'namespace'),
composePodLogsStreamHandler(defaultHandler, fallbackHandler)(
'podName',
'2024-08-13',
'namespace',
),
).rejects.toEqual('unknown error for fallback');
});
});
Expand All @@ -82,7 +91,7 @@ describe('workflow-helper', () => {
const mockedGetPodLogs: jest.Mock = getPodLogs as any;
mockedGetPodLogs.mockResolvedValueOnce('pod logs');

const stream = await getPodLogsStreamFromK8s('podName', 'namespace');
const stream = await getPodLogsStreamFromK8s('podName', '', 'namespace');
expect(mockedGetPodLogs).toBeCalledWith('podName', 'namespace', 'main');
expect(stream.read().toString()).toBe('pod logs');
});
Expand All @@ -101,24 +110,34 @@ describe('workflow-helper', () => {
client,
key: 'folder/key',
};
const createRequest = jest.fn((_podName: string, _namespace?: string) =>
const createRequest = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.resolve(configs),
);
const stream = await toGetPodLogsStream(createRequest)('podName', 'namespace');
const stream = await toGetPodLogsStream(createRequest)('podName', '2024-08-13', 'namespace');
expect(mockedClientGetObject).toBeCalledWith('bucket', 'folder/key');
});
});

describe('createPodLogsMinioRequestConfig', () => {
it('returns a MinioRequestConfig factory with the provided minioClientOptions, bucket, and prefix.', async () => {
const mockedClient: jest.Mock = MinioClient as any;
const requestFunc = await createPodLogsMinioRequestConfig(minioConfig, 'bucket', 'prefix');
const request = await requestFunc('workflow-name-abc', 'namespace');
const requestFunc = await createPodLogsMinioRequestConfig(
minioConfig,
'bucket',
'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
);
const request = await requestFunc(
'workflow-name-system-container-impl-foo',
'2024-08-13',
'namespace',
);

expect(mockedClient).toBeCalledWith(minioConfig);
expect(request.client).toBeInstanceOf(MinioClient);
expect(request.bucket).toBe('bucket');
expect(request.key).toBe('prefix/workflow-name/workflow-name-abc/main.log');
expect(request.key).toBe(
'artifacts/workflow-name/2024/08/13/workflow-name-system-container-impl-foo/main.log',
);
});
});

Expand All @@ -128,31 +147,28 @@ describe('workflow-helper', () => {
apiVersion: 'argoproj.io/v1alpha1',
kind: 'Workflow',
status: {
artifactRepositoryRef: {
artifactRepository: {
archiveLogs: true,
s3: {
accessKeySecret: { key: 'accessKey', name: 'accessKeyName' },
bucket: 'bucket',
endpoint: 'minio-service.kubeflow',
insecure: true,
key:
'prefix/workflow-name/workflow-name-system-container-impl-abc/some-artifact.csv',
secretKeySecret: { key: 'secretKey', name: 'secretKeyName' },
},
},
},
nodes: {
'workflow-name-abc': {
outputs: {
artifacts: [
{
name: 'some-artifact.csv',
s3: {
accessKeySecret: { key: 'accessKey', name: 'accessKeyName' },
bucket: 'bucket',
endpoint: 'minio-service.kubeflow',
insecure: true,
key: 'prefix/workflow-name/workflow-name-abc/some-artifact.csv',
secretKeySecret: { key: 'secretKey', name: 'secretKeyName' },
},
},
{
archiveLogs: true,
name: 'main.log',
name: 'main-logs',
s3: {
accessKeySecret: { key: 'accessKey', name: 'accessKeyName' },
bucket: 'bucket',
endpoint: 'minio-service.kubeflow',
insecure: true,
key: 'prefix/workflow-name/workflow-name-abc/main.log',
secretKeySecret: { key: 'secretKey', name: 'secretKeyName' },
key: 'prefix/workflow-name/workflow-name-system-container-impl-abc/main.log',
},
},
],
Expand All @@ -174,7 +190,10 @@ describe('workflow-helper', () => {
mockedClientGetObject.mockResolvedValueOnce(objStream);
objStream.end('some fake logs.');

const stream = await getPodLogsStreamFromWorkflow('workflow-name-abc');
const stream = await getPodLogsStreamFromWorkflow(
'workflow-name-system-container-impl-abc',
'2024-07-09',
);

expect(mockedGetArgoWorkflow).toBeCalledWith('workflow-name');

Expand All @@ -193,7 +212,7 @@ describe('workflow-helper', () => {
expect(mockedClientGetObject).toBeCalledTimes(1);
expect(mockedClientGetObject).toBeCalledWith(
'bucket',
'prefix/workflow-name/workflow-name-abc/main.log',
'prefix/workflow-name/workflow-name-system-container-impl-abc/main.log',
);
});
});
Expand Down
Loading

0 comments on commit 2e6e634

Please sign in to comment.