Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Allow customization of conversion to relative paths #324

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions api/src/main/java/io/onetable/spi/filter/FilterInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.spi.filter;

import java.util.Map;

/**
 * A {@link FilterInterceptor} is the filter component of the Intercepting Filter design pattern. It
 * is used to apply a transformation to, or to instrument, a given input. Concrete implementations
 * of this interface are injected into the table format translation workflow to customize its
 * default behavior, or to generate logs or metrics.
 *
 * <p>Examples of filters:
 *
 * <ol>
 *   <li>a filter that converts absolute paths of data files to relative paths
 *   <li>a filter that emits metrics for the number of data files in a snapshot
 *   <li>a filter that validates the table metadata and throws an exception if certain conditions
 *       are not met
 * </ol>
 *
 * <p>A filter to generate relative paths may be needed by certain query planners. In such a case
 * the user would inject such a filter before data files are written in the target format. Different
 * users may need to integrate with different metric collectors. A user can inject a specific filter
 * and limit the number of dependencies in the core code.
 *
 * <p>As such, the filter is a tool to customize the behavior of the table format translation
 * workflow.
 *
 * <p>This interface extends {@link AutoCloseable} so that a filter can be managed by a
 * try-with-resources statement. {@link #close()} declares no checked exception, so callers do not
 * need a {@code catch} block when the static type is {@link FilterInterceptor}.
 *
 * @param <T> the type of the input that this filter accepts and returns.
 */
public interface FilterInterceptor<T> extends AutoCloseable {
  /**
   * Each filter is identifiable by a name. This name can be used in config files to specify which
   * filters to apply at runtime.
   *
   * @return the identifier of this filter.
   */
  String getIdentifier();

  /**
   * Initialize the filter with the given properties.
   *
   * @param properties the properties map to initialize the filter.
   */
  void init(Map<String, String> properties);

  /**
   * Apply the filter to the given input. Note that this method may alter the input.
   *
   * @param input the input to apply the filter to.
   * @return the transformed input.
   */
  T apply(T input);

  /**
   * Close the filter and release any resources. The default implementation does nothing, so
   * filters that hold no resources do not need to override it.
   */
  @Override
  default void close() {}
}
32 changes: 32 additions & 0 deletions api/src/main/java/io/onetable/spi/filter/SnapshotFilesFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.spi.filter;

import java.util.List;

import io.onetable.model.storage.OneDataFile;
import io.onetable.spi.sync.TargetClient;

/**
 * A {@link FilterInterceptor} specialized for a list of {@link OneDataFile}s: its input and its
 * output are both {@code List<OneDataFile>}. It declares no members of its own and only fixes the
 * type parameter of {@link FilterInterceptor}.
 *
 * <p>Concrete implementations of this interface would typically be applied by a {@link
 * TargetClient} before writing a list of {@link OneDataFile}s related to a specific snapshot.
 */
public interface SnapshotFilesFilter extends FilterInterceptor<List<OneDataFile>> {}
6 changes: 6 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- AutoService is only needed at compile time (annotation processing); marking it optional
     keeps the processor off the classpath of modules that depend on this one. -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.1.1</version>
<optional>true</optional>
</dependency>


<!-- Avro -->
<dependency>
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/java/io/onetable/delta/DeltaActionsConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public OneDataFile convertAddActionToOneDataFile(
boolean includeColumnStats,
DeltaPartitionExtractor partitionExtractor,
DeltaStatsExtractor fileStatsExtractor) {
String tableBasePath = deltaSnapshot.deltaLog().dataPath().toUri().toString();
List<ColumnStat> columnStats =
includeColumnStats
? fileStatsExtractor.getColumnStatsForFile(addFile, fields)
Expand All @@ -64,7 +63,7 @@ public OneDataFile convertAddActionToOneDataFile(
columnStats.stream().map(ColumnStat::getNumValues).max(Long::compareTo).orElse(0L);
// TODO(https://github.com/onetable-io/onetable/issues/102): removed record count.
return OneDataFile.builder()
.physicalPath(getFullPathToFile(tableBasePath, addFile.path()))
.physicalPath(getFullPathToFile(deltaSnapshot, addFile.path()))
.fileFormat(fileFormat)
.fileSizeBytes(addFile.size())
.lastModified(addFile.modificationTime())
Expand All @@ -81,9 +80,8 @@ public OneDataFile convertRemoveActionToOneDataFile(
FileFormat fileFormat,
List<OnePartitionField> partitionFields,
DeltaPartitionExtractor partitionExtractor) {
String tableBasePath = deltaSnapshot.deltaLog().dataPath().toUri().toString();
return OneDataFile.builder()
.physicalPath(getFullPathToFile(tableBasePath, removeFile.path()))
.physicalPath(getFullPathToFile(deltaSnapshot, removeFile.path()))
.fileFormat(fileFormat)
.partitionValues(
partitionExtractor.partitionValueExtraction(
Expand All @@ -101,10 +99,11 @@ public FileFormat convertToOneTableFileFormat(String provider) {
String.format("delta file format %s is not recognized", provider));
}

private String getFullPathToFile(String tableBasePath, String path) {
if (path.startsWith(tableBasePath)) {
return path;
static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
String tableBasePath = snapshot.deltaLog().dataPath().toUri().toString();
if (dataFilePath.startsWith(tableBasePath)) {
return dataFilePath;
}
return tableBasePath + Path.SEPARATOR + path;
return tableBasePath + Path.SEPARATOR + dataFilePath;
}
}
48 changes: 40 additions & 8 deletions core/src/main/java/io/onetable/delta/DeltaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
package io.onetable.delta;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -55,13 +51,16 @@

import io.onetable.client.PerTableConfig;
import io.onetable.exception.NotSupportedException;
import io.onetable.filter.FilterManager;
import io.onetable.model.OneTable;
import io.onetable.model.OneTableMetadata;
import io.onetable.model.schema.OnePartitionField;
import io.onetable.model.schema.OneSchema;
import io.onetable.model.storage.OneDataFile;
import io.onetable.model.storage.OneDataFilesDiff;
import io.onetable.model.storage.OneFileGroup;
import io.onetable.model.storage.TableFormat;
import io.onetable.spi.filter.SnapshotFilesFilter;
import io.onetable.spi.sync.TargetClient;

public class DeltaClient implements TargetClient {
Expand All @@ -78,6 +77,8 @@ public class DeltaClient implements TargetClient {
private int logRetentionInHours;
private TransactionState transactionState;

private FilterManager<SnapshotFilesFilter, List<OneDataFile>> snapshotFileFilterManager;

public DeltaClient() {}

public DeltaClient(PerTableConfig perTableConfig, SparkSession sparkSession) {
Expand Down Expand Up @@ -130,6 +131,17 @@ private void _init(
this.deltaLog = deltaLog;
this.tableName = tableName;
this.logRetentionInHours = logRetentionInHours;

initializeFilters(tableDataPath);
}

@VisibleForTesting
protected void initializeFilters(String tableDataPath) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Add test to validate customization

snapshotFileFilterManager =
new FilterManager<>(
Collections.singletonMap("ToRelativePathFilter.basePath", tableDataPath));
// snapshotFileFilterManager.loadFilters(
// SnapshotFilesFilter.class, Collections.singleton("ToRelativePathFilter"));
}

@Override
Expand Down Expand Up @@ -179,16 +191,36 @@ public void syncMetadata(OneTableMetadata metadata) {

@Override
public void syncFilesForSnapshot(List<OneFileGroup> partitionedDataFiles) {
List<OneFileGroup> filteredFileGroups =
partitionedDataFiles.stream()
.map(
group -> {
List<OneDataFile> filteredFiles =
snapshotFileFilterManager.process(group.getFiles());
return OneFileGroup.builder()
.partitionValues(group.getPartitionValues())
.files(filteredFiles)
.build();
})
.filter(group -> !group.getFiles().isEmpty())
.collect(Collectors.toList());

transactionState.setActions(
dataFileUpdatesExtractor.applySnapshot(
deltaLog, partitionedDataFiles, transactionState.getLatestSchemaInternal()));
deltaLog, filteredFileGroups, transactionState.getLatestSchemaInternal()));
}

@Override
public void syncFilesForDiff(OneDataFilesDiff oneDataFilesDiff) {
List<OneDataFile> filesAdded = new ArrayList<>(oneDataFilesDiff.getFilesAdded());
filesAdded = snapshotFileFilterManager.process(filesAdded);
List<OneDataFile> filesRemoved = new ArrayList<>(oneDataFilesDiff.getFilesRemoved());
filesRemoved = snapshotFileFilterManager.process(filesRemoved);
OneDataFilesDiff filteredOneDataFilesDiff = OneDataFilesDiff.from(filesAdded, filesRemoved);

transactionState.setActions(
dataFileUpdatesExtractor.applyDiff(
oneDataFilesDiff,
filteredOneDataFilesDiff,
transactionState.getLatestSchemaInternal(),
deltaLog.dataPath().toString()));
}
Expand Down
15 changes: 0 additions & 15 deletions core/src/main/java/io/onetable/delta/DeltaDataFileExtractor.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,6 @@ public class DeltaDataFileExtractor {
@Builder.Default
private final DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();

/**
* Initializes an iterator for Delta Lake files. This should only be used when column stats are
* not required.
*
* @return Delta table file iterator, files returned do not have column stats set to reduce memory
* overhead
*/
public DataFileIterator iteratorWithoutStats(Snapshot deltaSnapshot, OneSchema schema) {
return new DeltaDataFileIterator(deltaSnapshot, schema, false);
}

/**
* Initializes an iterator for Delta Lake files.
*
Expand All @@ -70,8 +59,6 @@ public class DeltaDataFileIterator implements DataFileIterator {
private final List<OneField> fields;
private final List<OnePartitionField> partitionFields;
private final Iterator<OneDataFile> dataFilesIterator;
private final String tableBasePath;
private final boolean includeColumnStats;

private DeltaDataFileIterator(Snapshot snapshot, OneSchema schema, boolean includeColumnStats) {
this.fileFormat =
Expand All @@ -80,8 +67,6 @@ private DeltaDataFileIterator(Snapshot snapshot, OneSchema schema, boolean inclu
this.partitionFields =
partitionExtractor.convertFromDeltaPartitionFormat(
schema, snapshot.metadata().partitionSchema());
this.tableBasePath = snapshot.deltaLog().dataPath().toUri().toString();
this.includeColumnStats = includeColumnStats;
this.dataFilesIterator =
snapshot.allFiles().collectAsList().stream()
.map(
Expand Down
Loading
Loading