Skip to content

Commit

Permalink
Pipe: Remove already deleted tsfile resource from historical extracti…
Browse files Browse the repository at this point in the history
…on list to prevent FileNotFoundException (#13869)
  • Loading branch information
SteveYurongSu authored Oct 22, 2024
1 parent bacde2f commit ce53f9c
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,27 +517,33 @@ private void extractTsFiles(
tsFileManager.getTsFileList(true).stream()
.filter(
resource ->
// Some resource may not be closed due to the control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource))
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
&& (
// Some resource may not be closed due to the control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
resourceList.addAll(sequenceTsFileResources);

final Collection<TsFileResource> unsequenceTsFileResources =
tsFileManager.getTsFileList(false).stream()
.filter(
resource ->
// Some resource may not be closed due to the control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource))
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
&& (
// Some resource may not be closed due to the control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
resourceList.addAll(unsequenceTsFileResources);

Expand Down

0 comments on commit ce53f9c

Please sign in to comment.