From b8692676bef336c4a58e273421cf53ef1dc8b6ae Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Thu, 16 Jan 2025 19:15:36 +0100 Subject: [PATCH] better handling cancel extractor --- .../xpack/ml/dataframe/process/DataFrameRowsJoiner.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java index 6fd8945623a98..3e1968ca19ce1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java @@ -96,6 +96,9 @@ private void addResultAndJoinIfEndOfBatch(RowResults rowResults) { private void joinCurrentResults() { try (LimitAwareBulkIndexer bulkIndexer = new LimitAwareBulkIndexer(settings, this::executeBulkRequest)) { while (currentResults.isEmpty() == false) { + if (dataExtractor.isCancelled()) { + break; + } RowResults result = currentResults.pop(); DataFrameDataExtractor.Row row = dataFrameRowsIterator.next(); checkChecksumsMatch(row, result); @@ -174,7 +177,7 @@ public boolean hasNext() { @Override public DataFrameDataExtractor.Row next() { DataFrameDataExtractor.Row row = null; - while (hasNoMatch(row) && hasNext() && dataExtractor.isCancelled() == false) { + while (hasNoMatch(row) && hasNext()) { advanceToNextBatchIfNecessary(); row = dataExtractor.createRow(currentDataFrameRows[currentDataFrameRowsIndex++]); }