Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow auto simulate orchestration changes #6309

Merged
merged 17 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const SimulateCiemssOperation: Operation = {
isOptional: true
}
],
outputs: [{ type: 'datasetId' }],
outputs: [{ type: 'datasetId', label: 'Dataset' }],
isRunnable: true,

initState: () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const SimulateEnsembleCiemssOperation: Operation = {
documentationUrl: DOCUMENTATION_URL,
imageUrl: simulateEnsembleCiemss,
inputs: [{ type: 'modelConfigId', label: 'Model configuration' }],
outputs: [{ type: 'datasetId' }],
outputs: [{ type: 'datasetId', label: 'Dataset' }],
isRunnable: true,
uniqueInputs: true,

Expand Down
107 changes: 66 additions & 41 deletions packages/client/hmi-client/src/components/workflow/tera-workflow.vue
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
@dragging="(event) => updatePosition(node, event)"
@dragend="
isDragging = false;
saveWorkflowHandler();
debounceSaveWorkflowPositions();
"
>
<tera-operator
Expand Down Expand Up @@ -180,7 +180,7 @@

<script setup lang="ts">
import { cloneDeep, isArray, intersection, debounce } from 'lodash';
import { computed, onMounted, onUnmounted, ref, watch } from 'vue';
import { computed, onMounted, onUnmounted, ref, watch, nextTick } from 'vue';
import TeraInfiniteCanvas from '@/components/widgets/tera-infinite-canvas.vue';
import TeraCanvasItem from '@/components/widgets/tera-canvas-item.vue';
import type { Position } from '@/types/common';
Expand Down Expand Up @@ -215,7 +215,7 @@ import { useRouter, useRoute } from 'vue-router';
import { MenuItem } from 'primevue/menuitem';
import * as EventService from '@/services/event';
import { useProjects } from '@/composables/project';
import useAuthStore from '@/stores/auth';
// import useAuthStore from '@/stores/auth';
import { cloneNoteBookSession } from '@/services/notebook-session';
import * as SimulateCiemssOp from '@/components/workflow/ops/simulate-ciemss/mod';
import * as StratifyMiraOp from '@/components/workflow/ops/stratify-mira/mod';
Expand All @@ -239,7 +239,7 @@ import { activeProjectId } from '@/composables/activeProject';

const WORKFLOW_SAVE_INTERVAL = 4000;

const currentUserId = useAuthStore().user?.id;
// const currentUserId = useAuthStore().user?.id;

const registry = new workflowService.WorkflowRegistry();
registry.registerOp(SimulateCiemssOp);
Expand Down Expand Up @@ -306,33 +306,32 @@ async function updateWorkflowName(newName: string) {
wf.value.load(await workflowService.getWorkflow(props.assetId));
}

// eslint-disable-next-line
const _saveWorkflow = async () => {
await workflowService.saveWorkflow(wf.value.dump(), currentProjectId.value ?? undefined);
// wf.value.update(updated);
};
// eslint-disable-next-line
const _updateWorkflow = (event: ClientEvent<any>) => {
if (event.data.id !== wf.value.getId()) {
return;
}

const delayUpdate = isDragging || event.userId === currentUserId;
wf.value.update(event.data as Workflow, delayUpdate);
// const delayUpdate = isDragging || event.userId === currentUserId;
wf.value.update(event.data as Workflow);
};

const nodeStateMap: Map<string, any> = new Map();
const saveWorkflowDebounced = debounce(_saveWorkflow, 400);
const updateWorkflowHandler = debounce(_updateWorkflow, 250);
const saveNodeStateHandler = debounce(async () => {
const updatedWorkflow = await workflowService.updateState(wf.value.getId(), nodeStateMap);
nodeStateMap.clear();
wf.value.update(updatedWorkflow, false);

(updatedWorkflow as Workflow).nodes.forEach((node) => {
if (node.isDeleted === false) {
wf.value.updateNodeState(node.id, node.state);
}
});
}, 250);

const saveWorkflowHandler = () => {
saveWorkflowDebounced();
};
const debounceSaveWorkflowPositions = debounce(() => {
saveWorkflowPositions();
}, 100);

async function appendInput(
node: WorkflowNode<any>,
Expand All @@ -352,7 +351,7 @@ async function appendInput(
};

const updatedWorkflow = await workflowService.appendInput(wf.value.getId(), node.id, inputPort);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}

/**
Expand Down Expand Up @@ -389,7 +388,14 @@ async function appendOutput(
};

const updatedWorkflow = await workflowService.appendOutput(wf.value.getId(), node.id, outputPort, newState);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);

// We want to try to wait here, because we replace default dummy outputs we might
// try to do id-lookup to a non-existing element
nextTick().then(() => {
relinkEdges(null);
debounceSaveWorkflowPositions();
});
}

function updateWorkflowNodeState(node: WorkflowNode<any> | null, state: any) {
Expand All @@ -414,12 +420,12 @@ async function updateWorkflowNodeStatus(node: WorkflowNode<any> | null, status:
const payload: Map<string, OperatorStatus> = new Map([[node.id, status]]);

const updatedWorkflow = await workflowService.updateStatus(wf.value.getId(), payload);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}

async function selectOutput(node: WorkflowNode<any> | null, selectedOutputId: string) {
const updatedWorkflow = await workflowService.selectOutput(wf.value.getId(), node!.id, selectedOutputId);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}

// Route is mutated then watcher is triggered to open or close the drilldown
Expand Down Expand Up @@ -456,14 +462,16 @@ const closeDrilldown = async () => {

const removeNode = async (nodeId: string) => {
const updatedWorkflow = await workflowService.removeNodes(wf.value.getId(), [nodeId]);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};

const duplicateBranch = (nodeId: string) => {
const duplicateBranch = async (nodeId: string) => {
wf.value.branchWorkflow(nodeId);

cloneNoteBookSessions();
saveWorkflowHandler();

const updatedWorkflow = await workflowService.saveWorkflow(wf.value.dump(), currentProjectId.value ?? undefined);
wf.value.update(updatedWorkflow);
};

// We need to clone data-transform sessions, unlike other operators that are
Expand Down Expand Up @@ -498,7 +506,7 @@ const addOperatorToWorkflow: Function =
size: nodeSize
});
const updatedWorkflow = await workflowService.addNode(wf.value.getId(), node);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);

return node;
};
Expand Down Expand Up @@ -533,15 +541,16 @@ async function onMenuSelection(operatorType: string, menuNode: WorkflowNode<any>
target: newNode.id,
targetPortId: inputPorts[0].id,
points: [
{ x: currentPortPosition.x, y: currentPortPosition.y },
{ x: currentPortPosition.x, y: currentPortPosition.y }
{ x: 0, y: 0 },
{ x: 0, y: 0 }
]
};

const updatedWorkflow = await workflowService.addEdge(wf.value.getId(), edgePayload);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);

// Force edges to re-evaluate
await nextTick();
relinkEdges(null);
}
}
Expand Down Expand Up @@ -684,7 +693,7 @@ async function onDrop(event: DragEvent) {
}
const operator = workflowService.newOperator(wf.value.getId(), operation, newNodePosition, { state });
const updatedWorkflow = await workflowService.addNode(wf.value.getId(), operator);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}
}

Expand Down Expand Up @@ -740,7 +749,7 @@ async function createNewEdge(node: WorkflowNode<any>, port: WorkflowPort, direct
}

const updatedWorkflow = await workflowService.addEdge(wf.value.getId(), edgePayload);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
cancelNewEdge();
}
}
Expand All @@ -754,7 +763,7 @@ async function removeEdges(portId: string) {
wf.value.getId(),
edges.map((e) => e.id)
);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}

function onCanvasClick() {
Expand Down Expand Up @@ -802,6 +811,8 @@ function relinkEdges(node: WorkflowNode<any> | null) {
const targetNode = nodeMap.get(edge.target as string);
const targetPortElem = getPortElement(edge.targetPortId as string);

if (!sourcePortElem || !targetPortElem) continue;

edge.points[0].x = sourceNode!.x + sourceNode!.width + sourcePortElem.offsetWidth * 0.5;
edge.points[0].y = sourceNode!.y + sourcePortElem.offsetTop + sourcePortElem.offsetHeight * 0.5;
edge.points[1].x = targetNode!.x + targetPortElem.offsetWidth * 0.5;
Expand Down Expand Up @@ -873,6 +884,24 @@ function updateEdgePositions(node: WorkflowNode<any>, { x, y }) {
});
}

const saveWorkflowPositions = async () => {
const nodes = new Map(wf.value.getNodes().map((n) => [n.id, { x: n.x, y: n.y }]));
const edges = new Map(
wf.value.getEdges().map((e) => {
const start = e.points[0];
const end = e.points[1];
return [
e.id,
[
{ x: start.x, y: start.y },
{ x: end.x, y: end.y }
]
];
})
);
await workflowService.updatePositions(wf.value.getId(), nodes, edges);
};

const updatePosition = (node: WorkflowNode<any>, { x, y }) => {
const teraNode = teraOperatorRefs.value.find((operatorNode) => operatorNode.id === node.id);
if (teraNode.isEditing ?? false) {
Expand All @@ -893,7 +922,7 @@ const addAnnotationToWorkflow = async () => {
type: '',
textSize: 12
});
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};

const updateAnnotationPosition = (annotation: WorkflowAnnotation, event: any) => {
Expand All @@ -903,12 +932,12 @@ const updateAnnotationPosition = (annotation: WorkflowAnnotation, event: any) =>

const updateAnnotation = async (annotation: WorkflowAnnotation) => {
const updatedWorkflow = await workflowService.addOrUpdateAnnotation(wf.value.getId(), annotation);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};

const removeAnnotation = async (annotationId: string) => {
const updatedWorkflow = await workflowService.removeAnnotation(wf.value.getId(), annotationId);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};

function interpolatePointsForCurve(a: Position, b: Position): Position[] {
Expand All @@ -925,10 +954,6 @@ const pathFn = d3
// Get around typescript complaints
const drawPath = (v: any) => pathFn(v) as string;

const unloadCheck = () => {
workflowService.saveWorkflow(wf.value.dump());
};

const handleDrilldown = () => {
const operatorId = route.query?.operator?.toString();
if (operatorId) {
Expand Down Expand Up @@ -979,7 +1004,7 @@ watch(
async (newId, oldId) => {
// Save previous workflow, if applicable
if (newId !== oldId && oldId) {
workflowService.saveWorkflow(wf.value.dump());
saveWorkflowPositions();
workflowService.setLocalStorageTransform(wf.value.getId(), canvasTransform);
}

Expand Down Expand Up @@ -1015,7 +1040,7 @@ onMounted(() => {
}

document.addEventListener('mousemove', mouseUpdate);
window.addEventListener('beforeunload', unloadCheck);

saveTimer = setInterval(async () => {
workflowService.setLocalStorageTransform(wf.value.getId(), canvasTransform);
}, WORKFLOW_SAVE_INTERVAL);
Expand All @@ -1025,7 +1050,7 @@ onMounted(() => {
});

onUnmounted(() => {
workflowService.saveWorkflow(wf.value.dump());
saveWorkflowPositions();
if (saveTimer) {
clearInterval(saveTimer);
}
Expand All @@ -1035,7 +1060,7 @@ onUnmounted(() => {
workflowService.setLocalStorageTransform(wf.value.getId(), canvasTransform);
}
document.removeEventListener('mousemove', mouseUpdate);
window.removeEventListener('beforeunload', unloadCheck);
workflowService.saveWorkflow(wf.value.dump());
});
</script>

Expand Down
Loading