Skip to content

Commit

Permalink
workflow auto simulate orchestration changes (#6309)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwdchang authored Jan 30, 2025
1 parent 5eb2ebc commit a1f8b3a
Show file tree
Hide file tree
Showing 7 changed files with 425 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const SimulateCiemssOperation: Operation = {
isOptional: true
}
],
outputs: [{ type: 'datasetId' }],
outputs: [{ type: 'datasetId', label: 'Dataset' }],
isRunnable: true,

initState: () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const SimulateEnsembleCiemssOperation: Operation = {
documentationUrl: DOCUMENTATION_URL,
imageUrl: simulateEnsembleCiemss,
inputs: [{ type: 'modelConfigId', label: 'Model configuration' }],
outputs: [{ type: 'datasetId' }],
outputs: [{ type: 'datasetId', label: 'Dataset' }],
isRunnable: true,
uniqueInputs: true,

Expand Down
107 changes: 66 additions & 41 deletions packages/client/hmi-client/src/components/workflow/tera-workflow.vue
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
@dragging="(event) => updatePosition(node, event)"
@dragend="
isDragging = false;
saveWorkflowHandler();
debounceSaveWorkflowPositions();
"
>
<tera-operator
Expand Down Expand Up @@ -180,7 +180,7 @@

<script setup lang="ts">
import { cloneDeep, isArray, intersection, debounce } from 'lodash';
import { computed, onMounted, onUnmounted, ref, watch } from 'vue';
import { computed, onMounted, onUnmounted, ref, watch, nextTick } from 'vue';
import TeraInfiniteCanvas from '@/components/widgets/tera-infinite-canvas.vue';
import TeraCanvasItem from '@/components/widgets/tera-canvas-item.vue';
import type { Position } from '@/types/common';
Expand Down Expand Up @@ -215,7 +215,7 @@ import { useRouter, useRoute } from 'vue-router';
import { MenuItem } from 'primevue/menuitem';
import * as EventService from '@/services/event';
import { useProjects } from '@/composables/project';
import useAuthStore from '@/stores/auth';
// import useAuthStore from '@/stores/auth';
import { cloneNoteBookSession } from '@/services/notebook-session';
import * as SimulateCiemssOp from '@/components/workflow/ops/simulate-ciemss/mod';
import * as StratifyMiraOp from '@/components/workflow/ops/stratify-mira/mod';
Expand All @@ -239,7 +239,7 @@ import { activeProjectId } from '@/composables/activeProject';
const WORKFLOW_SAVE_INTERVAL = 4000;
const currentUserId = useAuthStore().user?.id;
// const currentUserId = useAuthStore().user?.id;
const registry = new workflowService.WorkflowRegistry();
registry.registerOp(SimulateCiemssOp);
Expand Down Expand Up @@ -306,33 +306,32 @@ async function updateWorkflowName(newName: string) {
wf.value.load(await workflowService.getWorkflow(props.assetId));
}
// eslint-disable-next-line
const _saveWorkflow = async () => {
await workflowService.saveWorkflow(wf.value.dump(), currentProjectId.value ?? undefined);
// wf.value.update(updated);
};
// eslint-disable-next-line
const _updateWorkflow = (event: ClientEvent<any>) => {
if (event.data.id !== wf.value.getId()) {
return;
}
const delayUpdate = isDragging || event.userId === currentUserId;
wf.value.update(event.data as Workflow, delayUpdate);
// const delayUpdate = isDragging || event.userId === currentUserId;
wf.value.update(event.data as Workflow);
};
const nodeStateMap: Map<string, any> = new Map();
const saveWorkflowDebounced = debounce(_saveWorkflow, 400);
const updateWorkflowHandler = debounce(_updateWorkflow, 250);
const saveNodeStateHandler = debounce(async () => {
const updatedWorkflow = await workflowService.updateState(wf.value.getId(), nodeStateMap);
nodeStateMap.clear();
wf.value.update(updatedWorkflow, false);
(updatedWorkflow as Workflow).nodes.forEach((node) => {
if (node.isDeleted === false) {
wf.value.updateNodeState(node.id, node.state);
}
});
}, 250);
const saveWorkflowHandler = () => {
saveWorkflowDebounced();
};
const debounceSaveWorkflowPositions = debounce(() => {
saveWorkflowPositions();
}, 100);
async function appendInput(
node: WorkflowNode<any>,
Expand All @@ -352,7 +351,7 @@ async function appendInput(
};
const updatedWorkflow = await workflowService.appendInput(wf.value.getId(), node.id, inputPort);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}
/**
Expand Down Expand Up @@ -389,7 +388,14 @@ async function appendOutput(
};
const updatedWorkflow = await workflowService.appendOutput(wf.value.getId(), node.id, outputPort, newState);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
// We want to try to wait here, because we replace default dummy outputs we might
// try to do id-lookup to a non-existing element
nextTick().then(() => {
relinkEdges(null);
debounceSaveWorkflowPositions();
});
}
function updateWorkflowNodeState(node: WorkflowNode<any> | null, state: any) {
Expand All @@ -414,12 +420,12 @@ async function updateWorkflowNodeStatus(node: WorkflowNode<any> | null, status:
const payload: Map<string, OperatorStatus> = new Map([[node.id, status]]);
const updatedWorkflow = await workflowService.updateStatus(wf.value.getId(), payload);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}
async function selectOutput(node: WorkflowNode<any> | null, selectedOutputId: string) {
const updatedWorkflow = await workflowService.selectOutput(wf.value.getId(), node!.id, selectedOutputId);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}
// Route is mutated then watcher is triggered to open or close the drilldown
Expand Down Expand Up @@ -456,14 +462,16 @@ const closeDrilldown = async () => {
const removeNode = async (nodeId: string) => {
const updatedWorkflow = await workflowService.removeNodes(wf.value.getId(), [nodeId]);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};
const duplicateBranch = (nodeId: string) => {
const duplicateBranch = async (nodeId: string) => {
wf.value.branchWorkflow(nodeId);
cloneNoteBookSessions();
saveWorkflowHandler();
const updatedWorkflow = await workflowService.saveWorkflow(wf.value.dump(), currentProjectId.value ?? undefined);
wf.value.update(updatedWorkflow);
};
// We need to clone data-transform sessions, unlike other operators that are
Expand Down Expand Up @@ -498,7 +506,7 @@ const addOperatorToWorkflow: Function =
size: nodeSize
});
const updatedWorkflow = await workflowService.addNode(wf.value.getId(), node);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
return node;
};
Expand Down Expand Up @@ -533,15 +541,16 @@ async function onMenuSelection(operatorType: string, menuNode: WorkflowNode<any>
target: newNode.id,
targetPortId: inputPorts[0].id,
points: [
{ x: currentPortPosition.x, y: currentPortPosition.y },
{ x: currentPortPosition.x, y: currentPortPosition.y }
{ x: 0, y: 0 },
{ x: 0, y: 0 }
]
};
const updatedWorkflow = await workflowService.addEdge(wf.value.getId(), edgePayload);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
// Force edges to re-evaluate
await nextTick();
relinkEdges(null);
}
}
Expand Down Expand Up @@ -684,7 +693,7 @@ async function onDrop(event: DragEvent) {
}
const operator = workflowService.newOperator(wf.value.getId(), operation, newNodePosition, { state });
const updatedWorkflow = await workflowService.addNode(wf.value.getId(), operator);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}
}
Expand Down Expand Up @@ -740,7 +749,7 @@ async function createNewEdge(node: WorkflowNode<any>, port: WorkflowPort, direct
}
const updatedWorkflow = await workflowService.addEdge(wf.value.getId(), edgePayload);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
cancelNewEdge();
}
}
Expand All @@ -754,7 +763,7 @@ async function removeEdges(portId: string) {
wf.value.getId(),
edges.map((e) => e.id)
);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
}
function onCanvasClick() {
Expand Down Expand Up @@ -802,6 +811,8 @@ function relinkEdges(node: WorkflowNode<any> | null) {
const targetNode = nodeMap.get(edge.target as string);
const targetPortElem = getPortElement(edge.targetPortId as string);
if (!sourcePortElem || !targetPortElem) continue;
edge.points[0].x = sourceNode!.x + sourceNode!.width + sourcePortElem.offsetWidth * 0.5;
edge.points[0].y = sourceNode!.y + sourcePortElem.offsetTop + sourcePortElem.offsetHeight * 0.5;
edge.points[1].x = targetNode!.x + targetPortElem.offsetWidth * 0.5;
Expand Down Expand Up @@ -873,6 +884,24 @@ function updateEdgePositions(node: WorkflowNode<any>, { x, y }) {
});
}
const saveWorkflowPositions = async () => {
const nodes = new Map(wf.value.getNodes().map((n) => [n.id, { x: n.x, y: n.y }]));
const edges = new Map(
wf.value.getEdges().map((e) => {
const start = e.points[0];
const end = e.points[1];
return [
e.id,
[
{ x: start.x, y: start.y },
{ x: end.x, y: end.y }
]
];
})
);
await workflowService.updatePositions(wf.value.getId(), nodes, edges);
};
const updatePosition = (node: WorkflowNode<any>, { x, y }) => {
const teraNode = teraOperatorRefs.value.find((operatorNode) => operatorNode.id === node.id);
if (teraNode.isEditing ?? false) {
Expand All @@ -893,7 +922,7 @@ const addAnnotationToWorkflow = async () => {
type: '',
textSize: 12
});
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};
const updateAnnotationPosition = (annotation: WorkflowAnnotation, event: any) => {
Expand All @@ -903,12 +932,12 @@ const updateAnnotationPosition = (annotation: WorkflowAnnotation, event: any) =>
const updateAnnotation = async (annotation: WorkflowAnnotation) => {
const updatedWorkflow = await workflowService.addOrUpdateAnnotation(wf.value.getId(), annotation);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};
const removeAnnotation = async (annotationId: string) => {
const updatedWorkflow = await workflowService.removeAnnotation(wf.value.getId(), annotationId);
wf.value.update(updatedWorkflow, false);
wf.value.update(updatedWorkflow);
};
function interpolatePointsForCurve(a: Position, b: Position): Position[] {
Expand All @@ -925,10 +954,6 @@ const pathFn = d3
// Get around typescript complaints
const drawPath = (v: any) => pathFn(v) as string;
const unloadCheck = () => {
workflowService.saveWorkflow(wf.value.dump());
};
const handleDrilldown = () => {
const operatorId = route.query?.operator?.toString();
if (operatorId) {
Expand Down Expand Up @@ -979,7 +1004,7 @@ watch(
async (newId, oldId) => {
// Save previous workflow, if applicable
if (newId !== oldId && oldId) {
workflowService.saveWorkflow(wf.value.dump());
saveWorkflowPositions();
workflowService.setLocalStorageTransform(wf.value.getId(), canvasTransform);
}
Expand Down Expand Up @@ -1015,7 +1040,7 @@ onMounted(() => {
}
document.addEventListener('mousemove', mouseUpdate);
window.addEventListener('beforeunload', unloadCheck);
saveTimer = setInterval(async () => {
workflowService.setLocalStorageTransform(wf.value.getId(), canvasTransform);
}, WORKFLOW_SAVE_INTERVAL);
Expand All @@ -1025,7 +1050,7 @@ onMounted(() => {
});
onUnmounted(() => {
workflowService.saveWorkflow(wf.value.dump());
saveWorkflowPositions();
if (saveTimer) {
clearInterval(saveTimer);
}
Expand All @@ -1035,7 +1060,7 @@ onUnmounted(() => {
workflowService.setLocalStorageTransform(wf.value.getId(), canvasTransform);
}
document.removeEventListener('mousemove', mouseUpdate);
window.removeEventListener('beforeunload', unloadCheck);
workflowService.saveWorkflow(wf.value.dump());
});
</script>

Expand Down
Loading

0 comments on commit a1f8b3a

Please sign in to comment.