Skip to content

Commit

Permalink
Add configurable filter to control relative paths
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvina committed Feb 5, 2024
1 parent 69282bd commit 1a7cccb
Show file tree
Hide file tree
Showing 8 changed files with 589 additions and 12 deletions.
67 changes: 67 additions & 0 deletions api/src/main/java/io/onetable/spi/filter/FilterInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.spi.filter;

import java.util.Map;

/**
* A {@link FilterInterceptor} is the filter component of the Intercepting Filter design pattern. It
* is used to apply a transformation to, or to instrument, a given input. Concrete implementations
* of this interface are injected into the table format translation workflow, where they intercept
* the data flowing through it, either to customize the behavior of the default translation
* workflow or to generate logs or metrics.
*
* <p>Examples of filters: 1) a filter to convert absolute paths of data files to relative paths,
* 2) a filter to emit metrics for the number of data files in a snapshot, 3) a filter to validate
* the table metadata and throw an exception if certain conditions are not met.
*
* <p>A filter to generate relative paths may be needed by certain query planners. In such a case
* the user would inject such a filter before data files are written in the target format.
* Similarly, different users may need to integrate with different metric collectors; a user can
* inject a specific filter, which limits the number of dependencies in the core code.
*
* <p>As such, the filter is a tool to customize the behavior of the table format translation
* workflow.
*
* @param <T> the type of input this filter is applied to, which is also the type it returns
*/
public interface FilterInterceptor<T> {
/**
* Returns the name that identifies this filter. This name can be used in config files to specify
* which filters to apply at runtime.
*
* @return the identifier of this filter.
*/
String getIdentifier();

/**
* Initialize the filter with the given properties.
*
* @param properties the properties map to initialize the filter.
*/
void init(Map<String, String> properties);

/**
* Apply the filter to the given input. Note that this method may alter the input, so callers
* should use the returned value rather than rely on the argument being left unchanged.
*
* @param input the input to apply the filter to.
* @return the transformed input.
*/
T apply(T input);

/**
* Close the filter and release any resources. The default implementation does nothing, so only
* filters that actually hold resources need to override it.
*/
default void close() {}
}
32 changes: 32 additions & 0 deletions api/src/main/java/io/onetable/spi/filter/SnapshotFilesFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.spi.filter;

import java.util.List;

import io.onetable.model.storage.OneDataFile;
import io.onetable.spi.sync.TargetClient;

/**
* This extension of the {@link FilterInterceptor} interface is used to apply a filter or
* transformation to a list of {@link OneDataFile}s. Concrete implementations of this interface
* would typically be applied by a {@link TargetClient} before writing a list of {@link
* OneDataFile}s related to a specific snapshot.
*
* <p>The interface declares no members of its own; it only fixes the input type of {@link
* FilterInterceptor} to {@code List<OneDataFile>}.
*/
public interface SnapshotFilesFilter extends FilterInterceptor<List<OneDataFile>> {}
48 changes: 40 additions & 8 deletions core/src/main/java/io/onetable/delta/DeltaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
package io.onetable.delta;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -55,13 +51,16 @@

import io.onetable.client.PerTableConfig;
import io.onetable.exception.NotSupportedException;
import io.onetable.filter.FilterManager;
import io.onetable.model.OneTable;
import io.onetable.model.OneTableMetadata;
import io.onetable.model.schema.OnePartitionField;
import io.onetable.model.schema.OneSchema;
import io.onetable.model.storage.OneDataFile;
import io.onetable.model.storage.OneDataFilesDiff;
import io.onetable.model.storage.OneFileGroup;
import io.onetable.model.storage.TableFormat;
import io.onetable.spi.filter.SnapshotFilesFilter;
import io.onetable.spi.sync.TargetClient;

public class DeltaClient implements TargetClient {
Expand All @@ -78,6 +77,8 @@ public class DeltaClient implements TargetClient {
private int logRetentionInHours;
private TransactionState transactionState;

private FilterManager<SnapshotFilesFilter, List<OneDataFile>> snapshotFileFilterManager;

/**
* Creates a client without performing any initialization in the constructor itself.
*
* <p>NOTE(review): presumably the instance is set up afterwards through a separate init call that
* is outside this view; confirm before relying on a freshly constructed instance.
*/
public DeltaClient() {}

public DeltaClient(PerTableConfig perTableConfig, SparkSession sparkSession) {
Expand Down Expand Up @@ -130,6 +131,17 @@ private void _init(
this.deltaLog = deltaLog;
this.tableName = tableName;
this.logRetentionInHours = logRetentionInHours;

initializeFilters(tableDataPath);
}

/**
 * Creates the manager for snapshot file filters and loads the {@code ToRelativePathFilter} into
 * it. The table data path is handed to the filter through the {@code
 * ToRelativePathFilter.basePath} property.
 *
 * @param tableDataPath the data path of the table, supplied to the filter as its base path
 */
@VisibleForTesting
protected void initializeFilters(String tableDataPath) {
  // the single property consumed by the relative path filter
  Map<String, String> filterProperties =
      Collections.singletonMap("ToRelativePathFilter.basePath", tableDataPath);

  snapshotFileFilterManager = new FilterManager<>(filterProperties);
  snapshotFileFilterManager.loadFilters(
      SnapshotFilesFilter.class, Collections.singleton("ToRelativePathFilter"));
}

@Override
Expand Down Expand Up @@ -179,16 +191,36 @@ public void syncMetadata(OneTableMetadata metadata) {

@Override
public void syncFilesForSnapshot(List<OneFileGroup> partitionedDataFiles) {
List<OneFileGroup> filteredFileGroups =
partitionedDataFiles.stream()
.map(
group -> {
List<OneDataFile> filteredFiles =
snapshotFileFilterManager.process(group.getFiles());
return OneFileGroup.builder()
.partitionValues(group.getPartitionValues())
.files(filteredFiles)
.build();
})
.filter(group -> !group.getFiles().isEmpty())
.collect(Collectors.toList());

transactionState.setActions(
dataFileUpdatesExtractor.applySnapshot(
deltaLog, partitionedDataFiles, transactionState.getLatestSchemaInternal()));
deltaLog, filteredFileGroups, transactionState.getLatestSchemaInternal()));
}

/**
 * Runs the added and the removed files of the given diff through the registered snapshot file
 * filters and records the resulting diff actions on the current transaction state.
 *
 * @param oneDataFilesDiff the files added and removed since the previous sync
 */
@Override
public void syncFilesForDiff(OneDataFilesDiff oneDataFilesDiff) {
  // Filters may alter their input, so each side of the diff is copied before being processed.
  List<OneDataFile> filesAdded = new ArrayList<>(oneDataFilesDiff.getFilesAdded());
  filesAdded = snapshotFileFilterManager.process(filesAdded);
  List<OneDataFile> filesRemoved = new ArrayList<>(oneDataFilesDiff.getFilesRemoved());
  filesRemoved = snapshotFileFilterManager.process(filesRemoved);
  OneDataFilesDiff filteredOneDataFilesDiff = OneDataFilesDiff.from(filesAdded, filesRemoved);

  // The filtered diff, not the raw input, must be the one converted into actions.
  transactionState.setActions(
      dataFileUpdatesExtractor.applyDiff(
          filteredOneDataFilesDiff,
          transactionState.getLatestSchemaInternal(),
          deltaLog.dataPath().toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.onetable.model.storage.OneDataFile;
import io.onetable.model.storage.OneDataFilesDiff;
import io.onetable.model.storage.OneFileGroup;
import io.onetable.paths.PathUtils;
import io.onetable.spi.extractor.DataFileIterator;

@Builder
Expand Down Expand Up @@ -99,9 +98,7 @@ private Stream<AddFile> createAddFileAction(
OneDataFile dataFile, OneSchema schema, String tableBasePath) {
return Stream.of(
new AddFile(
// Delta Lake supports relative and absolute paths in theory but relative paths seem
// more commonly supported by query engines in our testing
PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath),
dataFile.getPhysicalPath(),
convertJavaMapToScala(deltaPartitionExtractor.partitionValueSerialization(dataFile)),
dataFile.getFileSizeBytes(),
dataFile.getLastModified(),
Expand Down
124 changes: 124 additions & 0 deletions core/src/main/java/io/onetable/filter/FilterManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.filter;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import lombok.Data;
import lombok.RequiredArgsConstructor;

import org.apache.arrow.util.VisibleForTesting;

import io.onetable.model.storage.OneDataFile;
import io.onetable.spi.filter.FilterInterceptor;

/**
 * This class is responsible for applying custom transformations at specific points in the format
 * translation flow using a configured list of {@link FilterInterceptor}s. On invocation, it
 * applies the filters to the given input in the order they were registered. The output of one
 * filter is the input of the next filter. Because a filter can alter or replace its input, a
 * subsequent filter in the chain applies its transformation to the result of the preceding
 * filter, not to the original input.
 *
 * <p>An instance of this class expects filters of a specific type (parameterized), which all
 * operate on the same type of input. When a filter is registered, it is initialized with the
 * properties provided to this class. The registration can be done programmatically, or using the
 * {@link ServiceLoader}.
 *
 * <p>For example, consider an instance of the {@link FilterManager} responsible for transforming
 * data files before they are added to the target client. The registered filters include a filter
 * to ensure all paths are relative, and a filter to emit the count of files added to the target
 * client. At the entry point of data file sync, this manager would invoke both filters in that
 * order. That instance would expect the INPUT_TYPE to be a list of {@link OneDataFile}.
 *
 * @param <FILTER_TYPE> the type of filter to manage
 * @param <INPUT_TYPE> the type of input to filter
 */
// this class is lombok.Data so that this class can be extended
@Data
@RequiredArgsConstructor
public class FilterManager<FILTER_TYPE extends FilterInterceptor<INPUT_TYPE>, INPUT_TYPE> {
  /** A list of all the filters registered with this manager, in registration order */
  private final List<FILTER_TYPE> filters = new ArrayList<>();

  /**
   * A map of configurations of all the filters. This is passed to each filter when it is
   * registered
   */
  private final Map<String, String> properties;

  /**
   * Passes the input through every registered filter, in registration order. Each filter receives
   * the value returned by the previous one. When no filter is registered the input is returned
   * as is.
   *
   * @param input the input to filter
   * @return the value returned by the last filter in the chain
   */
  public INPUT_TYPE process(INPUT_TYPE input) {
    INPUT_TYPE current = input;
    for (FILTER_TYPE filter : filters) {
      current = filter.apply(current);
    }
    return current;
  }

  /**
   * Load filters of the given type using the ServiceLoader. If filterIdentifiers is not empty,
   * only the filters with the given identifiers are registered, in the iteration order of the
   * given set (pass an ordered set such as a {@link LinkedHashSet} when the order matters).
   * Identifiers that do not match any available filter are skipped. If filterIdentifiers is null
   * or empty, all the available filters of the given type are registered, in the order returned
   * by {@link #getAvailableFilters(Class)}.
   *
   * <p>Note: all previously registered filters are closed and removed before the new filters are
   * loaded.
   *
   * @param filterTypeClass the type of filters to load
   * @param filterIdentifiers the identifiers of the filters to register
   */
  public void loadFilters(Class<FILTER_TYPE> filterTypeClass, Set<String> filterIdentifiers) {
    closeRegisteredFilters();

    Map<String, FILTER_TYPE> allFilters = getAvailableFilters(filterTypeClass);
    if (filterIdentifiers == null || filterIdentifiers.isEmpty()) {
      allFilters.values().forEach(this::registerFilter);
    } else {
      filterIdentifiers.stream()
          .filter(allFilters::containsKey)
          .map(allFilters::get)
          .forEach(this::registerFilter);
    }
  }

  /**
   * Register a filter with this manager. The filter is initialized with the properties provided
   * to this manager. The filter is added to the end of the list of filters. It will be invoked
   * after all the existing filters and will receive the output of the last filter as input.
   *
   * @param filter the filter to register
   */
  void registerFilter(FILTER_TYPE filter) {
    // initialize first, so a filter whose init fails is never added to the chain
    filter.init(properties);
    filters.add(filter);
  }

  /**
   * Discovers the filters of the given type through the {@link ServiceLoader} and indexes them by
   * their identifier. The returned map iterates in the order the ServiceLoader supplied the
   * filters.
   *
   * @param filterTypeClass the type of filters to look up
   * @return the available filters keyed by identifier
   * @throws IllegalStateException if two available filters report the same identifier
   */
  @VisibleForTesting
  protected Map<String, FILTER_TYPE> getAvailableFilters(Class<FILTER_TYPE> filterTypeClass) {
    ServiceLoader<FILTER_TYPE> loader = ServiceLoader.load(filterTypeClass);
    Map<String, FILTER_TYPE> allFilters =
        StreamSupport.stream(loader.spliterator(), false)
            .collect(
                Collectors.toMap(
                    FilterInterceptor::getIdentifier,
                    f -> f,
                    (first, second) -> {
                      // two providers with one identifier would make selection ambiguous
                      throw new IllegalStateException(
                          "Duplicate filter identifier '"
                              + first.getIdentifier()
                              + "' provided by "
                              + first.getClass().getName()
                              + " and "
                              + second.getClass().getName());
                    },
                    // keep discovery order so that loading all filters is predictable
                    LinkedHashMap::new));
    return allFilters;
  }

  /**
   * Returns a read-only view of the registered filters, in registration order.
   *
   * @return an unmodifiable view of the registered filters
   */
  @VisibleForTesting
  List<FILTER_TYPE> getFilters() {
    return Collections.unmodifiableList(filters);
  }

  /**
   * Closes every registered filter and empties the list. All filters are closed and the list is
   * cleared even if some of them fail to close; the first failure is rethrown afterwards with any
   * later failures attached as suppressed exceptions.
   */
  private void closeRegisteredFilters() {
    RuntimeException firstFailure = null;
    for (FILTER_TYPE filter : filters) {
      try {
        filter.close();
      } catch (RuntimeException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }
    filters.clear();
    if (firstFailure != null) {
      throw firstFailure;
    }
  }
}
Loading

0 comments on commit 1a7cccb

Please sign in to comment.