Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(frontend): retrieve archived logs from correct location #11010

Merged
merged 8 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions frontend/server/configs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
ARGO_ARCHIVE_ARTIFACTORY = 'minio',
/** Bucket to retrive logs from */
ARGO_ARCHIVE_BUCKETNAME = 'mlpipeline',
/** Prefix to logs. */
ARGO_ARCHIVE_PREFIX = 'logs',
/** This should match the keyFormat specified in the Argo workflow-controller-configmap.
* It's set here in the manifests:
* https://github.com/kubeflow/pipelines/blob/7b7918ebf8c30e6ceec99283ef20dbc02fdf6a42/manifests/kustomize/third-party/argo/base/workflow-controller-configmap-patch.yaml#L28
*/
ARGO_KEYFORMAT = 'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
droctothorpe marked this conversation as resolved.
Show resolved Hide resolved
/** Should use server API for log streaming? */
STREAM_LOGS_FROM_SERVER_API = 'false',
/** The main container name of a pod where logs are retrieved */
Expand Down Expand Up @@ -127,7 +130,7 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
archiveArtifactory: ARGO_ARCHIVE_ARTIFACTORY,
archiveBucketName: ARGO_ARCHIVE_BUCKETNAME,
archiveLogs: asBool(ARGO_ARCHIVE_LOGS),
archivePrefix: ARGO_ARCHIVE_PREFIX,
keyFormat: ARGO_KEYFORMAT,
},
pod: {
logContainerName: POD_LOG_CONTAINER_NAME,
Expand Down Expand Up @@ -253,7 +256,7 @@ export interface ArgoConfigs {
archiveLogs: boolean;
archiveArtifactory: string;
archiveBucketName: string;
archivePrefix: string;
keyFormat: string;
}
export interface ServerConfigs {
basePath: string;
Expand Down
13 changes: 7 additions & 6 deletions frontend/server/handlers/pod-logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ export function getPodLogsHandler(
},
podLogContainerName: string,
): Handler {
const { archiveLogs, archiveArtifactory, archiveBucketName, archivePrefix = '' } = argoOptions;
const { archiveLogs, archiveArtifactory, archiveBucketName, keyFormat } = argoOptions;

// get pod log from the provided bucket and prefix.
// get pod log from the provided bucket and keyFormat.
const getPodLogsStreamFromArchive = toGetPodLogsStream(
createPodLogsMinioRequestConfig(
archiveArtifactory === 'minio' ? artifactsOptions.minio : artifactsOptions.aws,
archiveBucketName,
archivePrefix,
keyFormat,
),
);

// get the pod log stream (with fallbacks).
const getPodLogsStream = composePodLogsStreamHandler(
(podName: string, namespace?: string) => {
return getPodLogsStreamFromK8s(podName, namespace, podLogContainerName);
(podName: string, createdAt: string, namespace?: string) => {
return getPodLogsStreamFromK8s(podName, createdAt, namespace, podLogContainerName);
},
// if archive logs flag is set, then final attempt will try to retrieve the artifacts
// from the bucket and prefix provided in the config. Otherwise, only attempts
Expand All @@ -69,13 +69,14 @@ export function getPodLogsHandler(
return;
}
const podName = decodeURIComponent(req.query.podname);
const createdAt = decodeURIComponent(req.query.createdat);

// This is optional.
// Note decodeURIComponent(undefined) === 'undefined', so I cannot pass the argument directly.
const podNamespace = decodeURIComponent(req.query.podnamespace || '') || undefined;

try {
const stream = await getPodLogsStream(podName, podNamespace);
const stream = await getPodLogsStream(podName, createdAt, podNamespace);
stream.on('error', err => {
if (
err?.message &&
Expand Down
93 changes: 56 additions & 37 deletions frontend/server/workflow-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,49 @@ describe('workflow-helper', () => {
describe('composePodLogsStreamHandler', () => {
it('returns the stream from the default handler if there is no errors.', async () => {
const defaultStream = new PassThrough();
const defaultHandler = jest.fn((_podName: string, _namespace?: string) =>
const defaultHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.resolve(defaultStream),
);
const stream = await composePodLogsStreamHandler(defaultHandler)('podName', 'namespace');
expect(defaultHandler).toBeCalledWith('podName', 'namespace');
const stream = await composePodLogsStreamHandler(defaultHandler)(
'podName',
'2024-08-13',
'namespace',
);
expect(defaultHandler).toBeCalledWith('podName', '2024-08-13', 'namespace');
expect(stream).toBe(defaultStream);
});

it('returns the stream from the fallback handler if there is any error.', async () => {
const fallbackStream = new PassThrough();
const defaultHandler = jest.fn((_podName: string, _namespace?: string) =>
const defaultHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.reject('unknown error'),
);
const fallbackHandler = jest.fn((_podName: string, _namespace?: string) =>
const fallbackHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.resolve(fallbackStream),
);
const stream = await composePodLogsStreamHandler(defaultHandler, fallbackHandler)(
'podName',
'2024-08-13',
'namespace',
);
expect(defaultHandler).toBeCalledWith('podName', 'namespace');
expect(fallbackHandler).toBeCalledWith('podName', 'namespace');
expect(defaultHandler).toBeCalledWith('podName', '2024-08-13', 'namespace');
expect(fallbackHandler).toBeCalledWith('podName', '2024-08-13', 'namespace');
expect(stream).toBe(fallbackStream);
});

it('throws error if both handler and fallback fails.', async () => {
const defaultHandler = jest.fn((_podName: string, _namespace?: string) =>
const defaultHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.reject('unknown error for default'),
);
const fallbackHandler = jest.fn((_podName: string, _namespace?: string) =>
const fallbackHandler = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.reject('unknown error for fallback'),
);
await expect(
composePodLogsStreamHandler(defaultHandler, fallbackHandler)('podName', 'namespace'),
composePodLogsStreamHandler(defaultHandler, fallbackHandler)(
'podName',
'2024-08-13',
'namespace',
),
).rejects.toEqual('unknown error for fallback');
});
});
Expand All @@ -82,7 +91,7 @@ describe('workflow-helper', () => {
const mockedGetPodLogs: jest.Mock = getPodLogs as any;
mockedGetPodLogs.mockResolvedValueOnce('pod logs');

const stream = await getPodLogsStreamFromK8s('podName', 'namespace');
const stream = await getPodLogsStreamFromK8s('podName', '', 'namespace');
expect(mockedGetPodLogs).toBeCalledWith('podName', 'namespace', 'main');
expect(stream.read().toString()).toBe('pod logs');
});
Expand All @@ -101,24 +110,34 @@ describe('workflow-helper', () => {
client,
key: 'folder/key',
};
const createRequest = jest.fn((_podName: string, _namespace?: string) =>
const createRequest = jest.fn((_podName: string, _createdAt: string, _namespace?: string) =>
Promise.resolve(configs),
);
const stream = await toGetPodLogsStream(createRequest)('podName', 'namespace');
const stream = await toGetPodLogsStream(createRequest)('podName', '2024-08-13', 'namespace');
expect(mockedClientGetObject).toBeCalledWith('bucket', 'folder/key');
});
});

describe('createPodLogsMinioRequestConfig', () => {
it('returns a MinioRequestConfig factory with the provided minioClientOptions, bucket, and prefix.', async () => {
const mockedClient: jest.Mock = MinioClient as any;
const requestFunc = await createPodLogsMinioRequestConfig(minioConfig, 'bucket', 'prefix');
const request = await requestFunc('workflow-name-abc', 'namespace');
const requestFunc = await createPodLogsMinioRequestConfig(
minioConfig,
'bucket',
'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
);
const request = await requestFunc(
'workflow-name-system-container-impl-foo',
'2024-08-13',
'namespace',
);

expect(mockedClient).toBeCalledWith(minioConfig);
expect(request.client).toBeInstanceOf(MinioClient);
expect(request.bucket).toBe('bucket');
expect(request.key).toBe('prefix/workflow-name/workflow-name-abc/main.log');
expect(request.key).toBe(
'artifacts/workflow-name/2024/08/13/workflow-name-system-container-impl-foo/main.log',
);
});
});

Expand All @@ -128,31 +147,28 @@ describe('workflow-helper', () => {
apiVersion: 'argoproj.io/v1alpha1',
kind: 'Workflow',
status: {
artifactRepositoryRef: {
artifactRepository: {
archiveLogs: true,
s3: {
accessKeySecret: { key: 'accessKey', name: 'accessKeyName' },
bucket: 'bucket',
endpoint: 'minio-service.kubeflow',
insecure: true,
key:
'prefix/workflow-name/workflow-name-system-container-impl-abc/some-artifact.csv',
secretKeySecret: { key: 'secretKey', name: 'secretKeyName' },
},
},
},
nodes: {
'workflow-name-abc': {
outputs: {
artifacts: [
{
name: 'some-artifact.csv',
s3: {
accessKeySecret: { key: 'accessKey', name: 'accessKeyName' },
bucket: 'bucket',
endpoint: 'minio-service.kubeflow',
insecure: true,
key: 'prefix/workflow-name/workflow-name-abc/some-artifact.csv',
secretKeySecret: { key: 'secretKey', name: 'secretKeyName' },
},
},
{
archiveLogs: true,
name: 'main.log',
name: 'main-logs',
s3: {
accessKeySecret: { key: 'accessKey', name: 'accessKeyName' },
bucket: 'bucket',
endpoint: 'minio-service.kubeflow',
insecure: true,
key: 'prefix/workflow-name/workflow-name-abc/main.log',
secretKeySecret: { key: 'secretKey', name: 'secretKeyName' },
key: 'prefix/workflow-name/workflow-name-system-container-impl-abc/main.log',
},
},
],
Expand All @@ -174,7 +190,10 @@ describe('workflow-helper', () => {
mockedClientGetObject.mockResolvedValueOnce(objStream);
objStream.end('some fake logs.');

const stream = await getPodLogsStreamFromWorkflow('workflow-name-abc');
const stream = await getPodLogsStreamFromWorkflow(
'workflow-name-system-container-impl-abc',
'2024-07-09',
);

expect(mockedGetArgoWorkflow).toBeCalledWith('workflow-name');

Expand All @@ -193,7 +212,7 @@ describe('workflow-helper', () => {
expect(mockedClientGetObject).toBeCalledTimes(1);
expect(mockedClientGetObject).toBeCalledWith(
'bucket',
'prefix/workflow-name/workflow-name-abc/main.log',
'prefix/workflow-name/workflow-name-system-container-impl-abc/main.log',
);
});
});
Expand Down
Loading
Loading