[Spark] Report writer offset mismatch in DVStore (#3661)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We identified a potential issue with the API used to write DVs to files: the
`DataOutputStream.size()` method may not return the correct file size. Our
investigation revealed that `DataOutputStream` maintains its own byte count,
but its `write(data)` method does not increment this counter.
`DataOutputStream` has multiple subclasses, which might override the counter
or the `write(data)` method so that the counter is updated correctly. We want
to find out which class is in use when the issue occurs, hence this PR.

To address this, we introduced our own mechanism to track the number of
bytes written, which will be used solely for logging. If there is a
discrepancy between the system's reported file size and our own record,
a Delta event will be triggered.
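The cross-check described above can be sketched as follows. This is an illustrative standalone example, not the patched code: it uses plain `java.io.DataOutputStream` over an in-memory buffer rather than Hadoop's `FSDataOutputStream`, and the class name and byte layout (1-byte version marker, 4-byte length prefix, payload, 4-byte checksum) merely mirror the DV file layout implied by the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch: maintain our own byte counter alongside the stream's
// internal one, and compare the two, as the patch does with `writtenBytes`.
public class OffsetCrossCheck {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);

        long writtenBytes = 0; // our own counter, analogous to the patch's writtenBytes

        out.writeByte(1);          // format version marker
        writtenBytes += 1;

        byte[] data = {10, 20, 30};
        out.writeInt(data.length); // 4-byte length prefix
        writtenBytes += 4;
        out.write(data);           // payload
        writtenBytes += data.length;
        out.writeInt(0);           // 4-byte checksum placeholder
        writtenBytes += 4;

        // If the stream implementation failed to maintain its internal counter
        // for write(byte[]), these two numbers would diverge; that divergence
        // is the condition the patch reports via a Delta event.
        if (writtenBytes != out.size()) {
            System.out.println("mismatch: counted " + writtenBytes
                + " but stream reports " + out.size());
        } else {
            System.out.println("sizes agree: " + writtenBytes);
        }
    }
}
```

With the stock JDK `DataOutputStream` the two counts agree; the point of the patch is that some subclass in the Hadoop write path may not preserve this invariant.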

## How was this patch tested?

Not needed.

## Does this PR introduce _any_ user-facing changes?

No.
xupefei authored Sep 10, 2024
1 parent a08712d commit b7ef01c
Showing 1 changed file with 18 additions and 4 deletions.
@@ -25,15 +25,15 @@ import java.util.zip.CRC32
 import org.apache.spark.sql.delta.DeltaErrors
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, StoredBitmap}
+import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.PathWithFileSystem
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.util.Utils
 
-trait DeletionVectorStore extends Logging {
+trait DeletionVectorStore extends DeltaLogging {
   /**
    * Read a Deletion Vector and parse it as [[RoaringBitmapArray]].
    */
@@ -205,22 +205,36 @@ class HadoopFileSystemDVStore(hadoopConf: Configuration)
   // Lazily create the writer for the deletion vectors, so that we don't write an empty file
   // in case all deletion vectors are empty.
   private var outputStream: FSDataOutputStream = _
+  private var writtenBytes = 0L
 
   override def write(data: Array[Byte]): DeletionVectorStore.DVRangeDescriptor = {
     if (outputStream == null) {
       val overwrite = false // `create` Java API does not support named parameters
       outputStream = path.fs.create(path.path, overwrite)
       outputStream.writeByte(DeletionVectorStore.DV_FILE_FORMAT_VERSION_ID_V1)
+      writtenBytes += 1
     }
     val dvRange = DeletionVectorStore.DVRangeDescriptor(
       offset = outputStream.size(),
       length = data.length,
-      checksum = DeletionVectorStore.calculateChecksum(data)
-    )
+      checksum = DeletionVectorStore.calculateChecksum(data))
+
+    if (writtenBytes != dvRange.offset) {
+      recordDeltaEvent(
+        deltaLog = null,
+        opType = "delta.deletionVector.write.offsetMismatch",
+        data = Map(
+          "path" -> path.path.toString,
+          "reportedOffset" -> dvRange.offset,
+          "calculatedOffset" -> writtenBytes))
+    }
 
     log.debug(s"Writing DV range to file: Path=${path.path}, Range=${dvRange}")
     outputStream.writeInt(data.length)
     outputStream.write(data)
     outputStream.writeInt(dvRange.checksum)
+    writtenBytes += DeletionVectorStore.getTotalSizeOfDVFieldsInFile(data.length)
 
     dvRange
   }

