Skip to content

Commit

Permalink
Active Load: Fix scan tsfile do not close the stream resource (apache…
Browse files Browse the repository at this point in the history
…#13846)

(cherry picked from commit 928e6b3)
  • Loading branch information
YC27 committed Oct 21, 2024
1 parent 1d75836 commit a2c43f5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService {

Expand Down Expand Up @@ -99,16 +100,20 @@ private void scan() throws IOException {

final boolean isGeneratedByPipe =
listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)
.map(
file ->
(file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS))
? getTsFilePath(file.getAbsolutePath())
: file.getAbsolutePath())
.filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, isGeneratedByPipe));
try (final Stream<File> fileStream =
FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)) {
fileStream
.filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
.filter(File::exists)
.map(
file ->
(file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS))
? getTsFilePath(file.getAbsolutePath())
: file.getAbsolutePath())
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, isGeneratedByPipe));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ private void removeToFailDir(final String filePath) {
}
}

public boolean isFilePendingOrLoading(final String filePath) {
return pendingQueue.isFilePendingOrLoading(filePath);
public boolean isFilePendingOrLoading(final File file) {
return pendingQueue.isFilePendingOrLoading(file.getAbsolutePath());
}

// Metrics
Expand Down

0 comments on commit a2c43f5

Please sign in to comment.