diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
index 1a37c7a33f20c..32d9701bdbb21 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -20,6 +20,8 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -51,6 +53,14 @@ public static ParquetCompressionCodec fromString(String s) {
     return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
   }
 
+  private static final Map<String, String> codecNameMap =
+    Arrays.stream(ParquetCompressionCodec.values()).collect(
+      Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+    return codecNameMap.get(this.name());
+  }
+
   public static final List<ParquetCompressionCodec> availableCodecs =
     Arrays.asList(
       ParquetCompressionCodec.UNCOMPRESSED,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index ae110fdd0d3a3..1f022f8d369eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -89,7 +89,7 @@ object ParquetOptions extends DataSourceOptions {
 
   // The parquet compression short names
   private val shortParquetCompressionCodecNames = ParquetCompressionCodec.values().map {
-    codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
+    codec => codec.lowerCaseName() -> codec.getCompressionCodec
   }.toMap
 
   def getParquetCompressionCodecName(name: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index ba3228878ecee..a3f66e88be46c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -55,7 +55,7 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
     }
 
     spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
-      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+      ParquetCompressionCodec.SNAPPY.lowerCaseName())
     spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
 
     formats.foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index a8736c041517f..a16869782a469 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
-import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 import scala.util.Random
@@ -100,17 +99,19 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
     spark.read.json(dir).createOrReplaceTempView("jsonTable")
   }
 
-  val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
-
   private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
+    df.mode("overwrite")
+      .option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
+      .parquet(dir)
     spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
   }
 
   private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
     withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
       ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
-      df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
+      df.mode("overwrite")
+        .option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
+        .parquet(dir)
       spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 10781ec90fa00..12d3b69648c53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
-import java.util.Locale
 
 import scala.util.Random
 
@@ -53,7 +52,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       .setIfMissing("spark.executor.memory", "3g")
       .setIfMissing("orc.compression", "snappy")
       .setIfMissing("spark.sql.parquet.compression.codec",
-        ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+        ParquetCompressionCodec.SNAPPY.lowerCaseName())
 
     SparkSession.builder().config(conf).getOrCreate()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index f01cfea62a958..721997d84e1a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.benchmark
 
-import java.util.Locale
-
 import scala.util.Try
 
 import org.apache.spark.SparkConf
@@ -54,8 +52,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
     val conf = new SparkConf()
       .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
       .setAppName("test-sql-context")
-      .set("spark.sql.parquet.compression.codec",
-        ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+      .set("spark.sql.parquet.compression.codec", ParquetCompressionCodec.SNAPPY.lowerCaseName())
      .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
      .set("spark.driver.memory", "3g")
      .set("spark.executor.memory", "3g")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 1f1805a02d765..60e75aab78355 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.Locale
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.QueryTest
@@ -65,8 +63,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // on Maven Central.
   override protected def availableCodecs: Seq[String] =
     (ParquetCompressionCodec.NONE +:
-      ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
-      .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
+      ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)).map(_.lowerCaseName())
 }
 
 class OrcCodecSuite extends FileSourceCodecSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 28ea430635a2b..040c127a43d13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
-import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
@@ -41,29 +40,28 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
 
   test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
     // When "compression" is configured, it should be the first choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       val props = Map(
-        "compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
-        ParquetOutputFormat.COMPRESSION ->
-          ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
+        "compression" -> ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName(),
+        ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
       val option = new ParquetOptions(props, spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
     }
 
     // When "compression" is not configured, "parquet.compression" should be the preferred choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
-      val props = Map(ParquetOutputFormat.COMPRESSION ->
-        ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
+      val props =
+        Map(ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
       val option = new ParquetOptions(props, spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
     }
 
     // When both "compression" and "parquet.compression" are not configured,
     // spark.sql.parquet.compression.codec should be the right choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       val props = Map.empty[String, String]
       val option = new ParquetOptions(props, spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a5d5f8ce30f0c..44b9977978696 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1502,10 +1502,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       val option = new ParquetOptions(
-        Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
+        Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName()),
         spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 45dd8da6e0200..2152a7e300021 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.time.{Duration, Period}
 import java.time.temporal.ChronoUnit
-import java.util.Locale
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
@@ -158,8 +157,8 @@ class HiveParquetSuite extends QueryTest
 
   test("SPARK-37098: Alter table properties should invalidate cache") {
     // specify the compression in case we change it in future
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       withTempPath { dir =>
         withTable("t") {
           sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")