
[SPARK-45481][SQL][FOLLOWUP] Add lowerCaseName for ParquetCompressionCodec
beliefer committed Oct 28, 2023
1 parent 57e73da commit 765c107
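
In short: this follow-up to SPARK-45481 gives the ParquetCompressionCodec enum a lowerCaseName() accessor backed by a precomputed name map, so the call sites in the hunks below can replace the repeated codec.name().toLowerCase(Locale.ROOT) pattern with one cached lookup and drop their java.util.Locale imports.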
Showing 10 changed files with 36 additions and 35 deletions.
@@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

@@ -51,6 +53,14 @@ public static ParquetCompressionCodec fromString(String s) {
return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
}

private static final Map<String, String> codecNameMap =
Arrays.stream(ParquetCompressionCodec.values()).collect(
Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));

public String lowerCaseName() {
return codecNameMap.get(this.name());
}

public static final List<ParquetCompressionCodec> availableCodecs =
Arrays.asList(
ParquetCompressionCodec.UNCOMPRESSED,
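
As a reading aid, here is a minimal, self-contained Scala sketch of the pattern the new method introduces: the lowercase form of each constant's name is computed once, using Locale.ROOT so the result does not depend on the JVM's default locale (for example, the Turkish dotted/dotless i). The Codec type and its members below are hypothetical stand-ins, not Spark code.

import java.util.Locale

// Hypothetical enum-like type mirroring ParquetCompressionCodec.lowerCaseName():
// the lowercase name is computed once per constant instead of at every call site.
sealed abstract class Codec(val name: String) {
  // Cached at construction time, with Locale.ROOT for a locale-independent result.
  val lowerCaseName: String = name.toLowerCase(Locale.ROOT)
}

object Codec {
  case object Uncompressed extends Codec("UNCOMPRESSED")
  case object Snappy extends Codec("SNAPPY")
  case object Gzip extends Codec("GZIP")

  val values: Seq[Codec] = Seq(Uncompressed, Snappy, Gzip)

  // Same shape as shortParquetCompressionCodecNames in the ParquetOptions hunk below:
  // a lookup table keyed by the user-facing short name.
  val byShortName: Map[String, Codec] = values.map(c => c.lowerCaseName -> c).toMap
}

// Usage: Codec.Snappy.lowerCaseName == "snappy"; Codec.byShortName("gzip") == Codec.Gzip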
@@ -89,7 +89,7 @@ object ParquetOptions extends DataSourceOptions {
// The parquet compression short names
private val shortParquetCompressionCodecNames =
ParquetCompressionCodec.values().map {
codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
codec => codec.lowerCaseName() -> codec.getCompressionCodec
}.toMap

def getParquetCompressionCodecName(name: String): String = {
@@ -55,7 +55,7 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
}

spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
ParquetCompressionCodec.SNAPPY.lowerCaseName())
spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")

formats.foreach { format =>
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.benchmark

import java.io.File
import java.util.Locale

import scala.jdk.CollectionConverters._
import scala.util.Random
@@ -100,17 +99,19 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}

val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)

private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
df.mode("overwrite")
.option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
.parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
}

private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
df.mode("overwrite")
.option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
.parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
}
}
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.benchmark

import java.io.File
import java.util.Locale

import scala.util.Random

@@ -53,7 +52,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
.setIfMissing("spark.executor.memory", "3g")
.setIfMissing("orc.compression", "snappy")
.setIfMissing("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
ParquetCompressionCodec.SNAPPY.lowerCaseName())

SparkSession.builder().config(conf).getOrCreate()
}
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.benchmark

import java.util.Locale

import scala.util.Try

import org.apache.spark.SparkConf
@@ -54,8 +52,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
val conf = new SparkConf()
.setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
.setAppName("test-sql-context")
.set("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
.set("spark.sql.parquet.compression.codec", ParquetCompressionCodec.SNAPPY.lowerCaseName())
.set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
.set("spark.driver.memory", "3g")
.set("spark.executor.memory", "3g")
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.datasources

import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.QueryTest
@@ -65,8 +63,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// on Maven Central.
override protected def availableCodecs: Seq[String] =
(ParquetCompressionCodec.NONE +:
ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
.map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)).map(_.lowerCaseName())
}

class OrcCodecSuite extends FileSourceCodecSuite {
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.util.Locale

import scala.jdk.CollectionConverters._

@@ -41,29 +40,28 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar

test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val props = Map(
"compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
"compression" -> ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName(),
ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}

// When "compression" is not configured, "parquet.compression" should be the preferred choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map(ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val props =
Map(ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
}

// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
@@ -1502,10 +1502,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}

test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val option = new ParquetOptions(
Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName()),
spark.sessionState.conf)
assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive

import java.time.{Duration, Period}
import java.time.temporal.ChronoUnit
import java.util.Locale

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
@@ -158,8 +157,8 @@ class HiveParquetSuite extends QueryTest

test("SPARK-37098: Alter table properties should invalidate cache") {
// specify the compression in case we change it in future
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
withTempPath { dir =>
withTable("t") {
sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
