From 991633b9e8941b95250388913ae993a12e52a4af Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Wed, 8 Jan 2025 13:08:34 -0800 Subject: [PATCH] code refactor Signed-off-by: Junwei Dai --- .../CreateWorkflowTransportAction.java | 14 ++--- .../ProvisionWorkflowTransportAction.java | 53 +++++++++++++++---- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index eda7e42f3..ab5e45df6 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -262,18 +262,14 @@ private void createExecute(WorkflowRequest request, User user, ActionListener { - if (request.getWaitForCompletionTimeout() != null) { - listener.onResponse( - new WorkflowResponse( + listener.onResponse( + request.getWaitForCompletionTimeout() != null + ? new WorkflowResponse( provisionResponse.getWorkflowId(), provisionResponse.getWorkflowState() ) - ); - } else { - listener.onResponse( - new WorkflowResponse(provisionResponse.getWorkflowId()) - ); - } + : new WorkflowResponse(provisionResponse.getWorkflowId()) + ); }, exception -> { String errorMessage = "Provisioning failed."; logger.error(errorMessage, exception); diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 841c76cf5..17aa4cdff 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -354,21 +354,33 @@ public void onFailure(Exception e) { } }, threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL)); + // Schedule timeout handler + scheduleTimeoutHandler(workflowId, listener, timeout, isResponseSent); + } + + /** + * Schedules a timeout handler for workflow execution. + * This method starts a new task in the thread pool to wait for the specified timeout duration. + * If the workflow does not complete within the given timeout, it triggers a follow-up action + * to fetch the workflow's state and notify the listener. + * + * @param workflowId The unique identifier of the workflow being executed. + * @param listener The ActionListener to notify with the workflow's response or failure. + * @param timeout The maximum time (in milliseconds) to wait for the workflow to complete before timing out. + * @param isResponseSent An AtomicBoolean flag to ensure the response is sent only once. + */ + private void scheduleTimeoutHandler( + String workflowId, + ActionListener listener, + long timeout, + AtomicBoolean isResponseSent + ) { threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { try { Thread.sleep(timeout); if (isResponseSent.compareAndSet(false, true)) { logger.warn("Workflow execution timed out for workflowId: {}", workflowId); - client.execute( - GetWorkflowStateAction.INSTANCE, - new GetWorkflowStateRequest(workflowId, false), - ActionListener.wrap( - response -> listener.onResponse(new WorkflowResponse(workflowId, response.getWorkflowState())), - exception -> listener.onFailure( - new FlowFrameworkException("Failed to get workflow state after timeout", ExceptionsHelper.status(exception)) - ) - ) - ); + fetchWorkflowStateAfterTimeout(workflowId, listener); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -376,6 +388,27 @@ public void onFailure(Exception e) { }); } + /** + * Fetches the workflow state after a timeout has occurred. + * This method sends a request to retrieve the current state of the workflow + * and notifies the listener with the updated state or an error if the request fails. + * + * @param workflowId The unique identifier of the workflow whose state needs to be fetched. + * @param listener The ActionListener to notify with the workflow's updated state or failure. + */ + private void fetchWorkflowStateAfterTimeout(String workflowId, ActionListener listener) { + client.execute( + GetWorkflowStateAction.INSTANCE, + new GetWorkflowStateRequest(workflowId, false), + ActionListener.wrap( + response -> listener.onResponse(new WorkflowResponse(workflowId, response.getWorkflowState())), + exception -> listener.onFailure( + new FlowFrameworkException("Failed to get workflow state after timeout", ExceptionsHelper.status(exception)) + ) + ) + ); + } + /** * Executes the given workflow sequence * @param workflowSequence The topologically sorted workflow to execute