Skip to content

Commit

Permalink
code refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
Junwei Dai committed Jan 8, 2025
1 parent 95085ff commit 991633b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,14 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
ProvisionWorkflowAction.INSTANCE,
workflowRequest,
ActionListener.wrap(provisionResponse -> {
if (request.getWaitForCompletionTimeout() != null) {
listener.onResponse(
new WorkflowResponse(
listener.onResponse(
request.getWaitForCompletionTimeout() != null
? new WorkflowResponse(
provisionResponse.getWorkflowId(),
provisionResponse.getWorkflowState()
)
);
} else {
listener.onResponse(
new WorkflowResponse(provisionResponse.getWorkflowId())
);
}
: new WorkflowResponse(provisionResponse.getWorkflowId())
);
}, exception -> {
String errorMessage = "Provisioning failed.";
logger.error(errorMessage, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,28 +354,61 @@ public void onFailure(Exception e) {
}
}, threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL));

// Schedule timeout handler
scheduleTimeoutHandler(workflowId, listener, timeout, isResponseSent);
}

/**
* Schedules a timeout handler for workflow execution.
* This method starts a new task in the thread pool to wait for the specified timeout duration.
* If the workflow does not complete within the given timeout, it triggers a follow-up action
* to fetch the workflow's state and notify the listener.
*
* @param workflowId The unique identifier of the workflow being executed.
* @param listener The ActionListener to notify with the workflow's response or failure.
* @param timeout The maximum time (in milliseconds) to wait for the workflow to complete before timing out.
* @param isResponseSent An AtomicBoolean flag to ensure the response is sent only once.
*/
private void scheduleTimeoutHandler(
String workflowId,
ActionListener<WorkflowResponse> listener,
long timeout,
AtomicBoolean isResponseSent
) {
threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> {
try {
Thread.sleep(timeout);
if (isResponseSent.compareAndSet(false, true)) {
logger.warn("Workflow execution timed out for workflowId: {}", workflowId);
client.execute(
GetWorkflowStateAction.INSTANCE,
new GetWorkflowStateRequest(workflowId, false),
ActionListener.wrap(
response -> listener.onResponse(new WorkflowResponse(workflowId, response.getWorkflowState())),
exception -> listener.onFailure(
new FlowFrameworkException("Failed to get workflow state after timeout", ExceptionsHelper.status(exception))
)
)
);
fetchWorkflowStateAfterTimeout(workflowId, listener);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

/**
* Fetches the workflow state after a timeout has occurred.
* This method sends a request to retrieve the current state of the workflow
* and notifies the listener with the updated state or an error if the request fails.
*
* @param workflowId The unique identifier of the workflow whose state needs to be fetched.
* @param listener The ActionListener to notify with the workflow's updated state or failure.
*/
private void fetchWorkflowStateAfterTimeout(String workflowId, ActionListener<WorkflowResponse> listener) {
client.execute(
GetWorkflowStateAction.INSTANCE,
new GetWorkflowStateRequest(workflowId, false),
ActionListener.wrap(
response -> listener.onResponse(new WorkflowResponse(workflowId, response.getWorkflowState())),
exception -> listener.onFailure(
new FlowFrameworkException("Failed to get workflow state after timeout", ExceptionsHelper.status(exception))
)
)
);
}

/**
* Executes the given workflow sequence
* @param workflowSequence The topologically sorted workflow to execute
Expand Down

0 comments on commit 991633b

Please sign in to comment.