Skip to content

Commit

Permalink
Make AddFile extend RowBackedAction. Add tests for equals and hashCode.
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuandong-db committed Dec 30, 2024
1 parent 058587a commit 9552e1e
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ static CloseableIterator<Row> generateAppendActions(
if (isIcebergCompatV2Enabled) {
IcebergCompatV2Utils.validDataFileStatus(dataFileStatus);
}
Row addFileRow =
AddFile addFileRow =
AddFile.convertDataFileStatus(
tableRoot,
dataFileStatus,
((DataWriteContextImpl) dataWriteContext).getPartitionValues(),
true /* dataChange */);
return SingleAction.createAddFileSingleAction(addFileRow);
return SingleAction.createAddFileSingleAction(addFileRow.toRow());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.VectorUtils.toJavaMap;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.MapValue;
Expand All @@ -34,7 +36,7 @@
import java.util.stream.IntStream;

/** Delta log action representing an `AddFile` */
public class AddFile {
public class AddFile extends RowBackedAction {

/* We conditionally read this field based on the query filter */
private static final StructField JSON_STATS_FIELD =
Expand Down Expand Up @@ -76,42 +78,78 @@ public class AddFile {
.collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i));

/**
* Utility to generate `AddFile` row from the given {@link DataFileStatus} and partition values.
* Utility to generate {@link AddFile} action instance from the given {@link DataFileStatus} and
* partition values.
*/
public static Row convertDataFileStatus(
public static AddFile convertDataFileStatus(
URI tableRoot,
DataFileStatus dataFileStatus,
Map<String, Literal> partitionValues,
boolean dataChange) {
Path filePath = new Path(dataFileStatus.getPath());
Map<Integer, Object> valueMap = new HashMap<>();
valueMap.put(
COL_NAME_TO_ORDINAL.get("path"), relativizePath(filePath, tableRoot).toUri().toString());
valueMap.put(
COL_NAME_TO_ORDINAL.get("partitionValues"), serializePartitionMap(partitionValues));
valueMap.put(COL_NAME_TO_ORDINAL.get("size"), dataFileStatus.getSize());
valueMap.put(COL_NAME_TO_ORDINAL.get("modificationTime"), dataFileStatus.getModificationTime());
valueMap.put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);
if (dataFileStatus.getStatistics().isPresent()) {
valueMap.put(
COL_NAME_TO_ORDINAL.get("stats"), dataFileStatus.getStatistics().get().serializeAsJson());
}
// any fields not present in the valueMap are considered null
return new GenericRow(FULL_SCHEMA, valueMap);

return new AddFile(
relativizePath(filePath, tableRoot).toUri().toString(),
serializePartitionMap(partitionValues),
dataFileStatus.getSize(),
dataFileStatus.getModificationTime(),
dataChange,
Optional.empty(), // deletionVector
Optional.empty(), // tags
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
dataFileStatus.getStatistics());
}

/** Constructs an {@link AddFile} action from the given 'AddFile' {@link Row}. */
public AddFile(Row row) {
super(row);
}

/**
* The underlying {@link Row} that represents an 'AddFile' action and contains all its field
* values. Can be either a {@link GenericRow} or a {@link DelegateRow}.
* Constructs an {@link AddFile} action from the given fields. Internally, this creates a new
* {@link GenericRow} with the given fields and returns a new {@link AddFile} instance backed by
* it.
*/
private final Row row;
public AddFile(
String path,
MapValue partitionValues,
long size,
long modificationTime,
boolean dataChange,
Optional<DeletionVectorDescriptor> deletionVector,
Optional<MapValue> tags,
Optional<Long> baseRowId,
Optional<Long> defaultRowCommitVersion,
Optional<DataFileStatistics> stats) {
super(
new GenericRow(
FULL_SCHEMA,
new HashMap<Integer, Object>() {
{
// TODO - Add support for DeletionVectorDescriptor
checkArgument(
!deletionVector.isPresent(),
"DeletionVectorDescriptor currently doesn't have a way to convert to a Row");

public AddFile(Row row) {
this.row = row;
put(COL_NAME_TO_ORDINAL.get("path"), path);
put(COL_NAME_TO_ORDINAL.get("partitionValues"), partitionValues);
put(COL_NAME_TO_ORDINAL.get("size"), size);
put(COL_NAME_TO_ORDINAL.get("modificationTime"), modificationTime);
put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);
tags.ifPresent(tags -> put(COL_NAME_TO_ORDINAL.get("tags"), tags));
baseRowId.ifPresent(id -> put(COL_NAME_TO_ORDINAL.get("baseRowId"), id));
defaultRowCommitVersion.ifPresent(
version -> put(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"), version));
stats.ifPresent(
stats -> put(COL_NAME_TO_ORDINAL.get("stats"), stats.serializeAsJson()));
}
}));
}

public Row toRow() {
return row;
@Override
protected StructType getFullSchema() {
return FULL_SCHEMA;
}

public String getPath() {
Expand Down Expand Up @@ -191,7 +229,7 @@ public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) {
@Override
public String toString() {
// Convert the partitionValues and tags to Java Maps and make them sorted by key.
Map<String, String> partitionValuesJavaMap = VectorUtils.toJavaMap(getPartitionValues());
Map<String, String> partitionValuesJavaMap = toJavaMap(getPartitionValues());
Map<String, String> sortedPartitionValuesJavaMap = new TreeMap<>(partitionValuesJavaMap);

Optional<Map<String, String>> tagsJavaMap = getTags().map(VectorUtils::toJavaMap);
Expand All @@ -218,30 +256,38 @@ public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof AddFile)) return false;
AddFile other = (AddFile) obj;

// MapValue and DataFileStatistics don't implement equals(), so we need to convert
// partitionValues and tags to Java Maps, and stats to strings to compare them
return getSize() == other.getSize()
&& getModificationTime() == other.getModificationTime()
&& getDataChange() == other.getDataChange()
&& Objects.equals(getPath(), other.getPath())
&& Objects.equals(getPartitionValues(), other.getPartitionValues())
&& Objects.equals(toJavaMap(getPartitionValues()), toJavaMap(other.getPartitionValues()))
&& Objects.equals(getDeletionVector(), other.getDeletionVector())
&& Objects.equals(getTags(), other.getTags())
&& Objects.equals(
getTags().map(VectorUtils::toJavaMap), other.getTags().map(VectorUtils::toJavaMap))
&& Objects.equals(getBaseRowId(), other.getBaseRowId())
&& Objects.equals(getDefaultRowCommitVersion(), other.getDefaultRowCommitVersion())
&& Objects.equals(getStats(), other.getStats());
&& Objects.equals(
getStats().map(DataFileStatistics::toString),
other.getStats().map(DataFileStatistics::toString));
}

@Override
public int hashCode() {
// MapValue and DataFileStatistics don't implement hashCode(), so we need to convert
// partitionValues and tags to Java Maps, and stats to strings to compute the hash code
return Objects.hash(
getPath(),
getPartitionValues(),
toJavaMap(getPartitionValues()),
getSize(),
getModificationTime(),
getDataChange(),
getDeletionVector(),
getTags(),
getTags().map(VectorUtils::toJavaMap),
getBaseRowId(),
getDefaultRowCommitVersion(),
getStats());
getStats().map(DataFileStatistics::toString));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ public class RemoveFile {
.add("size", LongType.LONG, true /* nullable*/)
.add("stats", StringType.STRING, true /* nullable */)
.add("tags", new MapType(StringType.STRING, StringType.STRING, true), true /* nullable */)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */);
// There are more fields which are added when row-id tracking is enabled. When Kernel
// starts supporting row-ids, we should add those fields here.
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */)
.add("baseRowId", LongType.LONG, true /* nullable */)
.add("defaultRowCommitVersion", LongType.LONG, true /* nullable */);
// TODO: Currently, Kernel doesn't create RemoveFile actions internally, nor provides APIs for
// connectors to generate and commit them. Once we have the need for this, we should ensure
// that the baseRowId and defaultRowCommitVersion fields of RemoveFile actions are correctly
// populated to match the corresponding AddFile actions.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.data.Row;
import io.delta.kernel.types.StructType;
import java.util.Objects;

/**
 * Base abstract class for Delta Log actions that are backed by a {@link Row}. This abstraction is
 * used by actions like {@link AddFile}, where we want to avoid materializing all fields from the
 * row when creating an action instance. For these actions, we only maintain a reference to the
 * underlying row, and the getters retrieve values directly from it.
 */
public abstract class RowBackedAction {

  /** The underlying {@link Row} that represents an action and contains all its field values. */
  protected final Row row;

  /**
   * Creates an action backed by the given row after validating it against the expected schema.
   *
   * <p>Note: this constructor invokes {@link #getFullSchema()} before any subclass instance
   * initialization has run, so implementations of that method must not depend on instance state.
   *
   * @param row the row holding the action's field values; must not be null, and its schema must
   *     equal the schema returned by {@link #getFullSchema()} (a mismatch is rejected through
   *     {@code checkArgument})
   * @throws NullPointerException if {@code row} is null
   */
  protected RowBackedAction(Row row) {
    // Fail fast with a descriptive message instead of an anonymous NPE from row.getSchema().
    Objects.requireNonNull(row, "row must not be null");

    // Resolve both schemas once so the check and the error message use the same values.
    final StructType expectedSchema = getFullSchema();
    final StructType actualSchema = row.getSchema();
    checkArgument(
        actualSchema.equals(expectedSchema),
        "Expected row schema: %s, found: %s",
        expectedSchema,
        actualSchema);

    this.row = row;
  }

  /**
   * Returns the full schema of the row that represents this action.
   *
   * <p>This method is called from the {@link RowBackedAction} constructor, i.e. before the
   * subclass's own fields are initialized; implementations should return a value that does not
   * depend on instance state.
   */
  protected abstract StructType getFullSchema();

  /** Returns the underlying {@link Row} that backs this action. */
  public Row toRow() {
    return row;
  }
}
Loading

0 comments on commit 9552e1e

Please sign in to comment.