Skip to content

Commit

Permalink
Done refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Jan 29, 2025
1 parent c04cb06 commit ef278ab
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.actions;

import io.delta.kernel.annotation.Evolving;
import java.util.Set;

/**
* Interface for Protocol actions in Delta.
*
* <p>Exposes the minimum reader and writer versions of a table together with the names of its
* reader and writer features.
*
* <p>See <a
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution">
* Protocol</a> for more details.
*/
@Evolving
public interface AbstractProtocol {

/**
* The minimum reader version required to read the table.
*
* @return the minimum reader version
*/
int getMinReaderVersion();

/**
* The minimum writer version required to write the table.
*
* @return the minimum writer version
*/
int getMinWriterVersion();

/**
* The reader features that need to be supported to read the table.
*
* @return the set of reader feature names
*/
Set<String> getReaderFeatures();

/**
* The writer features that need to be supported to write the table.
*
* @return the set of writer feature names
*/
Set<String> getWriterFeatures();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class TableFeatures {

public static final String INVARIANTS_FEATURE_NAME = "invariants";

/** The minimum reader version required to support table features. */
public static final int TABLE_FEATURES_MIN_READER_VERSION = 3;

/** The minimum writer version required to support table features. */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

Expand All @@ -82,7 +85,7 @@ public static void validateReadSupportedTable(
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
Set<String> readerFeatures = protocol.getReaderFeatures();
if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) {
Set<String> unsupportedFeatures = new HashSet<>(readerFeatures);
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
Expand Down Expand Up @@ -194,8 +197,7 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
Metadata metadata, Protocol protocol) {
return TableFeatures.SUPPORTED_WRITER_FEATURES.stream()
.filter(f -> metadataRequiresWriterFeatureToBeEnabled(metadata, f))
.filter(
f -> protocol.getWriterFeatures() == null || !protocol.getWriterFeatures().contains(f))
.filter(f -> !protocol.getWriterFeatures().contains(f))
.collect(Collectors.toSet());
}

Expand Down Expand Up @@ -277,11 +279,7 @@ private static void validateNoInvariants(StructType tableSchema) {
}

private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) {
List<String> writerFeatures = protocol.getWriterFeatures();
if (writerFeatures == null) {
return false;
}
return writerFeatures.contains(featureName)
return protocol.getWriterFeatures().contains(featureName)
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ public Transaction build(Engine engine) {
if (!newWriterFeatures.isEmpty()) {
logger.info("Automatically enabling writer features: {}", newWriterFeatures);
shouldUpdateProtocol = true;
List<String> oldWriterFeatures = protocol.getWriterFeatures();
Set<String> oldWriterFeatures = protocol.getWriterFeatures();
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
Set<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol, metadata, metadata.getSchema(), table.getPath(engine));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.TableFeatures.TABLE_FEATURES_MIN_READER_VERSION;
import static io.delta.kernel.internal.TableFeatures.TABLE_FEATURES_MIN_WRITER_VERSION;
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;

import io.delta.kernel.actions.AbstractProtocol;
import io.delta.kernel.data.*;
import io.delta.kernel.internal.TableFeatures;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.util.InternalUtils;
import io.delta.kernel.internal.util.Preconditions;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
Expand All @@ -28,22 +33,32 @@
import io.delta.kernel.types.StructType;
import java.util.*;

public class Protocol {
public class Protocol implements AbstractProtocol {

//////////////////////////////////
// Static variables and methods //
//////////////////////////////////

/**
* Creates a {@link Protocol} object from the given {@link ColumnVector}.
*
* <p>This method should always be used after a {@code protocolVector.isNullAt} check to ensure
* that the column vector is not null at the given row ID. This method never returns null.
*
* @throws IllegalArgumentException if the column vector is null at the given row ID
*/
public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
if (vector.isNullAt(rowId)) {
return null;
}
InternalUtils.requireNonNull(vector, rowId, "protocol");

return new Protocol(
vector.getChild(0).getInt(rowId),
vector.getChild(1).getInt(rowId),
vector.getChild(2).isNullAt(rowId)
? Collections.emptyList()
: VectorUtils.toJavaList(vector.getChild(2).getArray(rowId)),
? null
: new HashSet<>(VectorUtils.toJavaList(vector.getChild(2).getArray(rowId))),
vector.getChild(3).isNullAt(rowId)
? Collections.emptyList()
: VectorUtils.toJavaList(vector.getChild(3).getArray(rowId)));
? null
: new HashSet<>(VectorUtils.toJavaList(vector.getChild(3).getArray(rowId))));
}

public static final StructType FULL_SCHEMA =
Expand All @@ -53,35 +68,82 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
.add("readerFeatures", new ArrayType(StringType.STRING, false /* contains null */))
.add("writerFeatures", new ArrayType(StringType.STRING, false /* contains null */));

//////////////////////////////////
// Member variables and methods //
//////////////////////////////////

private final int minReaderVersion;
private final int minWriterVersion;
private final List<String> readerFeatures;
private final List<String> writerFeatures;
private final Set<String> readerFeatures;
private final Set<String> writerFeatures;

public Protocol(
int minReaderVersion,
int minWriterVersion,
List<String> readerFeatures,
List<String> writerFeatures) {
Set<String> nullableReaderFeatures,
Set<String> nullableWriterFeatures) {
Preconditions.checkArgument(minReaderVersion >= 1, "minReaderVersion must be >= 1");
Preconditions.checkArgument(minWriterVersion >= 1, "minWriterVersion must be >= 1");

this.minReaderVersion = minReaderVersion;
this.minWriterVersion = minWriterVersion;
this.readerFeatures = readerFeatures;
this.writerFeatures = writerFeatures;
this.readerFeatures =
nullableReaderFeatures == null
? Collections.emptySet()
: Collections.unmodifiableSet(nullableReaderFeatures);
this.writerFeatures =
nullableWriterFeatures == null
? Collections.emptySet()
: Collections.unmodifiableSet(nullableWriterFeatures);

final boolean supportsReaderFeatures = minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION;
final boolean supportsWriterFeatures = minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION;

if (!supportsReaderFeatures && !readerFeatures.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"This protocol has minReaderVersion %d but readerFeatures is not empty: %s. "
+ "readerFeatures are only supported with minReaderVersion >= %d.",
minReaderVersion, readerFeatures, TABLE_FEATURES_MIN_READER_VERSION));
}

if (!supportsWriterFeatures && !writerFeatures.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"This protocol has minWriterVersion %d but writerFeatures is not empty: %s. "
+ "writerFeatures are only supported with minWriterVersion >= %d.",
minWriterVersion, writerFeatures, TABLE_FEATURES_MIN_WRITER_VERSION));
}

if (supportsReaderFeatures && !supportsWriterFeatures) {
throw new IllegalArgumentException(
String.format(
"This protocol has minReaderVersion %d but minWriterVersion %d. "
+ "When minReaderVersion is >= %d, minWriterVersion must be >= %d.",
minReaderVersion,
minWriterVersion,
TABLE_FEATURES_MIN_READER_VERSION,
TABLE_FEATURES_MIN_WRITER_VERSION));
}
}

@Override
public int getMinReaderVersion() {
return minReaderVersion;
}

@Override
public int getMinWriterVersion() {
return minWriterVersion;
}

public List<String> getReaderFeatures() {
@Override
public Set<String> getReaderFeatures() {
return readerFeatures;
}

public List<String> getWriterFeatures() {
@Override
public Set<String> getWriterFeatures() {
return writerFeatures;
}

Expand All @@ -102,26 +164,35 @@ public String toString() {
* @return {@link Row} object with the schema {@link Protocol#FULL_SCHEMA}
*/
public Row toRow() {
Map<Integer, Object> protocolMap = new HashMap<>();
final Map<Integer, Object> protocolMap = new HashMap<>();
protocolMap.put(0, minReaderVersion);
protocolMap.put(1, minWriterVersion);
protocolMap.put(2, stringArrayValue(readerFeatures));
protocolMap.put(3, stringArrayValue(writerFeatures));

// readerFeatures can only exist in the serialized protocol action when minReaderVersion >= 3
protocolMap.put(
2,
minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION
? stringArrayValue(new ArrayList<>(readerFeatures))
: null);

// writerFeatures can only exist in the serialized protocol action when minWriterVersion >= 7
protocolMap.put(
3,
minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION
? stringArrayValue(new ArrayList<>(writerFeatures))
: null);

return new GenericRow(Protocol.FULL_SCHEMA, protocolMap);
}

public Protocol withNewWriterFeatures(Set<String> writerFeatures) {
Tuple2<Integer, Integer> newProtocolVersions =
TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(writerFeatures);
List<String> newWriterFeatures = new ArrayList<>(writerFeatures);
if (this.writerFeatures != null) {
newWriterFeatures.addAll(this.writerFeatures);
}
public Protocol withNewWriterFeatures(Set<String> newWriterFeatures) {
final Tuple2<Integer, Integer> newProtocolVersions =
TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(newWriterFeatures);
final Set<String> allNewWriterFeatures = new HashSet<>();
allNewWriterFeatures.addAll(writerFeatures);
allNewWriterFeatures.addAll(newWriterFeatures);

return new Protocol(
newProtocolVersions._1,
newProtocolVersions._2,
this.readerFeatures == null ? null : new ArrayList<>(this.readerFeatures),
newWriterFeatures);
newProtocolVersions._1, newProtocolVersions._2, readerFeatures, allNewWriterFeatures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ class TableFeaturesSuite extends AnyFunSuite {
def createTestProtocol(minWriterVersion: Int, writerFeatures: String*): Protocol = {
new Protocol(
// minReaderVersion - it doesn't matter as the read fails anyway before the writer check
0,
1,
minWriterVersion,
// reader features - it doesn't matter as the read fails anyway before the writer check
Collections.emptyList(),
writerFeatures.toSeq.asJava
Collections.emptySet(),
writerFeatures.toSet.asJava
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal.actions

import org.scalatest.funsuite.AnyFunSuite

import scala.collection.JavaConverters._

/**
 * Tests for the argument checks that run when a [[Protocol]] is instantiated. Each case passes an
 * invalid combination of versions and features to the constructor and verifies both the exception
 * type and its exact message.
 */
class ProtocolSuite extends AnyFunSuite {

  test("instantiation -- bad minReaderVersion should throw") {
    // Reader version 0 violates the ">= 1" requirement stated in the expected message.
    val thrown = intercept[IllegalArgumentException] {
      new Protocol(0, 1, null, null)
    }
    val expected = "minReaderVersion must be >= 1"

    assert(thrown.getMessage === expected)
  }

  test("instantiation -- bad minWriterVersion should throw") {
    // Writer version 0 violates the ">= 1" requirement stated in the expected message.
    val thrown = intercept[IllegalArgumentException] {
      new Protocol(1, 0, null, null)
    }
    val expected = "minWriterVersion must be >= 1"

    assert(thrown.getMessage === expected)
  }

  test("instantiation -- minReaderVersion < 3 but readerFeatures non-empty should throw") {
    // Reader features are supplied while the reader version is still 1.
    val thrown = intercept[IllegalArgumentException] {
      new Protocol(1, 1, Set("columnMapping").asJava, null)
    }
    val expected = "This protocol has minReaderVersion 1 but readerFeatures is not " +
      "empty: [columnMapping]. readerFeatures are only supported with minReaderVersion >= 3."

    assert(thrown.getMessage === expected)
  }

  test("instantiation -- minWriterVersion < 7 but writerFeatures non-empty should throw") {
    // Writer features are supplied while the writer version is still 1.
    val thrown = intercept[IllegalArgumentException] {
      new Protocol(1, 1, null, Set("appendOnly").asJava)
    }
    val expected = "This protocol has minWriterVersion 1 but writerFeatures is not " +
      "empty: [appendOnly]. writerFeatures are only supported with minWriterVersion >= 7.")

    assert(thrown.getMessage === expected)
  }

  test("instantiation -- minReaderVersion >= 3 but minWriterVersion < 7 should throw") {
    // Reader version 3 paired with writer version 6 is rejected even without any features.
    val thrown = intercept[IllegalArgumentException] {
      new Protocol(3, 6, null, null)
    }
    val expected = "This protocol has minReaderVersion 3 but minWriterVersion 6. When " +
      "minReaderVersion is >= 3, minWriterVersion must be >= 7."

    assert(thrown.getMessage === expected)
  }

}
Loading

0 comments on commit ef278ab

Please sign in to comment.