Skip to content

Commit

Permalink
better handling cancel extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
iverase committed Jan 16, 2025
1 parent 3a0ff94 commit b869267
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
private void joinCurrentResults() {
try (LimitAwareBulkIndexer bulkIndexer = new LimitAwareBulkIndexer(settings, this::executeBulkRequest)) {
while (currentResults.isEmpty() == false) {
if (dataExtractor.isCancelled()) {
break;
}
RowResults result = currentResults.pop();
DataFrameDataExtractor.Row row = dataFrameRowsIterator.next();
checkChecksumsMatch(row, result);
Expand Down Expand Up @@ -174,7 +177,7 @@ public boolean hasNext() {
@Override
public DataFrameDataExtractor.Row next() {
DataFrameDataExtractor.Row row = null;
while (hasNoMatch(row) && hasNext() && dataExtractor.isCancelled() == false) {
while (hasNoMatch(row) && hasNext()) {
advanceToNextBatchIfNecessary();
row = dataExtractor.createRow(currentDataFrameRows[currentDataFrameRowsIndex++]);
}
Expand Down

0 comments on commit b869267

Please sign in to comment.