Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-dotnet): add error handler to workflow thread #1314

Merged
merged 9 commits into from
Feb 19, 2025
22 changes: 22 additions & 0 deletions sdk-dotnet/Examples/ExceptionsHandlerExample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using ExceptionsHandler;
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;
using LittleHorse.Sdk.Workflow.Spec;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -46,6 +47,24 @@ private static List<LHTaskWorker<MyWorker>> GetTaskWorkers(LHConfig config)
return workers;
}

private static Workflow GetWorkflow()
{
void MyEntryPoint(WorkflowThread wf)
{
NodeOutput node = wf.Execute("fail");
wf.HandleError(
node,
handler =>
{
handler.Execute("my-task");
}
);
wf.Execute("my-task");
}

return new Workflow("example-exception-handler", MyEntryPoint);
}

static void Main(string[] args)
{
SetupApplication();
Expand All @@ -59,6 +78,9 @@ static void Main(string[] args)
worker.RegisterTaskDef();
}

var workflow = GetWorkflow();
workflow.RegisterWfSpec(config.GetGrpcClientInstance());

Thread.Sleep(300);

foreach (var worker in workers)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
using System;
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Workflow.Spec;
using Moq;
using Xunit;

namespace LittleHorse.Sdk.Tests.Workflow.Spec;

public class WorkflowThreadErrorsAndExceptionsTest
{
private readonly Action<WorkflowThread> _action;
void ParentEntrypoint(WorkflowThread thread)
{
}

public WorkflowThreadErrorsAndExceptionsTest()
{
LHLoggerFactoryProvider.Initialize(null);
_action = ParentEntrypoint;
}

[Fact]
public void WfThread_WithoutSpecificError_ShouldCompileErrorHandling()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfTasks = 2;
var workflowName = "TestWorkflow";
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);

void EntryPointAction(WorkflowThread wf)
{
NodeOutput node = wf.Execute("fail");
wf.HandleError(
node,
handler =>
{
handler.Execute("my-task");
}
);
wf.Execute("my-task");
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);

var compiledWfThread = workflowThread.Compile();

var expectedSpec = new ThreadSpec();
var entrypoint = new Node
{
Entrypoint = new EntrypointNode(),
OutgoingEdges =
{
new Edge { SinkNodeName = "1-fail-TASK" }
}
};

var failTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "fail" }
},
OutgoingEdges = { new Edge { SinkNodeName = "2-my-task-TASK" } },
FailureHandlers =
{
new FailureHandlerDef
{
HandlerSpecName = "exn-handler-1-fail-TASK-FAILURE_TYPE_ERROR",
AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeError
}
}
};

var myTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "my-task" }
},
OutgoingEdges = { new Edge { SinkNodeName = "3-exit-EXIT" } }
};

var exitNode = new Node
{
Exit = new ExitNode()
};

expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint);
expectedSpec.Nodes.Add("1-fail-TASK", failTask);
expectedSpec.Nodes.Add("2-my-task-TASK", myTask);
expectedSpec.Nodes.Add("3-exit-EXIT", exitNode);

var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks;
Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count);
Assert.Equal(expectedSpec, compiledWfThread);
}

[Fact]
public void WfThread_WithSpecificError_ShouldCompileErrorHandling()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfTasks = 2;
var workflowName = "TestWorkflow";
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);

void EntryPointAction(WorkflowThread wf)
{
NodeOutput node = wf.Execute("fail");
wf.HandleError(
node,
LHErrorType.Timeout,
handler =>
{
handler.Execute("my-task");
}
);
wf.Execute("my-task");
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);

var compiledWfThread = workflowThread.Compile();

var expectedSpec = new ThreadSpec();
var entrypoint = new Node
{
Entrypoint = new EntrypointNode(),
OutgoingEdges =
{
new Edge { SinkNodeName = "1-fail-TASK" }
}
};

var failTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "fail" }
},
OutgoingEdges = { new Edge { SinkNodeName = "2-my-task-TASK" } },
FailureHandlers =
{
new FailureHandlerDef
{
HandlerSpecName = "exn-handler-1-fail-TASK-TIMEOUT",
SpecificFailure = "TIMEOUT"
}
}
};

var myTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "my-task" }
},
OutgoingEdges = { new Edge { SinkNodeName = "3-exit-EXIT" } }
};

var exitNode = new Node
{
Exit = new ExitNode()
};

expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint);
expectedSpec.Nodes.Add("1-fail-TASK", failTask);
expectedSpec.Nodes.Add("2-my-task-TASK", myTask);
expectedSpec.Nodes.Add("3-exit-EXIT", exitNode);

var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks;
Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count);
Assert.Equal(expectedSpec, compiledWfThread);
}
}
3 changes: 2 additions & 1 deletion sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/Workflow.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Exceptions;
using LittleHorse.Sdk.Helper;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -51,7 +52,7 @@ public void RegisterWfSpec(LittleHorseClient client)
_logger!.LogInformation(LHMappingHelper.ProtoToJson(client.PutWfSpec(Compile())));
}

private string AddSubThread(string subThreadName, Action<WorkflowThread> subThreadAction)
internal string AddSubThread(string subThreadName, Action<WorkflowThread> subThreadAction)
{
foreach (var threadPair in _threadActions)
{
Expand Down
104 changes: 104 additions & 0 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -573,4 +573,108 @@ private Node CheckTaskNode(TaskNodeOutput node)

return newNode;
}

/// <summary>
/// Attaches an Error Handler to the specified NodeOutput, allowing it to manage specific types of errors
/// as defined by the 'error' parameter. If 'error' is set to null, the handler will catch all errors.
/// </summary>
/// <param name="node">
/// The NodeOutput instance to which the Error Handler will be attached.
/// </param>
/// <param name="error">
/// The type of error that the handler will manage.
/// </param>
/// <param name="handler">
/// A ThreadFunction defining a ThreadSpec that specifies how to handle the error.
/// </param>
public void HandleError(NodeOutput node, LHErrorType error, Action<WorkflowThread> handler)
{
CheckIfWorkflowThreadIsActive();
var errorFormatted = error.ToString().ToUpper();
var handlerDef = BuildFailureHandlerDef(node,
errorFormatted,
handler);
handlerDef.SpecificFailure = errorFormatted;
AddFailureHandlerDef(handlerDef, node);
}

/// <summary>
/// Attaches an Error Handler to the specified NodeOutput, allowing it to manage any types of errors.
///
/// </summary>
/// <param name="node">
/// The NodeOutput instance to which the Error Handler will be attached.
/// </param>
/// <param name="handler">
/// A ThreadFunction defining a ThreadSpec that specifies how to handle the error.
/// </param>
public void HandleError(NodeOutput node, Action<WorkflowThread> handler)
{
CheckIfWorkflowThreadIsActive();
var handlerDef = BuildFailureHandlerDef(node,
"FAILURE_TYPE_ERROR",
handler);
handlerDef.AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeError;
AddFailureHandlerDef(handlerDef, node);
}

/// <summary>
/// Adds an EXIT node with a Failure defined. This causes a ThreadRun to fail, and the resulting
/// Failure has the specified value, name, and human-readable message.
/// </summary>
/// <param name="output">
/// It is a literal value (cast to VariableValue by the Library) or a WfRunVariable.
/// The assigned value is the payload of the resulting Failure, which can be accessed by any
/// Failure Handler ThreadRuns.
/// </param>
/// <param name="failureName">
/// It is the name of the failure to throw.
/// </param>
/// <param name="message">
/// It is a human-readable message.
/// </param>
public void Fail(object? output, string failureName, string? message)
{
CheckIfWorkflowThreadIsActive();
var failureDef = new FailureDef();
if (output != null) failureDef.Content = AssignVariable(output);
if (message != null) failureDef.Message = message;
failureDef.FailureName = failureName;

ExitNode exitNode = new ExitNode { FailureDef = failureDef };

AddNode(failureName, Node.NodeOneofCase.Exit, exitNode);
}

/// <summary>
/// Adds an EXIT node with a Failure defined. This causes a ThreadRun to fail, and the resulting
/// Failure has the specified name and human-readable message.
/// </summary>
/// <param name="failureName">
/// It is the name of the failure to throw.
/// </param>
/// <param name="message">
/// It is a human-readable message.
/// </param>
public void Fail(string failureName, string message)
{
Fail(null, failureName, message);
}

private FailureHandlerDef BuildFailureHandlerDef(NodeOutput node, string error, Action<WorkflowThread> handler)
{
string threadName = $"exn-handler-{node.NodeName}-{error}";

threadName = _parent.AddSubThread(threadName, handler);

return new FailureHandlerDef { HandlerSpecName = threadName };
}

private void AddFailureHandlerDef(FailureHandlerDef handlerDef, NodeOutput node)
{
// Add the failure handler to the most recent node
Node lastNode = FindNode(node.NodeName);

lastNode.FailureHandlers.Add(handlerDef);
}
}
Loading