Skip to content

Commit

Permalink
Make row private in RowBackedAction
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuandong-db committed Jan 2, 2025
1 parent 479c151 commit a9c0db3
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.VectorUtils.toJavaMap;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.data.DelegateRow;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatistics;
import io.delta.kernel.utils.DataFileStatus;
import java.net.URI;
import java.util.*;
import java.util.stream.IntStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;

/** Delta log action representing an `AddFile` */
public class AddFile extends RowBackedAction {
Expand Down Expand Up @@ -72,11 +73,6 @@ public class AddFile extends RowBackedAction {
// There are more fields which are added when row-id tracking and clustering is enabled.
// When Kernel starts supporting row-ids and clustering, we should add those fields here.

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, FULL_SCHEMA.length())
.boxed()
.collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i));

/**
* Utility to generate {@link AddFile} action instance from the given {@link DataFileStatus} and
* partition values.
Expand All @@ -86,32 +82,24 @@ public static AddFile convertDataFileStatus(
DataFileStatus dataFileStatus,
Map<String, Literal> partitionValues,
boolean dataChange) {
Path filePath = new Path(dataFileStatus.getPath());

return new AddFile(
relativizePath(filePath, tableRoot).toUri().toString(),
serializePartitionMap(partitionValues),
dataFileStatus.getSize(),
dataFileStatus.getModificationTime(),
dataChange,
Optional.empty(), // deletionVector
Optional.empty(), // tags
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
dataFileStatus.getStatistics());
Row row =
createAddFileRow(
relativizePath(new Path(dataFileStatus.getPath()), tableRoot).toUri().toString(),
serializePartitionMap(partitionValues),
dataFileStatus.getSize(),
dataFileStatus.getModificationTime(),
dataChange,
Optional.empty(), // deletionVector
Optional.empty(), // tags
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
dataFileStatus.getStatistics());

return new AddFile(row);
}

/** Constructs an {@link AddFile} action from the given 'AddFile' {@link Row}. */
public AddFile(Row row) {
super(row);
}

/**
* Constructs an {@link AddFile} action from the given fields. Internally, this creates a new
* {@link GenericRow} with the given fields and return a new {@link AddFile} instance backed by
* it.
*/
public AddFile(
/** Utility to generate an 'AddFile' row from the given fields. */
public static Row createAddFileRow(
String path,
MapValue partitionValues,
long size,
Expand All @@ -122,29 +110,30 @@ public AddFile(
Optional<Long> baseRowId,
Optional<Long> defaultRowCommitVersion,
Optional<DataFileStatistics> stats) {
super(
new GenericRow(
FULL_SCHEMA,
new HashMap<Integer, Object>() {
{
// TODO - Add support for DeletionVectorDescriptor
checkArgument(
!deletionVector.isPresent(),
"DeletionVectorDescriptor currently doesn't have a way to convert to a Row");

put(COL_NAME_TO_ORDINAL.get("path"), path);
put(COL_NAME_TO_ORDINAL.get("partitionValues"), partitionValues);
put(COL_NAME_TO_ORDINAL.get("size"), size);
put(COL_NAME_TO_ORDINAL.get("modificationTime"), modificationTime);
put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);
tags.ifPresent(tags -> put(COL_NAME_TO_ORDINAL.get("tags"), tags));
baseRowId.ifPresent(id -> put(COL_NAME_TO_ORDINAL.get("baseRowId"), id));
defaultRowCommitVersion.ifPresent(
version -> put(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"), version));
stats.ifPresent(
stats -> put(COL_NAME_TO_ORDINAL.get("stats"), stats.serializeAsJson()));
}
}));

checkArgument(path != null, "path is not nullable");
checkArgument(partitionValues != null, "partitionValues is not nullable");
// TODO - Add support for DeletionVectorDescriptor
checkArgument(!deletionVector.isPresent(), "DeletionVectorDescriptor is unsupported");

Map<Integer, Object> fieldMap = new HashMap<>();
fieldMap.put(FULL_SCHEMA.indexOf("path"), path);
fieldMap.put(FULL_SCHEMA.indexOf("partitionValues"), partitionValues);
fieldMap.put(FULL_SCHEMA.indexOf("size"), size);
fieldMap.put(FULL_SCHEMA.indexOf("modificationTime"), modificationTime);
fieldMap.put(FULL_SCHEMA.indexOf("dataChange"), dataChange);
tags.ifPresent(tag -> fieldMap.put(FULL_SCHEMA.indexOf("tags"), tag));
baseRowId.ifPresent(id -> fieldMap.put(FULL_SCHEMA.indexOf("baseRowId"), id));
defaultRowCommitVersion.ifPresent(
version -> fieldMap.put(FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version));
stats.ifPresent(stat -> fieldMap.put(FULL_SCHEMA.indexOf("stats"), stat.serializeAsJson()));

return new GenericRow(FULL_SCHEMA, fieldMap);
}

/** Constructs an {@link AddFile} action from the given 'AddFile' {@link Row}. */
public AddFile(Row row) {
super(row);
}

@Override
Expand All @@ -153,77 +142,60 @@ protected StructType getFullSchema() {
}

public String getPath() {
return row.getString(COL_NAME_TO_ORDINAL.get("path"));
return (String) getValueAsObject("path");
}

public MapValue getPartitionValues() {
return row.getMap(COL_NAME_TO_ORDINAL.get("partitionValues"));
return (MapValue) getValueAsObject("partitionValues");
}

public long getSize() {
return row.getLong(COL_NAME_TO_ORDINAL.get("size"));
return (long) getValueAsObject("size");
}

public long getModificationTime() {
return row.getLong(COL_NAME_TO_ORDINAL.get("modificationTime"));
return (long) getValueAsObject("modificationTime");
}

public boolean getDataChange() {
return row.getBoolean(COL_NAME_TO_ORDINAL.get("dataChange"));
return (boolean) getValueAsObject("dataChange");
}

public Optional<DeletionVectorDescriptor> getDeletionVector() {
int ordinal = COL_NAME_TO_ORDINAL.get("deletionVector");
return Optional.ofNullable(
row.isNullAt(ordinal) ? null : DeletionVectorDescriptor.fromRow(row.getStruct(ordinal)));
return Optional.ofNullable((Row) getValueAsObject("deletionVector"))
.map(DeletionVectorDescriptor::fromRow);
}

public Optional<MapValue> getTags() {
int ordinal = COL_NAME_TO_ORDINAL.get("tags");
return Optional.ofNullable(row.isNullAt(ordinal) ? null : row.getMap(ordinal));
return Optional.ofNullable((MapValue) getValueAsObject("tags"));
}

public Optional<Long> getBaseRowId() {
int ordinal = COL_NAME_TO_ORDINAL.get("baseRowId");
return Optional.ofNullable(row.isNullAt(ordinal) ? null : row.getLong(ordinal));
return Optional.ofNullable((Long) getValueAsObject("baseRowId"));
}

public Optional<Long> getDefaultRowCommitVersion() {
int ordinal = COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion");
return Optional.ofNullable(row.isNullAt(ordinal) ? null : row.getLong(ordinal));
return Optional.ofNullable((Long) getValueAsObject("defaultRowCommitVersion"));
}

public Optional<DataFileStatistics> getStats() {
int ordinal = COL_NAME_TO_ORDINAL.get("stats");
return Optional.ofNullable(
row.isNullAt(ordinal)
? null
: DataFileStatistics.deserializeFromJson(row.getString(ordinal)).orElse(null));
return Optional.ofNullable((String) getValueAsObject("stats"))
.flatMap(DataFileStatistics::deserializeFromJson);
}

public Optional<Long> getNumRecords() {
return this.getStats().map(DataFileStatistics::getNumRecords);
return getStats().map(DataFileStatistics::getNumRecords);
}

/**
* Returns a new {@link AddFile} with the provided baseRowId. Under the hood, this is achieved by
* creating a new {@link DelegateRow} with the baseRowId overridden.
*/
/** Returns a new {@link AddFile} with the provided baseRowId. */
public AddFile withNewBaseRowId(long baseRowId) {
Map<Integer, Object> overrides =
Collections.singletonMap(COL_NAME_TO_ORDINAL.get("baseRowId"), baseRowId);
return new AddFile(new DelegateRow(row, overrides));
return new AddFile(createRowWithOverriddenValue("baseRowId", baseRowId));
}

/**
* Returns a new {@link AddFile} with the provided defaultRowCommitVersion. Under the hood, this
* is achieved by creating a new {@link DelegateRow} with the defaultRowCommitVersion overridden.
*/
/** Returns a new {@link AddFile} with the provided defaultRowCommitVersion. */
public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) {
Map<Integer, Object> overrides =
Collections.singletonMap(
COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"), defaultRowCommitVersion);
return new AddFile(new DelegateRow(row, overrides));
return new AddFile(
createRowWithOverriddenValue("defaultRowCommitVersion", defaultRowCommitVersion));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.data.Row;
import io.delta.kernel.types.StructType;
import io.delta.kernel.internal.data.DelegateRow;
import io.delta.kernel.types.*;
import java.util.Collections;
import java.util.Map;

/**
* Base abstract class for Delta Log actions that are backed by a {@link Row}.This abstraction is
* used by actions like {@link AddFile}, where we want to avoid materializing all fields from the
* row when creating an action instance. For these actions, we only maintain a reference to the
* underlying row, and the getters retrieve values directly from it.
* An abstract base class for Delta Log actions that are backed by a {@link Row}. This design is to
* avoid materialization of all fields when creating action instances from action rows within
* Kernel. Actions like {@link AddFile} can extend this class to maintain just a reference to the
* underlying action row. Subclasses can:
*
* <ul>
* <li>Access field values directly from the row via {@link #getValueAsObject}
* <li>Create new action instances with updated fields via {@link #createRowWithOverriddenValue}
* </ul>
*/
public abstract class RowBackedAction {

/** The underlying {@link Row} that represents an action and contains all its field values. */
protected final Row row;
private final Row row;

protected RowBackedAction(Row row) {
checkArgument(
Expand All @@ -44,7 +53,76 @@ protected RowBackedAction(Row row) {
/** Returns the full schema of the row that represents this action. */
protected abstract StructType getFullSchema();

public Row toRow() {
/**
* Returns the index of the field with the given name in the full schema of the row. Throws an
* {@link IllegalArgumentException} if the field is not found.
*/
protected int getFieldIndex(String fieldName) {
int index = getFullSchema().indexOf(fieldName);
checkArgument(index >= 0, "Field '%s' not found in schema: %s", fieldName, getFullSchema());
return index;
}

/**
* Gets the value with the given field name from the row. The type of the Object returned depends
* on the data type of the field. If the field is nullable and the value is null, this method will
* return null. If the field is not nullable and the value is null, this method will throw an
* {@link IllegalArgumentException}.
*/
protected Object getValueAsObject(String fieldName) {
StructField field = getFullSchema().get(fieldName);
int ordinal = getFieldIndex(fieldName);

if (!field.isNullable()) {
requireNonNull(row, ordinal, field.getName());
} else if (row.isNullAt(ordinal)) {
return null;
}

DataType dataType = field.getDataType();
if (dataType instanceof BooleanType) {
return row.getBoolean(ordinal);
} else if (dataType instanceof ByteType) {
return row.getByte(ordinal);
} else if (dataType instanceof ShortType) {
return row.getShort(ordinal);
} else if (dataType instanceof IntegerType) {
return row.getInt(ordinal);
} else if (dataType instanceof LongType) {
return row.getLong(ordinal);
} else if (dataType instanceof FloatType) {
return row.getFloat(ordinal);
} else if (dataType instanceof DoubleType) {
return row.getDouble(ordinal);
} else if (dataType instanceof StringType) {
return row.getString(ordinal);
} else if (dataType instanceof DecimalType) {
return row.getDecimal(ordinal);
} else if (dataType instanceof BinaryType) {
return row.getBinary(ordinal);
} else if (dataType instanceof StructType) {
return row.getStruct(ordinal);
} else if (dataType instanceof ArrayType) {
return row.getArray(ordinal);
} else if (dataType instanceof MapType) {
return row.getMap(ordinal);
} else {
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType.getClass().getName());
}
}

/**
* Creates a new {@link Row} with the same schema and values as the row backing this action, but
* with the value of the field with the given name overridden by the given value.
*/
protected Row createRowWithOverriddenValue(String fieldName, Object value) {
Map<Integer, Object> overrides = Collections.singletonMap(getFieldIndex(fieldName), value);
return new DelegateRow(row, overrides);
}

/** Returns the underlying {@link Row} that represents this action. */
public final Row toRow() {
return row;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/
package io.delta.kernel.internal.data;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.*;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* This wraps an existing {@link Row} and allows overriding values for some particular ordinals.
Expand All @@ -41,8 +40,8 @@ public class DelegateRow implements Row {
private final Map<Integer, Object> overrides;

public DelegateRow(Row row, Map<Integer, Object> overrides) {
requireNonNull(row, "row is null");
requireNonNull(overrides, "map of overrides is null");
Objects.requireNonNull(row, "row is null");
Objects.requireNonNull(overrides, "map of overrides is null");

if (row instanceof DelegateRow) {
// If the row is already a delegation of another row, we merge the overrides and keep only
Expand Down Expand Up @@ -198,7 +197,7 @@ private void throwIfUnsafeAccess(
if (!expDataType.isAssignableFrom(actualDataType.getClass())) {
String msg =
String.format(
"Trying to access a `%s` value from vector of type `%s`", accessType, actualDataType);
"Fail to access a '%s' value from a field of type '%s'", accessType, actualDataType);
throw new UnsupportedOperationException(msg);
}
}
Expand Down
Loading

0 comments on commit a9c0db3

Please sign in to comment.