Skip to content

Commit

Permalink
Fix workflow parsing for log artifact
Browse files Browse the repository at this point in the history
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: quinnovator <[email protected]>
  • Loading branch information
droctothorpe and quinnovator committed Aug 15, 2024
1 parent 03426c5 commit e7e57b9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 31 deletions.
79 changes: 49 additions & 30 deletions frontend/server/workflow-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,27 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import path from 'path';
import { PassThrough, Stream } from 'stream';
import { ClientOptions as MinioClientOptions } from 'minio';
import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper';
import { createMinioClient, MinioRequestConfig, getObjectStream } from './minio-helper';

export interface PartialArgoWorkflow {
status: {
artifactRepositoryRef?: ArtifactRepositoryRef;
nodes?: ArgoWorkflowStatusNode;
};
}

export interface ArtifactRepositoryRef {
artifactRepository?: ArtifactRepository;
}

export interface ArtifactRepository {
archiveLogs?: boolean;
s3?: S3Artifact;
}

export interface ArgoWorkflowStatusNode {
[key: string]: ArgoWorkflowStatusNodeInfo;
}
Expand All @@ -34,9 +43,12 @@ export interface ArgoWorkflowStatusNodeInfo {
}

export interface ArtifactRecord {
archiveLogs?: boolean;
name: string;
s3?: S3Artifact;
name?: string;
s3: S3Key;
}

export interface S3Key {
key: string;
}

export interface S3Artifact {
Expand Down Expand Up @@ -80,6 +92,7 @@ export function composePodLogsStreamHandler<T = Stream>(
/**
* Returns a stream containing the pod logs using kubernetes api.
* @param podName name of the pod.
* @param createdAt YYYY-MM-DD run was created. Not used.
* @param namespace namespace of the pod (uses the same namespace as the server if not provided).
* @param containerName container's name of the pod, the default value is 'main'.
*/
Expand All @@ -91,14 +104,17 @@ export async function getPodLogsStreamFromK8s(
) {
const stream = new PassThrough();
stream.end(await getPodLogs(podName, namespace, containerName));
console.log(`Getting logs for pod:${podName} in namespace ${namespace}.`);
console.log(
`Getting logs for pod, ${podName}, in namespace, ${namespace}, by calling the Kubernetes API.`,
);
return stream;
}

/**
* Returns a stream containing the pod logs using the information provided in the
* workflow status (uses k8s api to retrieve the workflow and secrets).
* @param podName name of the pod.
* @param createdAt YYYY-MM-DD run was created. Not used.
* @param namespace namespace of the pod (uses the same namespace as the server if not provided).
*/
export const getPodLogsStreamFromWorkflow = toGetPodLogsStream(
Expand All @@ -121,7 +137,7 @@ export function toGetPodLogsStream(
) {
return async (podName: string, createdAt: string, namespace?: string) => {
const request = await getMinioRequestConfig(podName, createdAt, namespace);
console.log(`Getting logs for pod:${podName} from ${request.bucket}/${request.key}.`);
console.log(`Getting logs for pod, ${podName}, from ${request.bucket}/${request.key}.`);
return await getObjectStream(request);
};
}
Expand Down Expand Up @@ -193,33 +209,42 @@ export async function getPodLogsMinioRequestConfigfromWorkflow(
podName: string,
): Promise<MinioRequestConfig> {
let workflow: PartialArgoWorkflow;
// We should probably parameterize this replace statement. It's brittle to
// changes in implementation. But brittle is better than completely broken.
let workflowName = podName.replace(/-system-container-impl-.*/, '');
try {
workflow = await getArgoWorkflow(workflowNameFromPodName(podName));
workflow = await getArgoWorkflow(workflowName);
} catch (err) {
throw new Error(`Unable to retrieve workflow status: ${err}.`);
}

// archiveLogs can be set globally for the workflow as a whole and / or for
// each individual task. The compiler sets it globally so we look for it in
// the global field, which is documented here:
// https://argo-workflows.readthedocs.io/en/release-3.4/fields/#workflow
if (!workflow.status.artifactRepositoryRef?.artifactRepository?.archiveLogs) {
throw new Error('Unable to retrieve logs from artifact store; archiveLogs is disabled.');
}

let artifacts: ArtifactRecord[] | undefined;
// check if required fields are available
if (workflow.status && workflow.status.nodes) {
const node = workflow.status.nodes[podName];
if (node && node.outputs && node.outputs.artifacts) {
artifacts = node.outputs.artifacts;
}
const nodeName = podName.replace('-system-container-impl', '');
const node = workflow.status.nodes[nodeName];
artifacts = node?.outputs?.artifacts || undefined;
}
if (!artifacts) {
throw new Error('Unable to find pod info in workflow status to retrieve logs.');
throw new Error('Unable to find corresponding log artifact in node.');
}

const archiveLogs: ArtifactRecord[] = artifacts.filter((artifact: any) => artifact.archiveLogs);

if (archiveLogs.length === 0) {
throw new Error('Unable to find pod log archive information from workflow status.');
const logKey =
artifacts.find((artifact: ArtifactRecord) => artifact.name === 'main-logs')?.s3.key || false;
if (!logKey) {
throw new Error('No artifact named "main-logs" for node.');
}

const s3Artifact = archiveLogs[0].s3;
const s3Artifact = workflow.status.artifactRepositoryRef.artifactRepository.s3 || false;
if (!s3Artifact) {
throw new Error('Unable to find s3 artifact info from workflow status.');
throw new Error('Unable to find artifact repository information from workflow status.');
}

const { host, port } = urlSplit(s3Artifact.endpoint, s3Artifact.insecure);
Expand All @@ -228,6 +253,10 @@ export async function getPodLogsMinioRequestConfigfromWorkflow(
const client = await createMinioClient(
{
accessKey,
// TODO: endPoint needs to be set to 'localhost' for local development.
// start-proxy-and-server.sh sets MINIO_HOST=localhost, but it doesn't
// seem to be respected when running the server in development mode.
// Investigate and fix this.
endPoint: host,
port,
secretKey,
Expand All @@ -238,7 +267,7 @@ export async function getPodLogsMinioRequestConfigfromWorkflow(
return {
bucket: s3Artifact.bucket,
client,
key: s3Artifact.key,
key: logKey,
};
}

Expand Down Expand Up @@ -268,13 +297,3 @@ function urlSplit(uri: string, insecure: boolean) {
}
return { host: chunks[0], port: parseInt(chunks[1], 10) };
}

/**
* Infers workflow name from pod name.
* @param podName name of the pod.
*/
function workflowNameFromPodName(podName: string) {
const chunks = podName.split('-');
chunks.pop();
return chunks.join('-');
}
2 changes: 2 additions & 0 deletions frontend/src/pages/RunDetails.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,7 @@ describe('RunDetails', () => {
'test-run-id',
'workflow1-template1-node1',
'ns',
'',
);
expect(tree).toMatchSnapshot();
});
Expand Down Expand Up @@ -1255,6 +1256,7 @@ describe('RunDetails', () => {
'test-run-id',
'workflow1-template1-node1',
'username',
'',
);
});

Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/RunDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ class RunDetails extends Page<RunDetailsInternalProps, RunDetailsState> {

try {
const nodeName = getNodeNameFromNodeId(this.state.workflow!, selectedNodeDetails.id);
selectedNodeDetails.logs = await Apis.getPodLogs(runId, nodeName, namespace);
selectedNodeDetails.logs = await Apis.getPodLogs(runId, nodeName, namespace, '');
} catch (err) {
let errMsg = await errorToMessage(err);
logsBannerMessage = 'Failed to retrieve pod logs.';
Expand Down

0 comments on commit e7e57b9

Please sign in to comment.