Skip to content

Commit

Permalink
feat(sdk-dotnet): add error handler to workflow thread (#1314)
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlaCarvajal authored Feb 19, 2025
1 parent 1f4129d commit 282a825
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 1 deletion.
22 changes: 22 additions & 0 deletions sdk-dotnet/Examples/ExceptionsHandlerExample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using ExceptionsHandler;
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;
using LittleHorse.Sdk.Workflow.Spec;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -46,6 +47,24 @@ private static List<LHTaskWorker<MyWorker>> GetTaskWorkers(LHConfig config)
return workers;
}

private static Workflow GetWorkflow()
{
void MyEntryPoint(WorkflowThread wf)
{
NodeOutput node = wf.Execute("fail");
wf.HandleError(
node,
handler =>
{
handler.Execute("my-task");
}
);
wf.Execute("my-task");
}

return new Workflow("example-exception-handler", MyEntryPoint);
}

static void Main(string[] args)
{
SetupApplication();
Expand All @@ -59,6 +78,9 @@ static void Main(string[] args)
worker.RegisterTaskDef();
}

var workflow = GetWorkflow();
workflow.RegisterWfSpec(config.GetGrpcClientInstance());

Thread.Sleep(300);

foreach (var worker in workers)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
using System;
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Workflow.Spec;
using Moq;
using Xunit;

namespace LittleHorse.Sdk.Tests.Workflow.Spec;

public class WorkflowThreadErrorsAndExceptionsTest
{
private readonly Action<WorkflowThread> _action;
void ParentEntrypoint(WorkflowThread thread)
{
}

public WorkflowThreadErrorsAndExceptionsTest()
{
LHLoggerFactoryProvider.Initialize(null);
_action = ParentEntrypoint;
}

[Fact]
public void WfThread_WithoutSpecificError_ShouldCompileErrorHandling()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfTasks = 2;
var workflowName = "TestWorkflow";
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);

void EntryPointAction(WorkflowThread wf)
{
NodeOutput node = wf.Execute("fail");
wf.HandleError(
node,
handler =>
{
handler.Execute("my-task");
}
);
wf.Execute("my-task");
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);

var compiledWfThread = workflowThread.Compile();

var expectedSpec = new ThreadSpec();
var entrypoint = new Node
{
Entrypoint = new EntrypointNode(),
OutgoingEdges =
{
new Edge { SinkNodeName = "1-fail-TASK" }
}
};

var failTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "fail" }
},
OutgoingEdges = { new Edge { SinkNodeName = "2-my-task-TASK" } },
FailureHandlers =
{
new FailureHandlerDef
{
HandlerSpecName = "exn-handler-1-fail-TASK-FAILURE_TYPE_ERROR",
AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeError
}
}
};

var myTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "my-task" }
},
OutgoingEdges = { new Edge { SinkNodeName = "3-exit-EXIT" } }
};

var exitNode = new Node
{
Exit = new ExitNode()
};

expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint);
expectedSpec.Nodes.Add("1-fail-TASK", failTask);
expectedSpec.Nodes.Add("2-my-task-TASK", myTask);
expectedSpec.Nodes.Add("3-exit-EXIT", exitNode);

var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks;
Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count);
Assert.Equal(expectedSpec, compiledWfThread);
}

[Fact]
public void WfThread_WithSpecificError_ShouldCompileErrorHandling()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfTasks = 2;
var workflowName = "TestWorkflow";
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);

void EntryPointAction(WorkflowThread wf)
{
NodeOutput node = wf.Execute("fail");
wf.HandleError(
node,
LHErrorType.Timeout,
handler =>
{
handler.Execute("my-task");
}
);
wf.Execute("my-task");
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);

var compiledWfThread = workflowThread.Compile();

var expectedSpec = new ThreadSpec();
var entrypoint = new Node
{
Entrypoint = new EntrypointNode(),
OutgoingEdges =
{
new Edge { SinkNodeName = "1-fail-TASK" }
}
};

var failTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "fail" }
},
OutgoingEdges = { new Edge { SinkNodeName = "2-my-task-TASK" } },
FailureHandlers =
{
new FailureHandlerDef
{
HandlerSpecName = "exn-handler-1-fail-TASK-TIMEOUT",
SpecificFailure = "TIMEOUT"
}
}
};

var myTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "my-task" }
},
OutgoingEdges = { new Edge { SinkNodeName = "3-exit-EXIT" } }
};

var exitNode = new Node
{
Exit = new ExitNode()
};

expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint);
expectedSpec.Nodes.Add("1-fail-TASK", failTask);
expectedSpec.Nodes.Add("2-my-task-TASK", myTask);
expectedSpec.Nodes.Add("3-exit-EXIT", exitNode);

var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks;
Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count);
Assert.Equal(expectedSpec, compiledWfThread);
}
}
3 changes: 2 additions & 1 deletion sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/Workflow.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Exceptions;
using LittleHorse.Sdk.Helper;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -51,7 +52,7 @@ public void RegisterWfSpec(LittleHorseClient client)
_logger!.LogInformation(LHMappingHelper.ProtoToJson(client.PutWfSpec(Compile())));
}

private string AddSubThread(string subThreadName, Action<WorkflowThread> subThreadAction)
internal string AddSubThread(string subThreadName, Action<WorkflowThread> subThreadAction)
{
foreach (var threadPair in _threadActions)
{
Expand Down
104 changes: 104 additions & 0 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -573,4 +573,108 @@ private Node CheckTaskNode(TaskNodeOutput node)

return newNode;
}

/// <summary>
/// Attaches an Error Handler to the specified NodeOutput, allowing it to manage specific types of errors
/// as defined by the 'error' parameter. If 'error' is set to null, the handler will catch all errors.
/// </summary>
/// <param name="node">
/// The NodeOutput instance to which the Error Handler will be attached.
/// </param>
/// <param name="error">
/// The type of error that the handler will manage.
/// </param>
/// <param name="handler">
/// A ThreadFunction defining a ThreadSpec that specifies how to handle the error.
/// </param>
public void HandleError(NodeOutput node, LHErrorType error, Action<WorkflowThread> handler)
{
CheckIfWorkflowThreadIsActive();
var errorFormatted = error.ToString().ToUpper();
var handlerDef = BuildFailureHandlerDef(node,
errorFormatted,
handler);
handlerDef.SpecificFailure = errorFormatted;
AddFailureHandlerDef(handlerDef, node);
}

/// <summary>
/// Attaches an Error Handler to the specified NodeOutput, allowing it to manage any types of errors.
///
/// </summary>
/// <param name="node">
/// The NodeOutput instance to which the Error Handler will be attached.
/// </param>
/// <param name="handler">
/// A ThreadFunction defining a ThreadSpec that specifies how to handle the error.
/// </param>
public void HandleError(NodeOutput node, Action<WorkflowThread> handler)
{
CheckIfWorkflowThreadIsActive();
var handlerDef = BuildFailureHandlerDef(node,
"FAILURE_TYPE_ERROR",
handler);
handlerDef.AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeError;
AddFailureHandlerDef(handlerDef, node);
}

/// <summary>
/// Adds an EXIT node with a Failure defined. This causes a ThreadRun to fail, and the resulting
/// Failure has the specified value, name, and human-readable message.
/// </summary>
/// <param name="output">
/// It is a literal value (cast to VariableValue by the Library) or a WfRunVariable.
/// The assigned value is the payload of the resulting Failure, which can be accessed by any
/// Failure Handler ThreadRuns.
/// </param>
/// <param name="failureName">
/// It is the name of the failure to throw.
/// </param>
/// <param name="message">
/// It is a human-readable message.
/// </param>
public void Fail(object? output, string failureName, string? message)
{
CheckIfWorkflowThreadIsActive();
var failureDef = new FailureDef();
if (output != null) failureDef.Content = AssignVariable(output);
if (message != null) failureDef.Message = message;
failureDef.FailureName = failureName;

ExitNode exitNode = new ExitNode { FailureDef = failureDef };

AddNode(failureName, Node.NodeOneofCase.Exit, exitNode);
}

/// <summary>
/// Adds an EXIT node with a Failure defined. This causes a ThreadRun to fail, and the resulting
/// Failure has the specified name and human-readable message.
/// </summary>
/// <param name="failureName">
/// It is the name of the failure to throw.
/// </param>
/// <param name="message">
/// It is a human-readable message.
/// </param>
public void Fail(string failureName, string message)
{
Fail(null, failureName, message);
}

private FailureHandlerDef BuildFailureHandlerDef(NodeOutput node, string error, Action<WorkflowThread> handler)
{
string threadName = $"exn-handler-{node.NodeName}-{error}";

threadName = _parent.AddSubThread(threadName, handler);

return new FailureHandlerDef { HandlerSpecName = threadName };
}

private void AddFailureHandlerDef(FailureHandlerDef handlerDef, NodeOutput node)
{
// Add the failure handler to the most recent node
Node lastNode = FindNode(node.NodeName);

lastNode.FailureHandlers.Add(handlerDef);
}
}

0 comments on commit 282a825

Please sign in to comment.