From 1a7cccbd984adee7358816fc41241056de7fb0ce Mon Sep 17 00:00:00 2001 From: Ashvin Agrawal Date: Thu, 1 Feb 2024 15:01:12 -0800 Subject: [PATCH] Add configurable filter to control relative paths --- .../spi/filter/FilterInterceptor.java | 67 +++++++++ .../spi/filter/SnapshotFilesFilter.java | 32 +++++ .../java/io/onetable/delta/DeltaClient.java | 48 +++++-- .../delta/DeltaDataFileUpdatesExtractor.java | 5 +- .../io/onetable/filter/FilterManager.java | 124 ++++++++++++++++ .../onetable/filter/ToRelativePathFilter.java | 87 ++++++++++++ .../io/onetable/filter/FilterManagerTest.java | 133 ++++++++++++++++++ .../filter/ToRelativePathFilterTest.java | 105 ++++++++++++++ 8 files changed, 589 insertions(+), 12 deletions(-) create mode 100644 api/src/main/java/io/onetable/spi/filter/FilterInterceptor.java create mode 100644 api/src/main/java/io/onetable/spi/filter/SnapshotFilesFilter.java create mode 100644 core/src/main/java/io/onetable/filter/FilterManager.java create mode 100644 core/src/main/java/io/onetable/filter/ToRelativePathFilter.java create mode 100644 core/src/test/java/io/onetable/filter/FilterManagerTest.java create mode 100644 core/src/test/java/io/onetable/filter/ToRelativePathFilterTest.java diff --git a/api/src/main/java/io/onetable/spi/filter/FilterInterceptor.java b/api/src/main/java/io/onetable/spi/filter/FilterInterceptor.java new file mode 100644 index 000000000..dd219eeda --- /dev/null +++ b/api/src/main/java/io/onetable/spi/filter/FilterInterceptor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.onetable.spi.filter; + +import java.util.Map; + +/** + * A {@link FilterInterceptor} is the filter component of the Intercepting Filter design pattern. It + * is used to apply a transformation or to instrument a given input. Concrete implementations of + * this interface are injected (intercept) in the table format translation workflow to customize the + * behavior of the default translation workflow, or to generate logs or metrics. + * + *

Examples of filters: 1) a filter to convert absolute data file paths to relative paths; 2) a + * filter to emit metrics for the number of data files in a snapshot; 3) a filter to validate the + * table metadata and throw an exception if certain conditions are not met. + * + *

A filter to generate relative paths may be needed by certain query planners. In such a case + * the user would inject such a filter before data files are written in the target format. Different + * users may need to integrate with different metric collectors. A user can inject a specific filter + * and limit the number of dependencies in the core code. + * + *

As such, the filter is a tool to customize the behavior of the table format translation + * workflow. + */ +public interface FilterInterceptor { + /** + * Each filter is identifiable by a name. This name can be used in config files to specify which + * filters to apply at runtime. + * + * @return the identifier of this filter. + */ + String getIdentifier(); + + /** + * Initialize the filter with the given properties. + * + * @param properties the properties map to initialize the filter. + */ + void init(Map properties); + + /** + * Apply the filter to the given input. Note that this method may alter the input. + * + * @param input the input to apply the filter to. + * @return the transformed input. + */ + T apply(T input); + + /** Close the filter and release any resources. */ + default void close() {} +} diff --git a/api/src/main/java/io/onetable/spi/filter/SnapshotFilesFilter.java b/api/src/main/java/io/onetable/spi/filter/SnapshotFilesFilter.java new file mode 100644 index 000000000..801946290 --- /dev/null +++ b/api/src/main/java/io/onetable/spi/filter/SnapshotFilesFilter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.onetable.spi.filter; + +import java.util.List; + +import io.onetable.model.storage.OneDataFile; +import io.onetable.spi.sync.TargetClient; + +/** + * This extension of the {@link FilterInterceptor} interface is used to apply a filter or + * transformation on a list of {@link OneDataFile}s. Concrete implementation of this interface would + * typically be applied by a {@link TargetClient} before writing a list of {@link OneDataFile}s + * related to a specific snapshot. + */ +public interface SnapshotFilesFilter extends FilterInterceptor> {} diff --git a/core/src/main/java/io/onetable/delta/DeltaClient.java b/core/src/main/java/io/onetable/delta/DeltaClient.java index 8b2ffdbbe..ec6ec4ab3 100644 --- a/core/src/main/java/io/onetable/delta/DeltaClient.java +++ b/core/src/main/java/io/onetable/delta/DeltaClient.java @@ -19,12 +19,8 @@ package io.onetable.delta; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; +import java.util.stream.Collectors; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -55,13 +51,16 @@ import io.onetable.client.PerTableConfig; import io.onetable.exception.NotSupportedException; +import io.onetable.filter.FilterManager; import io.onetable.model.OneTable; import io.onetable.model.OneTableMetadata; import io.onetable.model.schema.OnePartitionField; import io.onetable.model.schema.OneSchema; +import io.onetable.model.storage.OneDataFile; import io.onetable.model.storage.OneDataFilesDiff; import io.onetable.model.storage.OneFileGroup; import io.onetable.model.storage.TableFormat; +import io.onetable.spi.filter.SnapshotFilesFilter; import io.onetable.spi.sync.TargetClient; public class DeltaClient implements TargetClient { @@ -78,6 +77,8 @@ public class DeltaClient implements TargetClient { private int logRetentionInHours; private TransactionState transactionState; + 
private FilterManager> snapshotFileFilterManager; + public DeltaClient() {} public DeltaClient(PerTableConfig perTableConfig, SparkSession sparkSession) { @@ -130,6 +131,17 @@ private void _init( this.deltaLog = deltaLog; this.tableName = tableName; this.logRetentionInHours = logRetentionInHours; + + initializeFilters(tableDataPath); + } + + @VisibleForTesting + protected void initializeFilters(String tableDataPath) { + snapshotFileFilterManager = + new FilterManager<>( + Collections.singletonMap("ToRelativePathFilter.basePath", tableDataPath)); + snapshotFileFilterManager.loadFilters( + SnapshotFilesFilter.class, Collections.singleton("ToRelativePathFilter")); } @Override @@ -179,16 +191,36 @@ public void syncMetadata(OneTableMetadata metadata) { @Override public void syncFilesForSnapshot(List partitionedDataFiles) { + List filteredFileGroups = + partitionedDataFiles.stream() + .map( + group -> { + List filteredFiles = + snapshotFileFilterManager.process(group.getFiles()); + return OneFileGroup.builder() + .partitionValues(group.getPartitionValues()) + .files(filteredFiles) + .build(); + }) + .filter(group -> !group.getFiles().isEmpty()) + .collect(Collectors.toList()); + transactionState.setActions( dataFileUpdatesExtractor.applySnapshot( - deltaLog, partitionedDataFiles, transactionState.getLatestSchemaInternal())); + deltaLog, filteredFileGroups, transactionState.getLatestSchemaInternal())); } @Override public void syncFilesForDiff(OneDataFilesDiff oneDataFilesDiff) { + List filesAdded = new ArrayList<>(oneDataFilesDiff.getFilesAdded()); + filesAdded = snapshotFileFilterManager.process(filesAdded); + List filesRemoved = new ArrayList<>(oneDataFilesDiff.getFilesRemoved()); + filesRemoved = snapshotFileFilterManager.process(filesRemoved); + OneDataFilesDiff filteredOneDataFilesDiff = OneDataFilesDiff.from(filesAdded, filesRemoved); + transactionState.setActions( dataFileUpdatesExtractor.applyDiff( - oneDataFilesDiff, + filteredOneDataFilesDiff, 
transactionState.getLatestSchemaInternal(), deltaLog.dataPath().toString())); } diff --git a/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java b/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java index 4b16f4747..c84c61a12 100644 --- a/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java +++ b/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java @@ -43,7 +43,6 @@ import io.onetable.model.storage.OneDataFile; import io.onetable.model.storage.OneDataFilesDiff; import io.onetable.model.storage.OneFileGroup; -import io.onetable.paths.PathUtils; import io.onetable.spi.extractor.DataFileIterator; @Builder @@ -99,9 +98,7 @@ private Stream createAddFileAction( OneDataFile dataFile, OneSchema schema, String tableBasePath) { return Stream.of( new AddFile( - // Delta Lake supports relative and absolute paths in theory but relative paths seem - // more commonly supported by query engines in our testing - PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath), + dataFile.getPhysicalPath(), convertJavaMapToScala(deltaPartitionExtractor.partitionValueSerialization(dataFile)), dataFile.getFileSizeBytes(), dataFile.getLastModified(), diff --git a/core/src/main/java/io/onetable/filter/FilterManager.java b/core/src/main/java/io/onetable/filter/FilterManager.java new file mode 100644 index 000000000..2d37f678f --- /dev/null +++ b/core/src/main/java/io/onetable/filter/FilterManager.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.onetable.filter; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import org.apache.arrow.util.VisibleForTesting; + +import io.onetable.model.storage.OneDataFile; +import io.onetable.spi.filter.FilterInterceptor; + +/** + * This class is responsible for applying custom transformations at specific points in the format + * translation flow using a configured list of {@link FilterInterceptor}s. On invocation, it applies + * the filters on the given input in the order they were registered. The output of one filter is the + * input of the next filter. Given that a filter can alter the input, a subsequent filter in the + * chain may not receive the original input; it operates on the output of the preceding filter. + * + *

An instance of this class expects filters of a specific type (parameterized), which would all + * operate on the same type of input. When a filter is registered, it is initialized with the + * properties provided to this class. The registration can be done programmatically, or using the + * ServiceLoader. + * + *

For e.g. an instance of the {@link FilterManager} responsible for transforming data file + * before adding to the target client. The registered filters include a filter to ensure all paths + * are relative, and a filter to emit the count of files added to the target client. At the entry + * point of data file sync, this manager would invoke both the filters in that order. As such the + * specific instance would expect the INPUT_TYPE to be list of {@link OneDataFile}. + * + * @param the type of filter to manage + * @param the type of input to filter + */ +// this class is lombok.Data so that this class can be extended +@Data +@RequiredArgsConstructor +public class FilterManager, INPUT_TYPE> { + /** A list of all the filters registered with this manager */ + private final List filters = new ArrayList<>(); + + /** + * A map of configurations of all the filters. This is passed to each filter when it is registered + */ + private final Map properties; + + public INPUT_TYPE process(INPUT_TYPE input) { + for (FILTER_TYPE filter : filters) { + input = filter.apply(input); + } + return input; + } + + /** + * Load filters of the given type using the ServiceLoader. If filterIdentifiers is not empty, only + * the filters with the given identifiers are registered and in the order specified in the list. + * If filterIdentifiers is empty, all the filters of the given type are registered. + * + *

Note: all previously registered filters are removed and closed before the new filters are + * loaded. + * + * @param filterTypeClass the type of filters to load + * @param filterIdentifiers the identifiers of the filters to register + */ + public void loadFilters(Class filterTypeClass, Set filterIdentifiers) { + filters.forEach(FilterInterceptor::close); + filters.clear(); + + Map allFilters = getAvailableFilters(filterTypeClass); + if (filterIdentifiers == null || filterIdentifiers.isEmpty()) { + allFilters.values().forEach(this::registerFilter); + } else { + filterIdentifiers.stream() + .filter(allFilters::containsKey) + .map(allFilters::get) + .forEach(this::registerFilter); + } + } + + /** + * Register a filter with this manager. The filter is initialized with the properties provided to + * this manager. The filter is added to the end of the list of filters. It will be invoked after + * all the existing filters and will receive the output of the last filter as input. + * + * @param filter the filter to register + */ + void registerFilter(FILTER_TYPE filter) { + filter.init(properties); + filters.add(filter); + } + + @VisibleForTesting + protected Map getAvailableFilters(Class filterTypeClass) { + ServiceLoader loader = ServiceLoader.load(filterTypeClass); + Map allFilters = + StreamSupport.stream(loader.spliterator(), false) + .collect(Collectors.toMap(FilterInterceptor::getIdentifier, f -> f)); + return allFilters; + } + + @VisibleForTesting + List getFilters() { + return Collections.unmodifiableList(filters); + } +} diff --git a/core/src/main/java/io/onetable/filter/ToRelativePathFilter.java b/core/src/main/java/io/onetable/filter/ToRelativePathFilter.java new file mode 100644 index 000000000..02b2c6de5 --- /dev/null +++ b/core/src/main/java/io/onetable/filter/ToRelativePathFilter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.onetable.filter; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import io.onetable.model.storage.OneDataFile; +import io.onetable.spi.filter.SnapshotFilesFilter; + +/** + * This filter will generate a new list of data files, in which the absolute paths of input data + * files will be replaced by relative paths. This path conversion will be done only if the input + * data files are located in the subtree with a configured base path as the root. For e.g. if the + * configured base is located at /data/table1, and the input data files are located at + * /data/table1/2019-01/abc.parquet, then the output data files will be located at + * 2019-01/abc.parquet. However, if the input data files are located at + * /data/table2/2019-01/abc.parquet, then the output data files will have the same path as the input + * file, i.e. located at /data/table2/2019-01/abc.parquet. 
+ */ +public class ToRelativePathFilter implements SnapshotFilesFilter { + private static final String BASE_PATH = ToRelativePathFilter.class.getSimpleName() + ".basePath"; + private String basePath; + + @Override + public String getIdentifier() { + return ToRelativePathFilter.class.getSimpleName(); + } + + @Override + public void init(Map properties) { + basePath = properties.get(BASE_PATH); + } + + /** + * This method will perform the path conversion described in the contract of this filter. + * + * @param files a list of data files whose path needs to be converted + * @return a new list of data files, with new file objects, whose path has been converted + */ + @Override + public List apply(List files) { + List updatedFiles = + files.stream() + .map( + file -> { + String oldPath = file.getPhysicalPath(); + if (!oldPath.startsWith(basePath)) { + return file; + } + + String newPath = oldPath.substring(basePath.length() + 1); + + // create a new data file + return OneDataFile.builder() + .schemaVersion(file.getSchemaVersion()) + .physicalPath(newPath) + .fileFormat(file.getFileFormat()) + .partitionValues(file.getPartitionValues()) + .fileSizeBytes(file.getFileSizeBytes()) + .recordCount(file.getRecordCount()) + .columnStats(file.getColumnStats()) + .lastModified(file.getLastModified()) + .build(); + }) + .collect(Collectors.toList()); + + return updatedFiles; + } +} diff --git a/core/src/test/java/io/onetable/filter/FilterManagerTest.java b/core/src/test/java/io/onetable/filter/FilterManagerTest.java new file mode 100644 index 000000000..edde40c4b --- /dev/null +++ b/core/src/test/java/io/onetable/filter/FilterManagerTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.onetable.filter; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.HashMap; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Sets; + +import io.onetable.spi.filter.FilterInterceptor; + +class FilterManagerTest { + + private HashMap> availableFilters; + private FilterInterceptor mockFilter1; + private FilterInterceptor mockFilter2; + private FilterInterceptor mockFilter3; + + @BeforeEach + void setup() { + availableFilters = new HashMap<>(); + mockFilter1 = mock(FilterInterceptor.class); + when(mockFilter1.getIdentifier()).thenReturn("mock1"); + availableFilters.put("mock1", mockFilter1); + mockFilter2 = mock(FilterInterceptor.class); + when(mockFilter2.getIdentifier()).thenReturn("mock2"); + availableFilters.put("mock2", mockFilter2); + mockFilter3 = mock(FilterInterceptor.class); + when(mockFilter3.getIdentifier()).thenReturn("mock3"); + availableFilters.put("mock3", mockFilter3); + } + + @Test + void process() { + HashMap properties = new HashMap<>(); + FilterManager, String> manager = new FilterManager<>(properties); + FilterManager, String> spyManager = spy(manager); + + Class> filterType = + (Class>) mockFilter1.getClass(); + doReturn(availableFilters).when(spyManager).getAvailableFilters(filterType); + + 
spyManager.loadFilters(filterType, Sets.newHashSet("mock1", "mock3")); + + when(mockFilter1.apply("test")).thenReturn("test1"); + when(mockFilter3.apply("test1")).thenReturn("test3"); + + String result = spyManager.process("test"); + assertEquals("test3", result); + verify(mockFilter1, times(1)).apply("test"); + verify(mockFilter2, never()).apply(any()); + verify(mockFilter3, times(1)).apply("test1"); + } + + @Test + void loadFilters() { + HashMap properties = new HashMap<>(); + FilterManager, String> manager = new FilterManager<>(properties); + FilterManager, String> spyManager = spy(manager); + + Class> filterType = + (Class>) mockFilter1.getClass(); + doReturn(availableFilters).when(spyManager).getAvailableFilters(filterType); + + // test load all filters + spyManager.loadFilters(filterType, null); + List> filters = spyManager.getFilters(); + assertEquals(3, filters.size()); + assertTrue(filters.contains(mockFilter1)); + assertTrue(filters.contains(mockFilter2)); + assertTrue(filters.contains(mockFilter3)); + verify(mockFilter1, times(1)).init(properties); + verify(mockFilter2, times(1)).init(properties); + verify(mockFilter3, times(1)).init(properties); + + spyManager.loadFilters(filterType, Sets.newHashSet("mock1")); + filters = spyManager.getFilters(); + assertEquals(1, filters.size()); + assertTrue(filters.contains(mockFilter1)); + verify(mockFilter1, times(2)).init(properties); + verify(mockFilter1, times(1)).close(); + verify(mockFilter2, times(1)).close(); + + spyManager.loadFilters(filterType, Sets.newHashSet("mock1", "mock3")); + filters = spyManager.getFilters(); + assertEquals(2, filters.size()); + assertTrue(filters.contains(mockFilter1)); + assertTrue(filters.contains(mockFilter3)); + } + + @Test + void registerFilter() { + HashMap properties = new HashMap<>(); + FilterManager, String> manager = new FilterManager<>(properties); + assertTrue(manager.getFilters().isEmpty()); + + manager.registerFilter(mockFilter1); + List> filters = 
manager.getFilters(); + assertEquals(1, filters.size()); + assertEquals(mockFilter1, filters.get(0)); + verify(mockFilter1, times(1)).init(properties); + + manager.registerFilter(mockFilter2); + filters = manager.getFilters(); + assertEquals(2, filters.size()); + assertEquals(mockFilter1, filters.get(0)); + assertEquals(mockFilter2, filters.get(1)); + verify(mockFilter1, times(1)).init(properties); + verify(mockFilter2, times(1)).init(properties); + } +} diff --git a/core/src/test/java/io/onetable/filter/ToRelativePathFilterTest.java b/core/src/test/java/io/onetable/filter/ToRelativePathFilterTest.java new file mode 100644 index 000000000..601feceb2 --- /dev/null +++ b/core/src/test/java/io/onetable/filter/ToRelativePathFilterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.onetable.filter; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.provider.Arguments; + +import io.onetable.model.storage.OneDataFile; + +class ToRelativePathFilterTest { + @Test + void applyToFilesNotInSubTree() { + ToRelativePathFilter filter = new ToRelativePathFilter(); + String basePath = "scheme:///db/table"; + filter.init(Collections.singletonMap("ToRelativePathFilter.basePath", basePath)); + + List dataFiles = + Arrays.asList( + OneDataFile.builder().physicalPath("abfs://db/table/2019-01/1.parquet").build(), + OneDataFile.builder().physicalPath("s3://db/other_table/2020-01/2.parquet").build(), + OneDataFile.builder().physicalPath("scheme://db/other_table/1/2/3/4/3.parquet").build(), + OneDataFile.builder().physicalPath("file:///db/1.parquet").build()); + + List expected = + Arrays.asList( + OneDataFile.builder().physicalPath("abfs://db/table/2019-01/1.parquet").build(), + OneDataFile.builder().physicalPath("s3://db/other_table/2020-01/2.parquet").build(), + OneDataFile.builder().physicalPath("scheme://db/other_table/1/2/3/4/3.parquet").build(), + OneDataFile.builder().physicalPath("file:///db/1.parquet").build()); + + List result = filter.apply(dataFiles); + assertNotNull(result); + assertEquals(dataFiles.size(), result.size()); + assertEquals(expected, result); + } + + @Test + void applyToFilesInSubTree() { + ToRelativePathFilter filter = new ToRelativePathFilter(); + String basePath = "scheme://db/table"; + filter.init(Collections.singletonMap("ToRelativePathFilter.basePath", basePath)); + + List dataFiles = + Arrays.asList( + OneDataFile.builder().physicalPath(basePath + "/2019-01/1.parquet").build(), + OneDataFile.builder().physicalPath(basePath + "/2020-01/2.parquet").build(), + OneDataFile.builder().physicalPath(basePath + 
"/1/2/3/4/3.parquet").build(), + OneDataFile.builder().physicalPath("1.parquet").build()); + + List expected = + Arrays.asList( + OneDataFile.builder().physicalPath("2019-01/1.parquet").build(), + OneDataFile.builder().physicalPath("2020-01/2.parquet").build(), + OneDataFile.builder().physicalPath("1/2/3/4/3.parquet").build(), + OneDataFile.builder().physicalPath("1.parquet").build()); + + assertEquals(expected, filter.apply(dataFiles)); + } + + private static Stream inputs() { + return Stream.of( + Arguments.of("/relative/path", "file:///absolute/path", "relative/path"), + Arguments.of( + "file:///absolute/path/to/file.parquet", "file:///absolute/path", "to/file.parquet"), + Arguments.of( + "file:///absolute/path/to/file.parquet", "file:/absolute/path", "to/file.parquet"), + Arguments.of( + "file:/absolute/path/to/file.parquet", "file:///absolute/path", "to/file.parquet"), + Arguments.of("s3://absolute/path/to/file.parquet", "s3://absolute/path", "to/file.parquet"), + Arguments.of( + "s3a://absolute/path/to/file.parquet", "s3a://absolute/path", "to/file.parquet"), + Arguments.of( + "s3a://absolute/path/to/file.parquet", "s3://absolute/path", "to/file.parquet"), + Arguments.of( + "s3://absolute/path/to/file.parquet", "s3a://absolute/path", "to/file.parquet")); + } + + @Test + void getIdentifier() { + assertEquals("ToRelativePathFilter", new ToRelativePathFilter().getIdentifier()); + } +}