Merge branch 'master' into schema
tdas authored Feb 4, 2025
2 parents 2c98fdd + a6db4e0 commit dc59767
Showing 339 changed files with 16,661 additions and 5,385 deletions.
8 changes: 5 additions & 3 deletions PROTOCOL.md
@@ -386,8 +386,8 @@ The histogram bins correspond to the following ranges:
- Bin 5: [10000, 99999] (files with 10,000-99,999 deleted records)
- Bin 6: [100000, 999999] (files with 100,000-999,999 deleted records)
- Bin 7: [1000000, 9999999] (files with 1,000,000-9,999,999 deleted records)
- Bin 8: [10000000, Int.MaxValue - 1] (files with 10,000,000 to Int.MaxValue-1 deleted records)
- Bin 9: [Int.MaxValue, Long.MaxValue] (files with Int.MaxValue or more deleted records)
- Bin 8: [10000000, 2147483646] (files with 10,000,000 to 2147483646 (i.e. Int.MaxValue-1 in Java) deleted records)
- Bin 9: [2147483647, ∞) (files with 2147483647 or more deleted records)

This histogram allows analyzing the distribution of deleted records across files in a Delta table, which can be useful for monitoring and optimizing deletion patterns.
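
To make the bin layout concrete, here is a minimal, hypothetical Java helper that maps a per-file deleted-record count to its bin index. It assumes the bins below 10,000 (collapsed in this excerpt) follow the same decade pattern as bins 5-7; the class and method names are illustrative only and are not part of the Delta protocol or of any Delta API.

```java
// Hypothetical helper illustrating the bin layout above; not Delta code.
public final class DeletedRecordCountsHistogramExample {

    /** Returns the bin index (0-9) for a given number of deleted records in a file. */
    static int binFor(long deletedRecords) {
        if (deletedRecords < 0) {
            throw new IllegalArgumentException("deleted record count must be >= 0");
        }
        if (deletedRecords >= Integer.MAX_VALUE) return 9;   // bin 9: [2147483647, ∞)
        if (deletedRecords >= 10_000_000L) return 8;         // bin 8: [10000000, Int.MaxValue - 1]
        if (deletedRecords >= 1_000_000L) return 7;
        if (deletedRecords >= 100_000L) return 6;
        if (deletedRecords >= 10_000L) return 5;
        if (deletedRecords >= 1_000L) return 4;              // bins 0-4 assumed to follow
        if (deletedRecords >= 100L) return 3;                // the same decade pattern
        if (deletedRecords >= 10L) return 2;
        if (deletedRecords >= 1L) return 1;
        return 0;                                            // bin 0: files with 0 deleted records
    }

    public static void main(String[] args) {
        long[] histogram = new long[10];
        long[] deletedRecordCountsPerFile = {0, 7, 12_345, 3_000_000_000L};
        for (long count : deletedRecordCountsPerFile) {
            histogram[binFor(count)]++;
        }
        System.out.println(java.util.Arrays.toString(histogram));
    }
}
```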

@@ -488,6 +488,8 @@ That means specifically that for any commit…
- it is **legal** for the same `path` to occur in an `add` action and a `remove` action, but with two different `dvId`s.
- it is **legal** for the same `path` to be added and/or removed and also occur in a `cdc` action.
- it is **illegal** for the same `path` to occur twice with different `dvId`s within each set of `add` or `remove` actions (see the sketch after this list).
- it is **illegal** for a `path` to occur in an `add` action that already occurs with a different `dvId` in the list of `add` actions from the snapshot of the version immediately preceding the commit, unless the commit also contains a remove for the latter combination.
- it is **legal** to commit an existing `path` and `dvId` combination again (this allows metadata updates).
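
As referenced above, the following is a minimal, hypothetical sketch (not Delta code) of the intra-commit uniqueness rule: within the `add` set, and likewise within the `remove` set, a `path` must not occur with two different `dvId`s. The `FileAction` record and all names are made up for illustration.

```java
// Hypothetical sketch of the per-commit uniqueness rule; types and names are illustrative only.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class CommitUniquenessCheckSketch {

    record FileAction(String path, String dvId) {}   // dvId may be null (no deletion vector)

    /** Throws if the same path occurs with two different dvIds within one action set. */
    static void checkUniquePathDvId(Iterable<FileAction> actions, String setName) {
        Map<String, String> seen = new HashMap<>();
        for (FileAction a : actions) {
            if (seen.containsKey(a.path()) && !Objects.equals(seen.get(a.path()), a.dvId())) {
                throw new IllegalStateException(
                    "illegal commit: path " + a.path()
                    + " occurs with two different dvIds in the " + setName + " set");
            }
            seen.put(a.path(), a.dvId());
        }
    }
}
```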

The `dataChange` flag on either an `add` or a `remove` can be set to `false` to indicate that an action, when combined with other actions in the same atomic version, only rearranges existing data or adds new statistics.
For example, streaming queries that are tailing the transaction log can use this flag to skip actions that would not affect the final results.
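
A rough sketch, under an assumed and simplified action model, of how a consumer tailing the transaction log might use the `dataChange` flag; none of these types or names come from a real Delta API.

```java
// Hypothetical sketch: skip add/remove actions whose dataChange flag is false
// (e.g. files rewritten by compaction), since they do not change the table's contents.
import java.util.List;
import java.util.stream.Collectors;

final class DataChangeFilterSketch {

    record FileAction(String path, boolean dataChange) {}  // simplified action model

    /** Keeps only the actions that actually change the visible data of the table. */
    static List<FileAction> onlyDataChanges(List<FileAction> actionsInCommit) {
        return actionsInCommit.stream()
            .filter(FileAction::dataChange)
            .collect(Collectors.toList());
    }
}
```
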
@@ -825,7 +827,7 @@ A given snapshot of the table can be computed by replaying the events committed
- A single `metaData` action
- A collection of `txn` actions with unique `appId`s
- A collection of `domainMetadata` actions with unique `domain`s.
- A collection of `add` actions with unique `(path, deletionVector.uniqueId)` keys.
- A collection of `add` actions with unique path keys, corresponding to the newest `(path, deletionVector.uniqueId)` pair encountered for each path (see the replay sketch after this list).
- A collection of `remove` actions with unique `(path, deletionVector.uniqueId)` keys. The intersection of the primary keys in the `add` collection and `remove` collection must be empty. That means a logical file cannot exist in both the `remove` and `add` collections at the same time; however, the same *data file* can exist with *different* DVs in the `remove` collection, as logically they represent different content. The `remove` actions act as _tombstones_, and only exist for the benefit of the VACUUM command. Snapshot reads only return `add` actions on the read path.
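
The replay sketch referenced in the list above: a simplified, hypothetical model of reconciliation in which the newest `add` per `path` wins and a matching `remove` both evicts the `add` and records a tombstone. This is an editor's illustration, not the protocol's normative algorithm or any Delta implementation.

```java
// Hypothetical replay sketch over a simplified action model; commits applied oldest to newest.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class LogReplaySketch {

    record Add(String path, String dvId) {}     // dvId is null when no deletion vector
    record Remove(String path, String dvId) {}
    record Commit(List<Add> adds, List<Remove> removes) {}
    record Snapshot(Map<String, Add> addsByPath, Map<String, Remove> tombstones) {}

    static Snapshot replay(List<Commit> commitsOldestFirst) {
        Map<String, Add> addsByPath = new HashMap<>();      // unique path keys
        Map<String, Remove> tombstones = new HashMap<>();   // keyed by path + "/" + dvId
        for (Commit commit : commitsOldestFirst) {
            for (Remove remove : commit.removes()) {
                Add current = addsByPath.get(remove.path());
                if (current != null && Objects.equals(current.dvId(), remove.dvId())) {
                    addsByPath.remove(remove.path());        // logical file leaves the table
                }
                tombstones.put(remove.path() + "/" + remove.dvId(), remove);
            }
            for (Add add : commit.adds()) {
                addsByPath.put(add.path(), add);             // newest (path, dvId) pair wins
                tombstones.remove(add.path() + "/" + add.dvId());  // keep the intersection empty
            }
        }
        return new Snapshot(addsByPath, tombstones);
    }
}
```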

To achieve the requirements above, related actions from different delta files need to be reconciled with each other:
1 change: 1 addition & 0 deletions build.sbt
@@ -658,6 +658,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5",
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
8 changes: 3 additions & 5 deletions build/sbt
@@ -66,11 +66,9 @@ realpath () {
)
}

if [[ "$JENKINS_URL" != "" ]]; then
# Make Jenkins use Google Mirror first as Maven Central may ban us
SBT_REPOSITORIES_CONFIG="$(dirname "$(realpath "$0")")/sbt-config/repositories"
export SBT_OPTS="-Dsbt.override.build.repos=true -Dsbt.repository.config=$SBT_REPOSITORIES_CONFIG"
fi
# Make Jenkins use Google Mirror first as Maven Central may ban us
SBT_REPOSITORIES_CONFIG="$(dirname "$(realpath "$0")")/sbt-config/repositories"
export SBT_OPTS="-Dsbt.override.build.repos=true -Dsbt.repository.config=$SBT_REPOSITORIES_CONFIG"

. "$(dirname "$(realpath "$0")")"/sbt-launch-lib.bash

2 changes: 1 addition & 1 deletion build/sbt-config/repositories
@@ -2,7 +2,7 @@
local
local-preloaded-ivy: file:///${sbt.preloaded-${sbt.global.base-${user.home}/.sbt}/preloaded/}, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext]
local-preloaded: file:///${sbt.preloaded-${sbt.global.base-${user.home}/.sbt}/preloaded/}
gcs-maven-central-mirror: https://maven-central.storage-download.googleapis.com/repos/central/data/
gcs-maven-central-mirror: https://maven-central.storage-download.googleapis.com/maven2/
maven-central
typesafe-ivy-releases: https://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sbt-ivy-snapshots: https://repo.scala-sbt.org/scalasbt/ivy-snapshots/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
50 changes: 30 additions & 20 deletions build/sbt-launch-lib.bash
@@ -36,42 +36,52 @@ dlog () {
[[ $debug ]] && echoerr "$@"
}

download_sbt () {
local url=$1
local output=$2
local temp_file="${output}.part"

if [ $(command -v curl) ]; then
curl --fail --location --silent ${url} > "${temp_file}" &&\
mv "${temp_file}" "${output}"
elif [ $(command -v wget) ]; then
wget --quiet ${url} -O "${temp_file}" &&\
mv "${temp_file}" "${output}"
else
printf "You do not have curl or wget installed, unable to downlaod ${url}\n"
exit -1
fi
}


acquire_sbt_jar () {
SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`

# Download sbt from mirror URL if the environment variable is provided
# Set primary and fallback URLs
if [[ "${SBT_VERSION}" == "0.13.18" ]] && [[ -n "${SBT_MIRROR_JAR_URL}" ]]; then
URL1="${SBT_MIRROR_JAR_URL}"
elif [[ "${SBT_VERSION}" == "1.5.5" ]] && [[ -n "${SBT_1_5_5_MIRROR_JAR_URL}" ]]; then
URL1="${SBT_1_5_5_MIRROR_JAR_URL}"
else
URL1=${DEFAULT_ARTIFACT_REPOSITORY:-https://repo1.maven.org/maven2/}org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar
URL1=${DEFAULT_ARTIFACT_REPOSITORY:-https://maven-central.storage-download.googleapis.com/maven2/}org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar
fi
BACKUP_URL="https://repo1.maven.org/maven2/org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar"

JAR=build/sbt-launch-${SBT_VERSION}.jar
sbt_jar=$JAR

if [[ ! -f "$sbt_jar" ]]; then
# Download sbt launch jar if it hasn't been downloaded yet
if [ ! -f "${JAR}" ]; then
# Download
printf 'Attempting to fetch sbt from %s\n' "${URL1}"
JAR_DL="${JAR}.part"
if [ $(command -v curl) ]; then
curl --fail --location --silent ${URL1} > "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
elif [ $(command -v wget) ]; then
wget --quiet ${URL1} -O "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from https://www.scala-sbt.org/\n"
exit -1
fi
download_sbt "${URL1}" "${JAR}"

if [[ ! -f "${JAR}" ]]; then
printf 'Download from %s failed. Retrying from %s\n' "${URL1}" "${BACKUP_URL}"
download_sbt "${BACKUP_URL}" "${JAR}"
fi
if [ ! -f "${JAR}" ]; then
# We failed to download
printf "Our attempt to download sbt locally to ${JAR} failed. Please install sbt manually from https://www.scala-sbt.org/\n"
exit -1

if [[ ! -f "${JAR}" ]]; then
printf "Failed to download sbt. Please install sbt manually from https://www.scala-sbt.org/\n"
exit 1
fi
printf "Launching sbt from ${JAR}\n"
fi
@@ -2,7 +2,6 @@

import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -16,6 +15,7 @@
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;

import io.delta.standalone.types.DataType;
@@ -64,7 +64,7 @@ public static LogicalType toFlinkDataType(DataType deltaType, boolean nullable)
case LONG:
return new BigIntType(nullable);
case BINARY:
return new BinaryType(nullable, BinaryType.DEFAULT_LENGTH);
return new VarBinaryType(nullable, VarBinaryType.MAX_LENGTH);
case BOOLEAN:
return new BooleanType(nullable);
case BYTE:
@@ -83,7 +83,7 @@ class KernelDeltaLogDelegator(
kernelSnapshotWrapper,
hadoopConf,
logPath,
kernelSnapshot.getVersion(engine), // note: engine isn't used
kernelSnapshot.getVersion(),
this,
standaloneDeltaLog
))
@@ -73,9 +73,7 @@ public Metadata getMetadata() {
*/
@Override
public long getVersion() {
// WARNING: getVersion in SnapshotImpl currently doesn't use the engine so we can
// pass null, but if this changes this code could break
return kernelSnapshot.getVersion(null);
return kernelSnapshot.getVersion();
}

/**
@@ -5,7 +5,6 @@

import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -18,6 +17,7 @@
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -71,7 +71,9 @@ private static Stream<Arguments> dataTypes() {
Arguments.of(new io.delta.standalone.types.ByteType(), new TinyIntType()),
Arguments.of(new io.delta.standalone.types.ShortType(), new SmallIntType()),
Arguments.of(new io.delta.standalone.types.LongType(), new BigIntType()),
Arguments.of(new io.delta.standalone.types.BinaryType(), new BinaryType()),
Arguments.of(
new io.delta.standalone.types.BinaryType(),
new VarBinaryType(VarBinaryType.MAX_LENGTH)),
Arguments.of(new io.delta.standalone.types.TimestampType(), new TimestampType()),
Arguments.of(new io.delta.standalone.types.DateType(), new DateType()),
Arguments.of(
@@ -163,7 +165,7 @@ private static Stream<Arguments> mapTypes() {
new io.delta.standalone.types.ShortType(),
true
),
new MapType(new BinaryType(), new SmallIntType())),
new MapType(new VarBinaryType(VarBinaryType.MAX_LENGTH), new SmallIntType())),
Arguments.of(
new io.delta.standalone.types.MapType(
new io.delta.standalone.types.StringType(),
@@ -94,6 +94,56 @@ public void setUp() {
nonPartitionedLargeTablePath);
}

/** Tests fix for https://github.com/delta-io/delta/issues/3977 */
@Test
public void testWriteFromDatagenTableToDeltaTypeWithBytesType() throws Exception {
final StreamTableEnvironment tableEnv = setupTableEnvAndDeltaCatalog(false);
final String targetTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
final String datagenSourceDDL = ""
+ "CREATE TABLE source_table ("
+ " id BIGINT,"
+ " binary_data BYTES"
+ ") WITH ("
+ " 'connector' = 'datagen',"
+ " 'fields.id.kind' = 'sequence',"
+ " 'fields.id.start' = '1',"
+ " 'fields.id.end' = '8',"
+ " 'number-of-rows' = '8'," // this makes the source BOUNDED
+ " 'fields.binary_data.kind' = 'random',"
+ " 'fields.binary_data.length' = '16'"
+ ")";
final String deltaSinkDDL = String.format(""
+ "CREATE TABLE target_table ("
+ " id BIGINT,"
+ " binary_data BYTES"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
targetTablePath);

// Stage 1: Create the source and validate it

tableEnv.executeSql(datagenSourceDDL).await();

final List<Row> sourceRows =
DeltaTestUtils.readTableResult(tableEnv.executeSql("SELECT * FROM source_table"));

assertThat(sourceRows).hasSize(8);

// Stage 2: Create the sink and insert into it and validate it

tableEnv.executeSql(deltaSinkDDL).await();

// If our fix for issue #3977 did not work, then this would have thrown an exception.
tableEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table").await();

final List<Row> targetRows =
DeltaTestUtils.readTableResult(tableEnv.executeSql("SELECT * FROM target_table"));

assertThat(targetRows).hasSize(8);
}

@Test
public void shouldReadAndWriteDeltaTable() throws Exception {
