error compatible
yumingxuanguo-db committed Aug 31, 2024
1 parent 8ae47c1 commit 1844376
Showing 45 changed files with 1,193 additions and 317 deletions.
@@ -467,6 +467,7 @@ class IcebergConverter(spark: SparkSession)
var hasRemoves = false
var hasDataChange = false
var hasCommitInfo = false
var commitInfo: Option[CommitInfo] = None
breakable {
for (action <- actionsToCommit) {
action match {
@@ -476,7 +477,9 @@ class IcebergConverter(spark: SparkSession)
case r: RemoveFile =>
hasRemoves = true
if (r.dataChange) hasDataChange = true
case _: CommitInfo => hasCommitInfo = true
case ci: CommitInfo =>
commitInfo = Some(ci)
hasCommitInfo = true
case _ => // Do nothing
}
if (hasAdds && hasRemoves && hasDataChange && hasCommitInfo) break // Short-circuit
@@ -510,9 +513,14 @@
}
overwriteHelper.commit()
} else if (hasAdds) {
val appendHelper = icebergTxn.getAppendOnlyHelper()
addsAndRemoves.foreach(action => appendHelper.add(action.add))
appendHelper.commit()
if (!hasRemoves && !hasDataChange && allDeltaActionsCaptured) {
logInfo(s"Skip Iceberg conversion for commit that only has AddFiles " +
s"without any RemoveFiles or data change. CommitInfo: $commitInfo")
} else {
val appendHelper = icebergTxn.getAppendOnlyHelper()
addsAndRemoves.foreach(action => appendHelper.add(action.add))
appendHelper.commit()
}
} else if (hasRemoves) {
val removeHelper = icebergTxn.getRemoveOnlyHelper()
addsAndRemoves.foreach(action => removeHelper.remove(action.remove))
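A minimal sketch of the new append-only skip above, expressed as a standalone predicate (the helper name is made up for illustration; the four booleans correspond to the flags computed in the action scan above):

    // The Iceberg conversion can be skipped when the Delta commit contains only
    // AddFiles, with no RemoveFiles and no data change, and every Delta action in
    // the commit was captured, so the decision is based on complete information.
    def shouldSkipIcebergConversion(
        hasAdds: Boolean,
        hasRemoves: Boolean,
        hasDataChange: Boolean,
        allDeltaActionsCaptured: Boolean): Boolean = {
      hasAdds && !hasRemoves && !hasDataChange && allDeltaActionsCaptured
    }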
@@ -32,10 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.*;

/**
* A commit coordinator client that uses DynamoDB as the commit coordinator. The table schema is as follows:
@@ -350,11 +347,11 @@ DynamoDBTableEntryConstants.TABLE_LATEST_TIMESTAMP, new AttributeValueUpdate()
public CommitResponse commit(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
TableDescriptor tableDesc,
long commitVersion,
Iterator<String> actions,
UpdatedActions updatedActions) throws CommitFailedException {
Path logPath = tableDesc.getLogPath();
if (commitVersion == 0) {
throw new CommitFailedException(
false /* retryable */,
@@ -375,7 +372,7 @@ public CommitResponse commit(
commitVersion, commitPath);
CommitResponse res = commitToCoordinator(
logPath,
coordinatedCommitsTableConf,
tableDesc.getTableConf(),
commitVersion,
commitFileStatus,
inCommitTimestamp,
@@ -393,8 +390,7 @@ public CommitResponse commit(
backfillToVersion(
logStore,
hadoopConf,
logPath,
coordinatedCommitsTableConf,
tableDesc,
commitVersion,
null /* lastKnownBackfilledVersion */);
}
@@ -456,13 +452,12 @@ private GetCommitsResultInternal getCommitsImpl(

@Override
public GetCommitsResponse getCommits(
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
TableDescriptor tableDesc,
Long startVersion,
Long endVersion) {
try {
GetCommitsResultInternal res =
getCommitsImpl(logPath, coordinatedCommitsTableConf, startVersion, endVersion);
getCommitsImpl(tableDesc.getLogPath(), tableDesc.getTableConf(), startVersion, endVersion);
long latestTableVersionToReturn = res.response.getLatestTableVersion();
if (!res.hasAcceptedCommits) {
/*
@@ -533,16 +528,16 @@ private void validateBackfilledFileExists(
public void backfillToVersion(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
TableDescriptor tableDesc,
long version,
Long lastKnownBackfilledVersion) throws IOException {
LOG.info("Backfilling all unbackfilled commits.");
Path logPath = tableDesc.getLogPath();
GetCommitsResponse resp;
try {
resp = getCommitsImpl(
logPath,
coordinatedCommitsTableConf,
tableDesc.getTableConf(),
lastKnownBackfilledVersion,
null).response;
} catch (IOException e) {
@@ -582,7 +577,7 @@ public void backfillToVersion(
.withTableName(coordinatedCommitsTableName)
.addKeyEntry(
DynamoDBTableEntryConstants.TABLE_ID,
new AttributeValue().withS(getTableId(coordinatedCommitsTableConf)))
new AttributeValue().withS(getTableId(tableDesc.getTableConf())))
.addAttributeUpdatesEntry(
DynamoDBTableEntryConstants.COMMITS,
new AttributeValueUpdate()
@@ -624,6 +619,7 @@ public void backfillToVersion(
@Override
public Map<String, String> registerTable(
Path logPath,
Optional<TableIdentifier> tableIdentifier,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol) {
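Across the coordinator API above, the separate logPath and coordinatedCommitsTableConf parameters are folded into a single TableDescriptor, which also carries the new optional table identifier. A hedged caller-side sketch in Scala — the import path and the TableDescriptor constructor shape are assumptions for illustration; only getLogPath() and getTableConf() are taken from the diff:

    import java.util.Optional
    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.Path
    import io.delta.storage.commit.{TableDescriptor, TableIdentifier}  // package path assumed

    // Hypothetical inputs for the example.
    val logPath = new Path("s3://bucket/table/_delta_log")
    val tableConf = Map("dynamoDBTableName" -> "delta_commits").asJava  // made-up conf key

    // Assumed constructor shape: (log path, optional table identifier, table conf).
    val tableDesc = new TableDescriptor(logPath, Optional.empty[TableIdentifier](), tableConf)

    // The descriptor is now threaded through commit, getCommits and backfillToVersion
    // instead of passing the log path and the table conf separately, e.g.:
    //   client.getCommits(tableDesc, startVersion, endVersion)
    //   client.backfillToVersion(logStore, hadoopConf, tableDesc, version, lastKnownBackfilledVersion)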
4 changes: 2 additions & 2 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -387,9 +387,9 @@
},
"DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED" : {
"message" : [
"Clustering requires the data types of clustering columns to support data skipping. However the following column(s) '<columnsWithDataTypes>' have unsupported data types for data skipping in Delta."
"CLUSTER BY is not supported because the following column(s): <columnsWithDataTypes> don't support data skipping."
],
"sqlState" : "22000"
"sqlState" : "0A000"
},
"DELTA_CLUSTERING_COLUMNS_MISMATCH" : {
"message" : [
@@ -361,7 +361,9 @@ trait Checkpoints extends DeltaLogging {
// 00015.json
// 00016.json
// 00018.checkpoint.parquet
snapshotToCheckpoint.ensureCommitFilesBackfilled()
// TODO(table-identifier-plumbing): Plumb the right tableIdentifier from the Checkpoint Hook
// and pass it to `ensureCommitFilesBackfilled`.
snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt = None)
Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
}

@@ -24,16 +24,14 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf, DeltaStreamUtils}

import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, IncrementalExecutionShims, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}

/**
@@ -179,7 +177,7 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {

val newData = queryExecution match {
case incrementalExecution: IncrementalExecution =>
selectFromStreamingDataFrame(incrementalExecution, data, selectExprs: _*)
DeltaStreamUtils.selectFromStreamingDataFrame(incrementalExecution, data, selectExprs: _*)
case _ => data.select(selectExprs: _*)
}
recordDeltaEvent(deltaLog, "delta.generatedColumns.write")
@@ -222,30 +220,4 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
schema
}
}

/**
* Select `cols` from a micro batch DataFrame. Directly calling `select` won't work because it
* will create a `QueryExecution` rather than inheriting `IncrementalExecution` from
* the micro batch DataFrame. A streaming micro batch DataFrame to execute should use
* `IncrementalExecution`.
*/
private def selectFromStreamingDataFrame(
incrementalExecution: IncrementalExecution,
df: DataFrame,
cols: Column*): DataFrame = {
val newMicroBatch = df.select(cols: _*)
val newIncrementalExecution = IncrementalExecutionShims.newInstance(
newMicroBatch.sparkSession,
newMicroBatch.queryExecution.logical,
incrementalExecution)
newIncrementalExecution.executedPlan // Force the lazy generation of execution plan


// Use reflection to call the private constructor.
val constructor =
classOf[Dataset[_]].getConstructor(classOf[QueryExecution], classOf[Encoder[_]])
constructor.newInstance(
newIncrementalExecution,
ExpressionEncoder(newIncrementalExecution.analyzed.schema)).asInstanceOf[DataFrame]
}
}
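The private selectFromStreamingDataFrame helper removed here is not dropped: the call site above now routes through DeltaStreamUtils (note the updated import of org.apache.spark.sql.delta.sources.DeltaStreamUtils). Presumably the relocated helper keeps the shape of the deleted body; a sketch under that assumption:

    import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder}
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.execution.streaming.{IncrementalExecution, IncrementalExecutionShims}

    object DeltaStreamUtils {
      // Select `cols` from a micro-batch DataFrame. Calling `select` directly would
      // create a fresh QueryExecution instead of inheriting the micro batch's
      // IncrementalExecution, which a streaming micro batch must keep using.
      def selectFromStreamingDataFrame(
          incrementalExecution: IncrementalExecution,
          df: DataFrame,
          cols: Column*): DataFrame = {
        val newMicroBatch = df.select(cols: _*)
        val newIncrementalExecution = IncrementalExecutionShims.newInstance(
          newMicroBatch.sparkSession,
          newMicroBatch.queryExecution.logical,
          incrementalExecution)
        newIncrementalExecution.executedPlan // Force the lazy generation of the execution plan

        // Use reflection to call Dataset's private constructor so the returned
        // DataFrame is bound to the IncrementalExecution built above.
        val constructor =
          classOf[Dataset[_]].getConstructor(classOf[QueryExecution], classOf[Encoder[_]])
        constructor.newInstance(
          newIncrementalExecution,
          ExpressionEncoder(newIncrementalExecution.analyzed.schema)).asInstanceOf[DataFrame]
      }
    }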
@@ -105,15 +105,17 @@ class DeltaParquetWriteSupport extends ParquetWriteSupport {
val id = getNestedFieldId(parentField, relElemFieldPath)
val elementField =
field.asGroupType().getFields.get(0).asGroupType().getFields.get(0).withId(id)
Types
val builder = Types
.buildGroup(field.getRepetition).as(LogicalTypeAnnotation.listType())
.addField(
Types.repeatedGroup()
.addField(convert(elementField, parentField, sparkSchema,
absolutePath :+ PARQUET_LIST_ELEMENT_FIELD_NAME, relElemFieldPath))
.named("list"))
.id(field.getId.intValue())
.named(field.getName)
if (field.getId != null) {
builder.id(field.getId.intValue())
}
builder.named(field.getName)
case _: MapLogicalTypeAnnotation =>
val relKeyFieldPath = relativePath :+ PARQUET_MAP_KEY_FIELD_NAME
val relValFieldPath = relativePath :+ PARQUET_MAP_VALUE_FIELD_NAME
@@ -123,7 +125,7 @@ class DeltaParquetWriteSupport extends ParquetWriteSupport {
field.asGroupType().getFields.get(0).asGroupType().getFields.get(0).withId(keyId)
val valueField =
field.asGroupType().getFields.get(0).asGroupType().getFields.get(1).withId(valId)
Types
val builder = Types
.buildGroup(field.getRepetition).as(LogicalTypeAnnotation.mapType())
.addField(
Types
@@ -133,8 +135,10 @@ class DeltaParquetWriteSupport extends ParquetWriteSupport {
.addField(convert(valueField, parentField, sparkSchema,
absolutePath :+ PARQUET_MAP_VALUE_FIELD_NAME, relValFieldPath))
.named("key_value"))
.id(field.getId.intValue())
.named(field.getName)
if (field.getId != null) {
builder.id(field.getId.intValue())
}
builder.named(field.getName)
case _ if field.isPrimitive => field
case _ =>
val builder = Types.buildGroup(field.getRepetition)
@@ -143,7 +147,10 @@ class DeltaParquetWriteSupport extends ParquetWriteSupport {
val parentField = findFieldInSparkSchema(sparkSchema, absPath)
builder.addField(convert(field, parentField, sparkSchema, absPath, Seq(field.getName)))
}
builder.id(field.getId.intValue()).named(field.getName)
if (field.getId != null) {
builder.id(field.getId.intValue())
}
builder.named(field.getName)
}
}
}
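Each of the three branches above now assigns the Parquet field id only when field.getId is non-null, instead of calling intValue() unconditionally. A minimal standalone sketch of the same null-safe pattern (the helper name is made up; the Parquet builder calls are the ones already used above):

    import scala.collection.JavaConverters._
    import org.apache.parquet.schema.{GroupType, Types}

    // Rebuild a group type, copying the Parquet field id only when the source
    // field actually carries one (field.getId may be null).
    def copyGroupWithOptionalId(field: GroupType): GroupType = {
      val builder = Types.buildGroup(field.getRepetition)
      field.getFields.asScala.foreach(builder.addField)
      if (field.getId != null) {
        builder.id(field.getId.intValue())
      }
      builder.named(field.getName)
    }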
@@ -287,12 +287,15 @@ object DeltaTableUtils extends PredicateHelper
private val logThrottler = new LogThrottler()

/** Whether a path should be hidden for delta-related file operations, such as Vacuum and Fsck. */
def isHiddenDirectory(partitionColumnNames: Seq[String], pathName: String): Boolean = {
def isHiddenDirectory(
partitionColumnNames: Seq[String],
pathName: String,
shouldIcebergMetadataDirBeHidden: Boolean = true): Boolean = {
// Names of the form partitionCol=[value] are partition directories, and should be
// GCed even if they'd normally be hidden. The _db_index directory contains (bloom filter)
// indexes and these must be GCed when the data they are tied to is GCed.
// metadata name is reserved for converted iceberg metadata with delta universal format
pathName.equals("metadata") ||
(shouldIcebergMetadataDirBeHidden && pathName.equals("metadata")) ||
(pathName.startsWith(".") || pathName.startsWith("_")) &&
!pathName.startsWith("_delta_index") && !pathName.startsWith("_change_data") &&
!partitionColumnNames.exists(c => pathName.startsWith(c ++ "="))
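Illustrative calls for the new flag (the partition column and path names are made up). With the default of true, the reserved Iceberg/Uniform "metadata" directory stays hidden; a caller can opt out explicitly:

    val partitionColumns = Seq("date")
    // Hidden by default: reserved "metadata" dir for converted Iceberg metadata.
    DeltaTableUtils.isHiddenDirectory(partitionColumns, "metadata")                            // true
    // Opting out makes the Iceberg metadata dir visible to the caller.
    DeltaTableUtils.isHiddenDirectory(
      partitionColumns, "metadata", shouldIcebergMetadataDirBeHidden = false)                  // false
    // Partition-style directory names are not treated as hidden.
    DeltaTableUtils.isHiddenDirectory(partitionColumns, "date=2024-08-31")                     // false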
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import java.nio.file.FileAlreadyExistsException
import java.util.{ConcurrentModificationException, UUID}
import java.util.{ConcurrentModificationException, Optional, UUID}
import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.JavaConverters._
@@ -1556,7 +1556,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val updatedActions = new UpdatedActions(
commitInfo, metadata, protocol, snapshot.metadata, snapshot.protocol)
val commitResponse = TransactionExecutionObserver.withObserver(executionObserver) {
effectiveTableCommitCoordinatorClient.commit(attemptVersion, jsonActions, updatedActions)
effectiveTableCommitCoordinatorClient.commit(
attemptVersion, jsonActions, updatedActions, catalogTable.map(_.identifier))
}
// TODO(coordinated-commits): Use the right timestamp method on top of CommitInfo once ICT is
// merged.
@@ -1706,8 +1707,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite
log"file-system based table to coordinated-commits table: " +
log"[commit-coordinator: ${MDC(DeltaLogKeys.COORDINATOR_NAME, commitCoordinatorName)}" +
log", conf: ${MDC(DeltaLogKeys.COORDINATOR_CONF, commitCoordinatorConf)}]")
val tableIdentifierOpt =
CoordinatedCommitsUtils.toCCTableIdentifier(catalogTable.map(_.identifier))
newCoordinatedCommitsTableConf = Some(newCommitCoordinatorClient.registerTable(
deltaLog.logPath, readVersion, finalMetadata, protocol).asScala.toMap)
deltaLog.logPath,
tableIdentifierOpt,
readVersion,
finalMetadata,
protocol).asScala.toMap)
case (None, Some(readCommitCoordinatorClient)) =>
// CC -> FS conversion
val (newOwnerName, newOwnerConf) =
@@ -2277,11 +2284,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite
override def commit(
logStore: io.delta.storage.LogStore,
hadoopConf: Configuration,
logPath: Path,
coordinatedCommitsTableConf: java.util.Map[String, String],
tableDesc: TableDescriptor,
commitVersion: Long,
actions: java.util.Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
val logPath = tableDesc.getLogPath
// Get thread local observer for Fuzz testing purpose.
val executionObserver = TransactionExecutionObserver.getObserver
val commitFile = util.FileNames.unsafeDeltaFile(logPath, commitVersion)
@@ -2315,17 +2322,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}

override def getCommits(
logPath: Path,
coordinatedCommitsTableConf: java.util.Map[String, String],
tableDesc: TableDescriptor,
startVersion: java.lang.Long,
endVersion: java.lang.Long): GetCommitsResponse =
new GetCommitsResponse(Seq.empty.asJava, -1)

override def backfillToVersion(
logStore: io.delta.storage.LogStore,
hadoopConf: Configuration,
logPath: Path,
coordinatedCommitsTableConf: java.util.Map[String, String],
tableDesc: TableDescriptor,
version: Long,
lastKnownBackfilledVersion: java.lang.Long): Unit = {}

@@ -2344,6 +2349,7 @@

override def registerTable(
logPath: Path,
tableIdentifier: Optional[TableIdentifier],
currentVersion: Long,
currentMetadata: AbstractMetadata,
currentProtocol: AbstractProtocol): java.util.Map[String, String] =
@@ -2380,7 +2386,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val updatedActions =
currentTransactionInfo.getUpdatedActions(snapshot.metadata, snapshot.protocol)
val commitResponse = TransactionExecutionObserver.withObserver(executionObserver) {
tableCommitCoordinatorClient.commit(attemptVersion, jsonActions, updatedActions)
tableCommitCoordinatorClient.commit(
attemptVersion, jsonActions, updatedActions, catalogTable.map(_.identifier))
}
if (attemptVersion == 0L) {
val expectedPathForCommitZero = unsafeDeltaFile(deltaLog.logPath, version = 0L).toUri
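The transaction now forwards the catalog table identifier end to end: commit() passes catalogTable.map(_.identifier) through the commit-coordinator client, and the FS-to-CC conversion path converts it via CoordinatedCommitsUtils.toCCTableIdentifier before calling registerTable. An illustrative fragment of that registration call, reusing names from the transaction code above (the example database and table name are made up):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.catalyst.TableIdentifier

    // Hypothetical catalog identifier for the example.
    val catalogIdentifierOpt = Some(TableIdentifier("events", Some("analytics")))
    val ccTableIdentifierOpt = CoordinatedCommitsUtils.toCCTableIdentifier(catalogIdentifierOpt)

    // Same call shape as the FS -> CC conversion above; `newCommitCoordinatorClient`,
    // `deltaLog`, `readVersion`, `finalMetadata` and `protocol` come from the
    // surrounding transaction code.
    val newTableConf = newCommitCoordinatorClient.registerTable(
      deltaLog.logPath,
      ccTableIdentifierOpt,
      readVersion,
      finalMetadata,
      protocol).asScala.toMap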
@@ -42,6 +42,8 @@ import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField,
case class PreprocessTableMerge(override val conf: SQLConf)
extends Rule[LogicalPlan] with UpdateExpressionsSupport {

override protected val supportMergeAndUpdateLegacyCastBehavior: Boolean = true


override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case m: DeltaMergeInto if m.resolved => apply(m, true)
@@ -37,6 +37,8 @@ case class PreprocessTableUpdate(sqlConf: SQLConf)

override def conf: SQLConf = sqlConf

override protected val supportMergeAndUpdateLegacyCastBehavior: Boolean = true

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case u: DeltaUpdateTable if u.resolved =>
u.condition.foreach { cond =>
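Both preprocessing rules (PreprocessTableMerge above and PreprocessTableUpdate here) opt in to the legacy cast behavior by overriding a flag they inherit, presumably from UpdateExpressionsSupport. A hedged sketch of that opt-in pattern; the trait name and the default value shown are assumptions, not the actual trait code:

    trait LegacyCastBehaviorSupport {
      // Assumed default: legacy UPDATE/MERGE cast semantics are off unless a rule opts in.
      protected val supportMergeAndUpdateLegacyCastBehavior: Boolean = false
    }

    // Rules that must preserve the pre-existing cast semantics override the flag:
    object ExampleRule extends LegacyCastBehaviorSupport {
      override protected val supportMergeAndUpdateLegacyCastBehavior: Boolean = true
    }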