Skip to content

Commit

Permalink
add automation
Browse files Browse the repository at this point in the history
  • Loading branch information
swk777 committed May 24, 2024
1 parent 6113f7c commit 63cd131
Show file tree
Hide file tree
Showing 26 changed files with 504 additions and 107 deletions.
9 changes: 7 additions & 2 deletions electron/main/init.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DefaultData } from '@/constants/initialData';
import { JSONFilePreset } from 'lowdb/node';
import { addDocuments, createKnowledgeBase } from './knowledgeBase';
import { chat, newConversation } from './workflow/chat';
import { chat, newConversation, runAutomation } from './workflow/execution';

export function initStorage(ipcMain, workspacePath) {
globalThis.workspacePath = workspacePath;
Expand Down Expand Up @@ -73,7 +73,12 @@ export function initStorage(ipcMain, workspacePath) {
await db.read();
const { sessionId, workflowId, query, workflow } = post;
const conversation = await chat(sessionId, workflowId, query, workflow, db);
return conversation?.globalContext.messages;
return conversation?.conversationContext.messages;
});
ipcMain.handle('run-automation', async (event, post) => {
await db.read();
const { workflowId, inputs, workflow } = post;
return await runAutomation(workflowId, inputs, workflow, db);
});
ipcMain.handle('new-conversation', async (event, post) => {
await db.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,40 @@ import _cloneDeep from 'lodash/cloneDeep';
import { curry } from 'ramda';
import { Edge, Node, ReactFlowJsonObject } from 'reactflow';
import { v4 as uuidv4 } from 'uuid';
import { IConversation } from './../../../src/type/conversation';
import { IKnowledgeBase } from './../../../src/type/knowledgeBase';
import { IConversation, IExecution } from '../../../src/type/conversation';
import { IKnowledgeBase } from '../../../src/type/knowledgeBase';
import { InternalNodeletExecutor } from './internal';
import { getNodeInputObj } from './utils';
interface IContext {
knowledgeBases: IKnowledgeBase[];
integrations: IIntegration[];
settings: ISettings;
}
const getInitialConversation = (sessionId: string, workflowId: string, message: string): IConversation => ({
sessionId,
const getNewExecution = (workflowId: string) => ({
executionId: uuidv4(),
workflowId,
createDate: new Date().toLocaleString(),
updateDate: new Date().toLocaleString(),
nodeContext: {},
globalContext: {},
});
const getNewConversationExecution = (workflowId: string, message: string) => ({
executionId: uuidv4(),
workflowId,
name: 'new conversation',
createDate: new Date().toLocaleString(),
updateDate: new Date().toLocaleString(),
nodeContext: {},
globalContext: { currentMessage: message, messages: [] },
globalContext: { userInput: message },
});
const getInitialConversation = (sessionId: string, workflowId: string, message: string): IConversation => ({
sessionId,
workflowId,
name: 'new conversation',
executions: [getNewConversationExecution(workflowId, message)],
conversationContext: {
messages: [message],
currentMessage: message,
},
});
export const newConversation = async (workflowId: string, message: string, db: any) => {
const {
Expand Down Expand Up @@ -62,15 +79,20 @@ export const chat = async (sessionId: string, workflowId: string, message: strin
conversation = getInitialConversation(sessionId, workflowId, message);
conversations.push(conversation);
} else {
conversation.globalContext.currentMessage = message;
conversation.conversationContext.currentMessage = message;
conversation.conversationContext.messages.push(message);
conversation.executions.push(getNewConversationExecution(message, workflowId));
}
const { nodes = [] } = currentWorkflow?.data || {};
if (!nodes.length) return;
await executeDAG(currentWorkflow.data, nodelets, integrations, conversation, {
const currentExecution = conversation.executions[conversation.executions.length - 1];
//@ts-ignore
await executeDAG(currentWorkflow.data, nodelets, integrations, currentExecution, {
knowledgeBases,
integrations,
settings,
});
conversation.conversationContext.messages.push(currentExecution.globalContext.outputMessage);
db.data.conversations = conversations;
await db.write();
return conversation;
Expand All @@ -87,7 +109,44 @@ function createNodeMap(nodes: Node[]) {
});
return nodeMap;
}

export const runAutomation = async (workflowId: string, inputs: any, workflow: IWorkflow, db: any) => {
await db.read();
const {
workflows,
nodelets,
integrations,
knowledgeBases,
settings,
executions,
}: {
workflows: IWorkflow[];
nodelets: Nodelet[];
knowledgeBases: IKnowledgeBase[];
integrations: IIntegration[];
settings: ISettings;
executions: IExecution[];
} = db.data;
const currentWorkflow = workflow ?? workflows.find((w) => w.id === workflowId);
const { nodes = [] } = currentWorkflow?.data || {};
if (!nodes.length) return;
const currentExecution = getNewExecution(workflowId);
if (inputs) {
currentExecution.nodeContext = inputs;
}
try {
//@ts-ignore
await executeDAG(currentWorkflow?.data, nodelets, integrations, currentExecution, {
knowledgeBases,
integrations,
settings,
});
} catch (e) {
console.log(e);
}
executions.push(currentExecution);
await db.write();
return currentExecution;
};
function linkNodes(nodeMap: Map<string, IDAGNode>, edges: Edge[]) {
edges.forEach((edge) => {
const sourceNode = nodeMap.get(edge.source);
Expand All @@ -107,7 +166,7 @@ async function executeDAG(
dagData: ReactFlowJsonObject,
nodelets: Nodelet[],
integrations: IIntegration[],
conversation: IConversation,
execution: IExecution,
context: IContext,
) {
const nodeMap = createNodeMap(dagData.nodes);
Expand All @@ -123,7 +182,7 @@ async function executeDAG(
while (queue.length > 0) {
const currentNode = queue.shift()!;
try {
await executeNode(currentNode, nodelets, integrations, conversation, context);
await executeNode(currentNode, nodelets, integrations, execution, context);
} catch (e) {
console.log(e);
}
Expand All @@ -138,27 +197,21 @@ async function executeDAG(
}
}

async function executeNode(
node: IDAGNode,
nodelets: Nodelet[],
integrations: IIntegration[],
conversation: IConversation,
context: IContext,
) {
async function executeNode(node: IDAGNode, nodelets: Nodelet[], integrations: IIntegration[], execution: IExecution, context: IContext) {
const nodelet = nodelets.find((nl) => nl.id === node.data.nodeletId)!;
const integration = integrations.find((integration) => integration.id === nodelet.id);
const nodeInputs = getNodeInputObj(node, nodelet, conversation);
const nodeInputs = getNodeInputObj(node, nodelet, execution);
const setNodeContext = curry((nodeId: string, nodeContext: any) => {
conversation.nodeContext[nodeId] = nodeContext;
execution.nodeContext[nodeId] = nodeContext;
})(node.id);
const setGlobalContext = (globalContext: any) => (conversation.globalContext = globalContext);
const setGlobalContext = (globalContext: any) => (execution.globalContext = globalContext);
const executorContext = {
nodeId: node.id,
nodeConfig: _cloneDeep(node.data?.config),
nodeInputs,
nodeContext: _cloneDeep(conversation.nodeContext[node.id] || {}),
nodeContext: _cloneDeep(execution.nodeContext[node.id] || {}),
integrationConfig: integration?.config || {},
globalContext: _cloneDeep(conversation.globalContext),
globalContext: _cloneDeep(execution.globalContext),
setNodeContext,
setGlobalContext,
context,
Expand Down
3 changes: 1 addition & 2 deletions electron/main/workflow/internal/chat-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ export async function executeChatResponse({ nodeInputs, globalContext, setGlobal
setGlobalContext &&
setGlobalContext({
...globalContext,
latestMessage: output,
messages: globalContext.messages.concat([output]),
outputMessage: output,
});
}
4 changes: 4 additions & 0 deletions electron/main/workflow/internal/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import { executeChatResponse } from './chat-response';
import { executeKnowledgeBase } from './knowledge-base';
import { executeOllama } from './ollama';
import { executeOpenAI } from './openAI';
import { saveToFile } from './save-to-file';
import { executeTextInput } from './text-input';
import { executeUserInput } from './user-input';

export const InternalNodeletExecutor = {
[InternalNodelets.UserInput]: { isAsync: false, executor: executeUserInput },
[InternalNodelets.TextInput]: { isAsync: false, executor: executeTextInput },
[InternalNodelets.OpenAI]: { isAsync: true, executor: executeOpenAI },
[InternalNodelets.Ollama]: { isAsync: true, executor: executeOllama },
[InternalNodelets.KnowledgeBase]: { isAsync: true, executor: executeKnowledgeBase },
[InternalNodelets.ChatResponse]: {
isAsync: false,
executor: executeChatResponse,
},
[InternalNodelets.SaveToFile]: { isAsync: true, executor: saveToFile },
};
32 changes: 32 additions & 0 deletions electron/main/workflow/internal/save-to-file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { jsPDF } from 'jspdf';
import fs from 'node:fs';
import { NodeletExecuteContext } from '../type';

enum FileType {
txt = 'txt',
pdf = 'pdf',
md = 'md',
}
export async function saveToFile({ nodeInputs, nodeConfig, setGlobalContext }: NodeletExecuteContext) {
const { output } = nodeInputs as { output: string };
const { fileType, fileName, folderPath } = nodeConfig;
switch (fileType) {
case FileType.txt:
case FileType.md: {
fs.writeFile(`${folderPath}/${fileName}.${fileType}`, output, (err) => {
if (err) {
console.log('An error ocurred creating the file ' + err.message);
} else {
console.log('The file has been succesfully saved');
}
});
break;
}
case FileType.pdf: {
const doc = new jsPDF();
doc.text(output, 10, 10);
doc.save(`${folderPath}/${fileName}.pdf`);
break;
}
}
}
9 changes: 9 additions & 0 deletions electron/main/workflow/internal/text-input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { NodeletExecuteContext } from '../type';

export async function executeTextInput({ nodeId, globalContext, setNodeContext, nodeContext }: NodeletExecuteContext) {
console.log(`executing node ${nodeId}, current message: ${globalContext?.userInput}`);
setNodeContext &&
setNodeContext({
outputs: { output: nodeContext?.text },
});
}
9 changes: 2 additions & 7 deletions electron/main/workflow/internal/user-input.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { NodeletExecuteContext } from '../type';

export async function executeUserInput({ nodeId, globalContext, setNodeContext, setGlobalContext }: NodeletExecuteContext) {
console.log(`executing node ${nodeId}, current message: ${globalContext?.currentMessage}`);
console.log(`executing node ${nodeId}, current message: ${globalContext?.userInput}`);
setNodeContext &&
setNodeContext({
outputs: { query: globalContext?.currentMessage },
});
setGlobalContext &&
setGlobalContext({
...globalContext,
messages: globalContext.messages.concat([globalContext?.currentMessage]),
outputs: { query: globalContext?.userInput },
});
}
8 changes: 4 additions & 4 deletions electron/main/workflow/utils.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { IConversation } from '@/type/conversation';
import { IExecution } from '@/type/conversation';
import { IDAGNode } from '@/type/dag';
import { Nodelet } from '@/type/nodelet';
import { InputsObject } from './type';

export const getNodeInputObj = (node: IDAGNode, nodelet: Nodelet, conversation: IConversation) => {
export const getNodeInputObj = (node: IDAGNode, nodelet: Nodelet, execution: IExecution) => {
const inputsObj: InputsObject = {};
if ((nodelet?.inputs || []).length === 0) return {};

nodelet?.inputs.forEach((input) => {
const { sourceHandle, id = '' } = node.sourceNodes.find((n) => n.targetHandle === input.id) || {}; // todo
const sourceOutput = conversation.nodeContext[id]?.outputs[sourceHandle!];
const sourceOutput = execution.nodeContext[id]?.outputs[sourceHandle!];
if (sourceOutput) {
inputsObj[input.id] = conversation.nodeContext[id]?.outputs[sourceHandle!];
inputsObj[input.id] = execution.nodeContext[id]?.outputs[sourceHandle!];
}
});
return inputsObj;
Expand Down
2 changes: 2 additions & 0 deletions electron/preload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ contextBridge.exposeInMainWorld('ipcRenderer', {
saveGlobal: (setting) => ipcRenderer.invoke('save-global', { setting }),
chat: (sessionId: string, workflowId: string, query: string, workflow: IWorkflow) =>
ipcRenderer.invoke('chat', { sessionId, workflowId, query, workflow }),
runAutomation: (workflowId: string, inputs: any, workflow: IWorkflow) =>
ipcRenderer.invoke('run-automation', { workflowId, inputs, workflow }),
newConversation: (workflowId: string) => ipcRenderer.invoke('new-conversation', { workflowId }),
saveConversation: (sessionId: string, conversation: IConversation) =>
ipcRenderer.invoke('save-conversation', { sessionId, conversation }),
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"@radix-ui/react-tabs": "^1.0.4",
"@radix-ui/react-tooltip": "^1.0.7",
"@tabler/icons-react": "^3.1.0",
"antd": "^5.16.0",
"class-variance-authority": "^0.7.0",
"clsx": "^2.1.0",
"d3-dsv": "^2.0.0",
Expand All @@ -45,6 +44,7 @@
"framer-motion": "^11.1.7",
"hnswlib-node": "^1.4.2",
"immer": "9.0.2",
"jspdf": "^2.5.1",
"langchain": "^0.1.33",
"lodash": "^4.17.21",
"lowdb": "^7.0.1",
Expand Down
1 change: 1 addition & 0 deletions src/constants/initialData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ export const DefaultData = {
workflows: [],
nodelets: Nodelets,
settings: {},
executions: [],
};
Loading

0 comments on commit 63cd131

Please sign in to comment.