Uniform: Add support for Timestamp partition values, and move away from using partition string paths to using StructLike partition values in Iceberg.
amogh-jahagirdar committed Sep 3, 2024
1 parent eb00b0d commit 496d0fe
Showing 6 changed files with 300 additions and 29 deletions.
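The gist of the change: instead of serializing partition values into a Hive-style partition path string, the converter now hands Iceberg a typed StructLike row, so values such as timestamps keep their native representation. Below is a minimal sketch of that shape, not the shipped code; StructLikePartitionSketch, PartitionRow, exampleDataFile, spec, and the literal timestamp are illustrative assumptions, while DataFiles.Builder#withPartition and the StructLike carrier mirror the Row class added further down in this diff.

import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, StructLike}

object StructLikePartitionSketch {
  // Minimal StructLike carrier, mirroring the Row class introduced in this commit.
  class PartitionRow(val values: Array[Any]) extends StructLike {
    override def size: Int = values.length
    override def get[T <: Any](pos: Int, javaClass: Class[T]): T = javaClass.cast(values(pos))
    override def set[T <: Any](pos: Int, value: T): Unit = values(pos) = value
  }

  // Build a DataFile whose single partition column is a timestamp, passed as microseconds
  // since epoch (Iceberg's internal timestamp representation) rather than as a formatted
  // partition-path string.
  def exampleDataFile(spec: PartitionSpec, path: String, sizeInBytes: Long, records: Long): DataFile = {
    val partitionValues: Array[Any] = Array(java.lang.Long.valueOf(1725357600000000L))
    DataFiles
      .builder(spec)
      .withPath(path)
      .withFileSizeInBytes(sizeInBytes)
      .withFormat(FileFormat.PARQUET)
      .withRecordCount(records)
      .withPartition(new PartitionRow(partitionValues))
      .build()
  }
}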
@@ -97,9 +97,8 @@ class IcebergConversionTransaction(
tablePath,
partitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot.statsSchema,
statsParser,
postCommitSnapshot.deltaLog
postCommitSnapshot
)
)
}
@@ -138,17 +137,20 @@ class IcebergConversionTransaction(
tablePath,
partitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot.statsSchema,
statsParser,
postCommitSnapshot.deltaLog
postCommitSnapshot
)
)
}

def remove(remove: RemoveFile): Unit = {
overwriter.deleteFile(
convertDeltaRemoveFileToIcebergDataFile(
remove, tablePath, partitionSpec, logicalToPhysicalPartitionNames)
remove,
tablePath,
partitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot)
)
}
}
@@ -167,7 +169,11 @@ class IcebergConversionTransaction(
val dataFilesToDelete = removes.map { f =>
assert(!f.dataChange, "Rewrite operation should not add data")
convertDeltaRemoveFileToIcebergDataFile(
f, tablePath, partitionSpec, logicalToPhysicalPartitionNames)
f,
tablePath,
partitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot)
}.toSet.asJava

val dataFilesToAdd = adds.map { f =>
@@ -177,9 +183,8 @@
tablePath,
partitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot.statsSchema,
statsParser,
postCommitSnapshot.deltaLog
postCommitSnapshot
)
}.toSet.asJava

@@ -16,17 +16,21 @@

package org.apache.spark.sql.delta.icebergShaded

import java.nio.ByteBuffer
import java.time.Instant

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfig, DeltaConfigs, DeltaErrors, DeltaLog, DeltaRuntimeException}
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfig, DeltaConfigs, DeltaErrors, DeltaLog, Snapshot}
import org.apache.spark.sql.delta.DeltaConfigs.parseCalendarInterval
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
import shadedForDelta.org.apache.iceberg.Metrics
import shadedForDelta.org.apache.iceberg.StructLike
import shadedForDelta.org.apache.iceberg.TableProperties

// scalastyle:off import.ordering.noEmptyLine
@@ -35,7 +39,7 @@ import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier =>
import shadedForDelta.org.apache.iceberg.hive.HiveCatalog

import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier => SparkTableIdentifier}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.CalendarInterval

object IcebergTransactionUtils
@@ -80,27 +84,49 @@ object IcebergTransactionUtils
// throw an exception when building the data file.
.withRecordCount(add.numLogicalRecords.getOrElse(-1L))

if (add.stats != null && add.stats.nonEmpty) {
try {
val statsRow = statsParser(add.stats)

val metricsConverter = IcebergStatsConverter(statsRow, statsSchema)
val metrics = new Metrics(
metricsConverter.numRecordsStat, // rowCount
null, // columnSizes
null, // valueCounts
metricsConverter.nullValueCountsStat.getOrElse(null).asJava, // nullValueCounts
null, // nanValueCounts
metricsConverter.lowerBoundsStat.getOrElse(null).asJava, // lowerBounds
metricsConverter.upperBoundsStat.getOrElse(null).asJava // upperBounds
)

dataFileBuilder = dataFileBuilder.withMetrics(metrics)
} catch {
case NonFatal(e) =>
logWarning("Failed to convert Delta stats to Iceberg stats. Iceberg conversion will " +
try {
if (add.stats != null && add.stats.nonEmpty) {
dataFileBuilder = dataFileBuilder.withMetrics(
metrics(statsParser, add.stats, statsSchema))
}
} catch {
case NonFatal(e) =>
logWarning("Failed to convert Delta stats to Iceberg stats. Iceberg conversion will " +
"attempt to proceed without stats.", e)
}

dataFileBuilder.build()
}

def convertDeltaAddFileToIcebergDataFile(
add: AddFile,
tablePath: Path,
partitionSpec: PartitionSpec,
logicalToPhysicalPartitionNames: Map[String, String],
statsParser: String => InternalRow,
snapshot: Snapshot): DataFile = {
if (add.deletionVector != null) {
throw new UnsupportedOperationException("No support yet for DVs")
}

var dataFileBuilder =
convertFileAction(
add, tablePath, partitionSpec, logicalToPhysicalPartitionNames, snapshot)
// Attempt to attach the number of records metric regardless of whether the Delta stats
// string is null/empty or not because this metric is required by Iceberg. If the number
// of records is both unavailable here and unavailable in the Delta stats, Iceberg will
// throw an exception when building the data file.
.withRecordCount(add.numLogicalRecords.getOrElse(-1L))

try {
if (add.stats != null && add.stats.nonEmpty) {
dataFileBuilder = dataFileBuilder.withMetrics(
metrics(statsParser, add.stats, snapshot.statsSchema))
}
} catch {
case NonFatal(e) =>
logWarning("Failed to convert Delta stats to Iceberg stats. Iceberg conversion will " +
"attempt to proceed without stats.", e)
}

dataFileBuilder.build()
@@ -120,6 +146,18 @@ object IcebergTransactionUtils
.build()
}

def convertDeltaRemoveFileToIcebergDataFile(
remove: RemoveFile,
tablePath: Path,
partitionSpec: PartitionSpec,
logicalToPhysicalPartitionNames: Map[String, String],
snapshot: Snapshot): DataFile = {
convertFileAction(
remove, tablePath, partitionSpec, logicalToPhysicalPartitionNames, snapshot)
.withRecordCount(remove.numLogicalRecords.getOrElse(0L))
.build()
}

/**
* We expose this as a public API since APIs like
* [[shadedForDelta.org.apache.iceberg.DeleteFiles#deleteFile]] actually only need to take in
@@ -168,6 +206,14 @@ object IcebergTransactionUtils
partitionSchema.fields.map(f => f.name -> DeltaColumnMapping.getPhysicalName(f)).toMap
}

class Row (val values: Array[Any]) extends StructLike {
override def size: Int = values.length
override def get[T <: Any](pos: Int, javaClass: Class[T]): T = javaClass.cast(values(pos))
override def set[T <: Any](pos: Int, value: T): Unit = {
values(pos) = value
}
}

////////////////////
// Helper Methods //
////////////////////
@@ -210,6 +256,118 @@ object IcebergTransactionUtils
builder
}

private[delta] def convertFileAction(
f: FileAction,
tablePath: Path,
partitionSpec: PartitionSpec,
logicalToPhysicalPartitionNames: Map[String, String],
snapshot: Snapshot): DataFiles.Builder = {
val absPath = canonicalizeFilePath(f, tablePath)
val schema = snapshot.schema
var builder = DataFiles
.builder(partitionSpec)
.withPath(absPath)
.withFileSizeInBytes(f.getFileSize)
.withFormat(FileFormat.PARQUET)
val nameToDataTypes = schema.fields.map(f => f.name -> f.dataType).toMap

if (partitionSpec.isPartitioned) {
val ICEBERG_NULL_PARTITION_VALUE = "__HIVE_DEFAULT_PARTITION__"
val partitionPath = partitionSpec.fields()
val partitionVals = new Array[Any](partitionSpec.fields().size())
for (i <- partitionVals.indices) {
val logicalPartCol = partitionPath.get(i).name()
val physicalPartKey = logicalToPhysicalPartitionNames(logicalPartCol)
// ICEBERG_NULL_PARTITION_VALUE is used by the Iceberg library to mark a NULL partition value
val partValue = Option(f.partitionValues(physicalPartKey))
.getOrElse(ICEBERG_NULL_PARTITION_VALUE)
val partitionColumnDataType = nameToDataTypes(logicalPartCol)
val deltaPartitionValue =
deserializeDeltaPartitionValue(partitionColumnDataType, partValue, snapshot.version)
val icebergPartitionValue =
deltaPartitionValueToIcebergPartitionValue(
deltaPartitionValue, partitionColumnDataType, snapshot.version)
partitionVals(i) = icebergPartitionValue
}

builder = builder.withPartition(new Row(partitionVals))
}
builder
}

/**
* Follows deserialization as specified here
* https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Partition-Value-Serialization
*/
private def deserializeDeltaPartitionValue(
elemType: DataType,
partitionVal: String,
version: Long): Any = {
elemType match {
case _: StringType => partitionVal
case _: DateType => java.sql.Date.valueOf(partitionVal)
case _: IntegerType => partitionVal.toInt
case _: ShortType => partitionVal.toInt
case _: ByteType => partitionVal.toInt
case _: LongType => partitionVal.toLong
case _: BooleanType => partitionVal.toBoolean
case _: FloatType => partitionVal.toFloat
case _: DoubleType => partitionVal.toDouble
case _: DecimalType => new java.math.BigDecimal(partitionVal)
case _: BinaryType => partitionVal.getBytes("UTF-8")
case _: TimestampNTZType => java.sql.Timestamp.valueOf(partitionVal)
case _: TimestampType => Instant.parse(partitionVal)
case _ =>
throw DeltaErrors.universalFormatConversionFailedException(
version, "iceberg", "Unexpected partition data type " + elemType)
}
}

private def deltaPartitionValueToIcebergPartitionValue(
deltaPartitionValue: Any,
deltaPartitionType: DataType,
version: Long): Any = {
deltaPartitionType match {
case _: StringType => deltaPartitionValue.asInstanceOf[String]
case _: DateType => deltaPartitionValue.asInstanceOf[java.sql.Date]
.toLocalDate.toEpochDay.asInstanceOf[Int]
case _: IntegerType => deltaPartitionValue.asInstanceOf[Integer]
case _: ShortType => deltaPartitionValue.asInstanceOf[Integer]
case _: ByteType => deltaPartitionValue.asInstanceOf[Integer]
case _: LongType => deltaPartitionValue.asInstanceOf[Long]
case _: BooleanType => deltaPartitionValue.asInstanceOf[Boolean]
case _: FloatType => deltaPartitionValue.asInstanceOf[Float]
case _: DoubleType => deltaPartitionValue.asInstanceOf[Double]
case _: DecimalType => deltaPartitionValue.asInstanceOf[BigDecimal]
case _: BinaryType => ByteBuffer.wrap(deltaPartitionValue.asInstanceOf[Array[Byte]])
case _: TimestampNTZType => deltaPartitionValue
.asInstanceOf[java.sql.Timestamp]
.getNanos/1000.asInstanceOf[Long]
case _: TimestampType => deltaPartitionValue
.asInstanceOf[Instant].getNano/1000.asInstanceOf[Long]
case _ =>
throw DeltaErrors.universalFormatConversionFailedException(
version, "iceberg", "Unexpected partition data type " + deltaPartitionType)
}
}

private def metrics(
statsParser: String => InternalRow,
stats: String,
statsSchema: StructType): Metrics = {
val statsRow = statsParser(stats)
val metricsConverter = IcebergStatsConverter(statsRow, statsSchema)
new Metrics(
metricsConverter.numRecordsStat, // rowCount
null, // columnSizes
null, // valueCounts
metricsConverter.nullValueCountsStat.getOrElse(null).asJava, // nullValueCounts
null, // nanValueCounts
metricsConverter.lowerBoundsStat.getOrElse(null).asJava, // lowerBounds
metricsConverter.upperBoundsStat.getOrElse(null).asJava // upperBounds
)
}

/**
* Create an Iceberg HiveCatalog
* @param conf: Hadoop Configuration
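For reference, a worked sketch of the value shapes the deserializeDeltaPartitionValue / deltaPartitionValueToIcebergPartitionValue helpers above are aiming for (assumed sample values, not a verbatim trace of the shipped code): Delta serializes partition values as strings per the protocol link in the comment, while Iceberg's internal representation is an Int of days since epoch for dates and a Long of microseconds since epoch for timestamps.

import java.time.Instant

object PartitionValueSketch {
  // Date partition value string -> days since 1970-01-01 (Iceberg's internal date form).
  val dateDays: Int =
    java.sql.Date.valueOf("2024-09-03").toLocalDate.toEpochDay.toInt       // 19969

  // Timestamp-with-time-zone partition value (ISO-8601 UTC, the form Instant.parse accepts)
  // -> microseconds since 1970-01-01T00:00:00Z (Iceberg's internal timestamp form).
  val instant: Instant = Instant.parse("2024-09-03T10:00:00Z")
  val timestampMicros: Long =
    instant.getEpochSecond * 1000000L + instant.getNano / 1000L            // 1725357600000000L
}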
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -1156,6 +1156,12 @@
"<schema>"
]
},
"UNSUPPORTED_PARTITION_DATA_TYPE" : {
"message" : [
"IcebergCompatV<version> does not support the data type <dataType> for partition columns in your schema. Your partition schema:",
"<schema>"
]
},
"VERSION_MUTUAL_EXCLUSIVE" : {
"message" : [
"Only one IcebergCompat version can be enabled, please explicitly disable all other IcebergCompat versions that are not needed."
@@ -3250,6 +3250,15 @@ trait DeltaErrorsBase
)
}

def icebergCompatUnsupportedPartitionDataTypeException(
version: Int, dataType: DataType, schema: StructType): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_PARTITION_DATA_TYPE",
messageParameters = Array(version.toString, version.toString,
dataType.typeName, schema.treeString)
)
}

def icebergCompatMissingRequiredTableFeatureException(
version: Int, tf: TableFeature): Throwable = {
new DeltaUnsupportedOperationException(
@@ -60,6 +60,7 @@ object IcebergCompatV2 extends IcebergCompat(
CheckOnlySingleVersionEnabled,
CheckAddFileHasStats,
CheckTypeInV2AllowList,
CheckPartitionDataTypeInV2AllowList,
CheckNoPartitionEvolution,
CheckDeletionVectorDisabled
)
@@ -398,6 +399,27 @@ object CheckTypeInV2AllowList extends IcebergCompatCheck {
}
}

object CheckPartitionDataTypeInV2AllowList extends IcebergCompatCheck {
private val allowTypes = Set[Class[_]] (
ByteType.getClass, ShortType.getClass, IntegerType.getClass, LongType.getClass,
FloatType.getClass, DoubleType.getClass, classOf[DecimalType],
StringType.getClass, BinaryType.getClass,
BooleanType.getClass,
TimestampType.getClass, TimestampNTZType.getClass, DateType.getClass,
)
override def apply(context: IcebergCompatContext): Unit = {
val partitionSchema = context.newestMetadata.partitionSchema
SchemaUtils
.findAnyTypeRecursively(partitionSchema)(t => !allowTypes.contains(t.getClass))
match {
case Some(unsupportedType) =>
throw DeltaErrors.icebergCompatUnsupportedPartitionDataTypeException(
context.version, unsupportedType, partitionSchema)
case _ =>
}
}
}

/**
* Check if the deletion vector has been disabled by previous snapshot
* or newest metadata and protocol depending on whether the operation
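To make the new CheckPartitionDataTypeInV2AllowList check concrete, here is a simplified, non-recursive sketch of the allow-list logic it applies to the partition schema; the shipped check walks nested types via SchemaUtils.findAnyTypeRecursively, and the object name and column names below are illustrative assumptions.

import org.apache.spark.sql.types._

object PartitionTypeAllowListSketch {
  private val allowedPartitionTypes: Set[Class[_]] = Set(
    ByteType.getClass, ShortType.getClass, IntegerType.getClass, LongType.getClass,
    FloatType.getClass, DoubleType.getClass, classOf[DecimalType],
    StringType.getClass, BinaryType.getClass, BooleanType.getClass,
    TimestampType.getClass, TimestampNTZType.getClass, DateType.getClass)

  // Return the first top-level partition column type outside the allow list, if any.
  def firstUnsupportedPartitionType(partitionSchema: StructType): Option[DataType] =
    partitionSchema.fields.map(_.dataType)
      .find(t => !allowedPartitionTypes.contains(t.getClass))

  // A timestamp partition column is now allowed:
  val ok: Option[DataType] = firstUnsupportedPartitionType(
    StructType(Seq(StructField("event_time", TimestampType))))             // None
  // ...while e.g. a map-typed partition column would trigger the new error:
  val rejected: Option[DataType] = firstUnsupportedPartitionType(
    StructType(Seq(StructField("tags", MapType(StringType, StringType))))) // Some(MapType(...))
}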
