Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-dotnet): add external events in workflow thread #1292

Merged
merged 7 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\LittleHorse.Sdk\LittleHorse.Sdk.csproj" />
</ItemGroup>

</Project>
82 changes: 82 additions & 0 deletions sdk-dotnet/Examples/ExternalEventExample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using ExternalEventExample;
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;
using LittleHorse.Sdk.Workflow.Spec;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public abstract class Program
{
private static ServiceProvider? _serviceProvider;
private static void SetupApplication()
{
_serviceProvider = new ServiceCollection()
.AddLogging(config =>
{
config.AddConsole();
config.SetMinimumLevel(LogLevel.Debug);
})
.BuildServiceProvider();
}

private static LHConfig GetLHConfig(string[] args, ILoggerFactory loggerFactory)
{
var config = new LHConfig(loggerFactory);

string filePath = Path.Combine(Directory.GetCurrentDirectory(), ".config/littlehorse.config");
if (File.Exists(filePath))
config = new LHConfig(filePath, loggerFactory);

return config;
}

private static List<LHTaskWorker<WaitForExternalEventWorker>> GetTaskWorkers(LHConfig config)
{
var executable = new WaitForExternalEventWorker();
var workers = new List<LHTaskWorker<WaitForExternalEventWorker>>
{
new(executable, "ask-for-name", config),
new(executable, "greet", config)
};

return workers;
}

private static Workflow GetWorkflow()
{
void MyEntryPoint(WorkflowThread wf)
{
WfRunVariable name = wf.DeclareStr("name").Searchable();
wf.Execute("ask-for-name");
name.Assign(wf.WaitForEvent("name-event"));
wf.Execute("greet", name);
}

return new Workflow("example-external-event", MyEntryPoint);
}

static void Main(string[] args)
{
SetupApplication();
if (_serviceProvider != null)
{
var loggerFactory = _serviceProvider.GetRequiredService<ILoggerFactory>();
var config = GetLHConfig(args, loggerFactory);
var taskWorkers = GetTaskWorkers(config);
foreach (var worker in taskWorkers)
{
worker.RegisterTaskDef();
}

var workflow = GetWorkflow();
workflow.RegisterWfSpec(config.GetGrpcClientInstance());

Thread.Sleep(300);

foreach (var worker in taskWorkers)
{
worker.Start();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using LittleHorse.Sdk.Worker;

namespace ExternalEventExample;

public class WaitForExternalEventWorker
{
[LHTaskMethod("ask-for-name")]
public string AskForName()
{
Console.WriteLine("Executing ask-for-name");
return "Hi what's your name?";
}

[LHTaskMethod("greet")]
public string Greet(string name)
{
Console.WriteLine("Executing greet");
return "Hello there, " + name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,98 @@ void EntryPointAction(WorkflowThread wf)
Assert.Equal(expectedNumberOfNodes, wfThreadCompiled.Nodes.Count);
Assert.Equal(expectedResult, actualResult);
}

[Fact]
public void WfThread_WithExternalEvent_ShouldCompileItInTheWorkflowThread()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfExternalEvents = 1;
var numberOfTasks = 1;
var workflowName = "TestWorkflow";
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);
void EntryPointAction(WorkflowThread wf)
{
WfRunVariable name = wf.DeclareStr("name");
name.Assign(wf.WaitForEvent("name-event"));
wf.Execute("greet", name);
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);

var compiledWfThread = workflowThread.Compile();

var expectedSpec = new ThreadSpec();
var entrypoint = new Node
{
Entrypoint = new EntrypointNode(),
OutgoingEdges =
{
new Edge { SinkNodeName = "1-name-event-EXTERNAL_EVENT" }
}
};

var externalEvent = new Node
{
ExternalEvent = new ExternalEventNode
{
ExternalEventDefId = new ExternalEventDefId { Name = "name-event" }
},
OutgoingEdges =
{
new Edge
{
SinkNodeName = "2-greet-TASK",
VariableMutations =
{
new VariableMutation
{
LhsName = "name",
RhsAssignment = new VariableAssignment
{
NodeOutput =
new VariableAssignment.Types.NodeOutputReference
{ NodeName = "1-name-event-EXTERNAL_EVENT" }
}
}
}
}
}
};

var greetTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "greet" }, Variables =
{
new VariableAssignment { VariableName = "name" }
}
},
OutgoingEdges = { new Edge { SinkNodeName = "3-exit-EXIT" } }
};

var exitNode = new Node
{
Exit = new ExitNode()
};

var threadVarDef = new ThreadVarDef
{
VarDef = new VariableDef
{
Name = "name",
Type = VariableType.Str
},
AccessLevel = WfRunVariableAccessLevel.PrivateVar
};
expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint);
expectedSpec.Nodes.Add("1-name-event-EXTERNAL_EVENT", externalEvent);
expectedSpec.Nodes.Add("2-greet-TASK", greetTask);
expectedSpec.Nodes.Add("3-exit-EXIT", exitNode);
expectedSpec.VariableDefs.Add(threadVarDef);

var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfExternalEvents + numberOfTasks;
Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count);
Assert.Equal(expectedSpec, compiledWfThread);
}
}
21 changes: 21 additions & 0 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class Workflow
private Queue<Tuple<string, Action<WorkflowThread>>> _threadActions;
private readonly string _parentWfSpecName;
private readonly HashSet<string> _requiredTaskDefNames;
private readonly HashSet<string> _requiredEedNames;
private int _defaultTaskTimeout;
private int _defaultSimpleRetries;
internal ExponentialBackoffRetryPolicy _defaultExponentialBackoff = null!;
Expand All @@ -30,6 +31,7 @@ public Workflow(string name, Action<WorkflowThread> entryPoint)
_spec = new PutWfSpecRequest { Name = name };
_threadActions = new Queue<Tuple<string, Action<WorkflowThread>>>();
_requiredTaskDefNames = new HashSet<string>();
_requiredEedNames = new HashSet<string>();
}

public PutWfSpecRequest Compile()
Expand Down Expand Up @@ -90,6 +92,11 @@ internal void AddTaskDefName(string taskDefName)
_requiredTaskDefNames.Add(taskDefName);
}

internal void AddExternalEventDefName(string eedName)
{
_requiredEedNames.Add(eedName);
}

/// <summary>
/// Returns the default task timeout, or null if it is not set.
/// </summary>
Expand Down Expand Up @@ -140,4 +147,18 @@ public void SetDefaultTaskRetries(int defaultSimpleRetries)
{
return _defaultExponentialBackoff!;
}

/// <summary>
/// Returns the names of all `ExternalEventDef`s used by this workflow. Includes
/// ExternalEventDefs used for Interrupts or for EXTERNAL_EVENT nodes.
///
/// </summary>
/// <returns>
/// A Set of Strings containing the names of all `ExternalEventDef`s used by this workflow.
/// </returns>
public HashSet<string> GetRequiredExternalEventDefNames()
{
_compiledWorkflow ??= CompileWorkflowDetails();
return _requiredEedNames;
}
}
72 changes: 65 additions & 7 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@

namespace LittleHorse.Sdk.Workflow.Spec;

internal static class Constants
{
internal static readonly Dictionary<Node.NodeOneofCase, string> NodeTypes = new Dictionary<Node.NodeOneofCase, string>
{
{ Node.NodeOneofCase.Entrypoint, "ENTRYPOINT" },
{ Node.NodeOneofCase.Exit, "EXIT" },
{ Node.NodeOneofCase.Task, "TASK" },
{ Node.NodeOneofCase.None, "NONE" },
{ Node.NodeOneofCase.ExternalEvent, "EXTERNAL_EVENT" },
{ Node.NodeOneofCase.StartThread, "START_THREAD" },
{ Node.NodeOneofCase.Nop, "NOP" },
{ Node.NodeOneofCase.Sleep, "SLEEP" },
{ Node.NodeOneofCase.UserTask, "USER_TASK" },
{ Node.NodeOneofCase.StartMultipleThreads, "START_MULTIPLE_THREADS" },
{ Node.NodeOneofCase.ThrowEvent, "THROW_EVENT" },
{ Node.NodeOneofCase.WaitForCondition, "WAIT_FOR_CONDITION" }
};
}

public class WorkflowThread
{
private Workflow _parent;
Expand Down Expand Up @@ -119,6 +138,9 @@ private string AddNode(string name, Node.NodeOneofCase type, IMessage subNode)
case Node.NodeOneofCase.Nop:
node.Nop = (NopNode) subNode;
break;
case Node.NodeOneofCase.ExternalEvent:
node.ExternalEvent = (ExternalEventNode) subNode;
break;
case Node.NodeOneofCase.Exit:
node.Exit = (ExitNode) subNode;
break;
Expand All @@ -134,7 +156,7 @@ private string AddNode(string name, Node.NodeOneofCase type, IMessage subNode)

private string GetNodeName(string name, Node.NodeOneofCase type)
{
return $"{_spec.Nodes.Count}-{name}-{type.ToString().ToUpper()}";
return $"{_spec.Nodes.Count}-{name}-{Constants.NodeTypes[type]}";
}

public WfRunVariable AddVariable(string name, Object typeOrDefaultVal)
Expand All @@ -146,6 +168,20 @@ public WfRunVariable AddVariable(string name, Object typeOrDefaultVal)
return wfRunVariable;
}

/// <summary>
/// Adds a TASK node to the ThreadSpec.
///
/// </summary>
/// <param name="taskName">
/// It is the name of the TaskDef to execute.
/// </param>
/// <param name="args">
/// It is the input parameters to pass into the Task Run. If the type of arg is a
/// `WfRunVariable`, then that WfRunVariable is passed in as the argument; otherwise, the
/// library will attempt to cast the provided argument to a LittleHorse VariableValue and
/// pass that literal value in.
/// </param>
/// <returns>A NodeOutput for that TASK node.</returns>
public NodeOutput Execute(string taskName, params object[] args)
{
CheckIfWorkflowThreadIsActive();
Expand All @@ -156,8 +192,8 @@ public NodeOutput Execute(string taskName, params object[] args)

return new NodeOutput(nodeName, this);
}
public VariableAssignment AssignVariable(Object variable)

private VariableAssignment AssignVariable(Object variable)
{
CheckIfWorkflowThreadIsActive();
return AssignVariableHelper(variable);
Expand Down Expand Up @@ -186,6 +222,28 @@ private TaskNode CreateTaskNode(TaskNode taskNode, params object[] args)
return taskNode;
}

/// <summary>
/// Adds an EXTERNAL_EVENT node which blocks until an 'ExternalEvent' of the specified type
/// arrives.
/// </summary>
/// <param name="externalEventDefName">
/// It is the type of ExternalEvent to wait for.
/// </param>
/// <returns>A NodeOutput for this event.</returns>
public NodeOutput WaitForEvent(string externalEventDefName)
{
CheckIfWorkflowThreadIsActive();
var waitNode = new ExternalEventNode
{
ExternalEventDefId = new ExternalEventDefId { Name = externalEventDefName }
};

_parent.AddExternalEventDefName(externalEventDefName);
var nodeName = AddNode(externalEventDefName, Node.NodeOneofCase.ExternalEvent, waitNode);

return new NodeOutput(nodeName, this);
}

/// <summary>
/// Creates a variable of type BOOL in the ThreadSpec.
/// </summary>
Expand Down Expand Up @@ -397,18 +455,18 @@ public WorkflowCondition Condition(object lhs, Comparator comparator, object rhs
/// `WfRunVariable` which determines the right hand side of the expression, or a `NodeOutput`
/// (which allows you to use the output of a Node Run to mutate variables).
/// </param>
public void Mutate(WfRunVariable lhsVar, VariableMutationType type, object rhs)
public void Mutate(WfRunVariable lhs, VariableMutationType type, object rhs)
{
CheckIfWorkflowThreadIsActive();
var mutation = new VariableMutation
{
LhsName = lhsVar.Name,
LhsName = lhs.Name,
Operation = type,
RhsAssignment = AssignVariableHelper(rhs)
};
if (lhsVar.JsonPath != null)
if (lhs.JsonPath != null)
{
mutation.LhsJsonPath = lhsVar.JsonPath;
mutation.LhsJsonPath = lhs.JsonPath;
}

_variableMutations.Enqueue(mutation);
Expand Down
Loading