diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 61c2f7a5926b8..1c9bdfdb493f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -26,7 +26,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} -import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors} @@ -260,12 +260,14 @@ class UnivocityParser( case ym: YearMonthIntervalType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - Cast(Literal(datum), ym).eval(EmptyRow) + IntervalUtils.castStringToYMInterval( + UTF8String.fromString(datum), ym.startField, ym.endField) } case dt: DayTimeIntervalType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - Cast(Literal(datum), dt).eval(EmptyRow) + IntervalUtils.castStringToDTInterval( + UTF8String.fromString(datum), dt.startField, dt.endField) } case udt: UserDefinedType[_] => diff --git a/sql/core/benchmarks/CSVBenchmark-jdk21-results.txt b/sql/core/benchmarks/CSVBenchmark-jdk21-results.txt index b37d2fa3060e7..50a13bd51ebe4 100644 --- a/sql/core/benchmarks/CSVBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/CSVBenchmark-jdk21-results.txt @@ -2,69 +2,76 @@ Benchmark to measure CSV read/write performance ================================================================================================ -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -One quoted string 23353 23432 75 0.0 467067.4 1.0X +One quoted string 23962 24182 316 0.0 479231.3 1.0X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Wide rows with 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 1000 columns 56825 57244 679 0.0 56825.1 1.0X -Select 100 columns 20482 20568 86 0.0 20481.7 2.8X -Select one column 16968 17000 36 0.1 16967.7 3.3X -count() 3366 3378 11 0.3 3366.4 16.9X -Select 100 columns, one bad input field 28347 28379 30 0.0 28346.6 2.0X -Select 100 columns, corrupt record field 32401 32450 42 0.0 32401.2 1.8X +Select 1000 columns 56724 57115 570 0.0 56724.1 1.0X +Select 100 columns 20740 20855 115 0.0 20739.7 2.7X +Select one column 17304 17377 114 0.1 17304.3 3.3X +count() 3719 3740 21 0.3 3719.0 15.3X +Select 100 columns, one bad input field 24943 24999 69 0.0 24943.2 2.3X +Select 100 columns, corrupt record field 28306 28341 31 0.0 28306.2 2.0X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Count a dataset with 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns + count() 11174 11195 18 0.9 1117.4 1.0X -Select 1 column + count() 7666 7694 24 1.3 766.6 1.5X -count() 2042 2048 5 4.9 204.2 5.5X +Select 10 columns + count() 10977 10982 5 0.9 1097.7 1.0X +Select 1 column + count() 7406 7554 131 1.4 740.6 1.5X +count() 1550 1558 9 6.5 155.0 7.1X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 854 882 27 11.7 85.4 1.0X -to_csv(timestamp) 6166 6174 13 1.6 616.6 0.1X -write timestamps to files 6480 6575 158 1.5 648.0 0.1X -Create a dataset of dates 948 949 1 10.6 94.8 0.9X -to_csv(date) 4471 4474 3 2.2 447.1 0.2X -write dates to files 4599 4616 15 2.2 459.9 0.2X +Create a dataset of timestamps 845 847 3 11.8 84.5 1.0X +to_csv(timestamp) 5546 5597 57 1.8 554.6 0.2X +write timestamps to files 5760 5768 8 1.7 576.0 0.1X +Create a dataset of dates 1053 1064 9 9.5 105.3 0.8X +to_csv(date) 4115 4122 9 2.4 411.5 0.2X +write dates to files 4102 4108 5 2.4 410.2 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------- -read timestamp text from files 1200 1213 12 8.3 120.0 1.0X -read timestamps from files 11576 11601 22 0.9 1157.6 0.1X -infer timestamps from files 23234 23253 16 0.4 2323.4 0.1X -read date text from files 1115 1162 44 9.0 111.5 1.1X -read date from files 10978 11006 43 0.9 1097.8 0.1X -infer date from files 22588 22604 13 0.4 2258.8 0.1X -timestamp strings 1224 1236 21 8.2 122.4 1.0X -parse timestamps from Dataset[String] 13566 13595 41 0.7 1356.6 0.1X -infer timestamps from Dataset[String] 25057 25094 36 0.4 2505.7 0.0X -date strings 1618 1626 7 6.2 161.8 0.7X -parse dates from Dataset[String] 12784 12816 34 0.8 1278.4 0.1X -from_csv(timestamp) 12008 12088 69 0.8 1200.8 0.1X -from_csv(date) 11930 11938 12 0.8 1193.0 0.1X -infer error timestamps from Dataset[String] with default format 14366 14394 35 0.7 1436.6 0.1X -infer error timestamps from Dataset[String] with user-provided format 14380 14412 52 0.7 1438.0 0.1X -infer error timestamps from Dataset[String] with legacy format 14439 14453 21 0.7 1443.9 0.1X +read timestamp text from files 1107 1119 16 9.0 110.7 1.0X +read timestamps from files 9511 9553 49 1.1 951.1 0.1X +infer timestamps from files 19084 19114 27 0.5 1908.4 0.1X +read date text from files 1036 1046 14 9.7 103.6 1.1X +read date from files 8299 8309 15 1.2 829.9 0.1X +infer date from files 17290 17294 4 0.6 1729.0 0.1X +timestamp strings 1188 1197 7 8.4 118.8 0.9X +parse timestamps from Dataset[String] 11442 11458 14 0.9 1144.2 0.1X +infer timestamps from Dataset[String] 21076 21116 39 0.5 2107.6 0.1X +date strings 1651 1659 10 6.1 165.1 0.7X +parse dates from Dataset[String] 10181 10186 5 1.0 1018.1 0.1X +from_csv(timestamp) 10023 10062 34 1.0 1002.3 0.1X +from_csv(date) 9335 9351 15 1.1 933.5 0.1X +infer error timestamps from Dataset[String] with default format 11187 11205 16 0.9 1118.7 0.1X +infer error timestamps from Dataset[String] with user-provided format 11201 11216 13 0.9 1120.1 0.1X +infer error timestamps from Dataset[String] with legacy format 11210 11227 17 0.9 1121.0 0.1X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -w/o filters 4302 4383 137 0.0 43020.6 1.0X -pushdown disabled 4206 4220 13 0.0 42058.8 1.0X -w/ filters 776 784 10 0.1 7756.3 5.5X +w/o filters 4365 4377 13 0.0 43653.8 1.0X +pushdown disabled 4348 4370 22 0.0 43477.7 1.0X +w/ filters 695 713 29 0.1 6950.2 6.3X + +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1022-azure +AMD EPYC 7763 64-Core Processor +Interval: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Read as Intervals 7089 7096 7 0.4 2362.1 1.0X +Read Raw Strings 2071 2075 6 1.4 690.1 3.4X diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt index 522e164f80c8c..c1e4f53e1cf4a 100644 --- a/sql/core/benchmarks/CSVBenchmark-results.txt +++ b/sql/core/benchmarks/CSVBenchmark-results.txt @@ -2,69 +2,76 @@ Benchmark to measure CSV read/write performance ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -One quoted string 24681 24724 74 0.0 493616.4 1.0X +One quoted string 25841 26207 446 0.0 516822.6 1.0X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Wide rows with 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 1000 columns 55362 55719 576 0.0 55361.6 1.0X -Select 100 columns 22947 22975 36 0.0 22946.7 2.4X -Select one column 19695 19714 18 0.1 19694.7 2.8X -count() 3474 3514 54 0.3 3473.8 15.9X -Select 100 columns, one bad input field 32366 32417 47 0.0 32365.6 1.7X -Select 100 columns, corrupt record field 35921 35986 77 0.0 35921.3 1.5X +Select 1000 columns 57462 57944 502 0.0 57462.5 1.0X +Select 100 columns 23373 23449 71 0.0 23372.8 2.5X +Select one column 20009 20041 40 0.0 20008.9 2.9X +count() 4326 4402 66 0.2 4325.9 13.3X +Select 100 columns, one bad input field 28362 28472 104 0.0 28361.6 2.0X +Select 100 columns, corrupt record field 31926 32002 113 0.0 31925.7 1.8X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Count a dataset with 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns + count() 9523 9537 15 1.1 952.3 1.0X -Select 1 column + count() 6868 6883 13 1.5 686.8 1.4X -count() 1820 1836 20 5.5 182.0 5.2X +Select 10 columns + count() 9591 9615 23 1.0 959.1 1.0X +Select 1 column + count() 6827 6845 21 1.5 682.7 1.4X +count() 1754 1759 5 5.7 175.4 5.5X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 899 912 12 11.1 89.9 1.0X -to_csv(timestamp) 7355 7371 14 1.4 735.5 0.1X -write timestamps to files 7751 7761 12 1.3 775.1 0.1X -Create a dataset of dates 1171 1174 6 8.5 117.1 0.8X -to_csv(date) 5040 5044 4 2.0 504.0 0.2X -write dates to files 5277 5292 24 1.9 527.7 0.2X +Create a dataset of timestamps 932 938 9 10.7 93.2 1.0X +to_csv(timestamp) 7214 7275 83 1.4 721.4 0.1X +write timestamps to files 7593 7604 10 1.3 759.3 0.1X +Create a dataset of dates 1036 1042 7 9.6 103.6 0.9X +to_csv(date) 5121 5139 15 2.0 512.1 0.2X +write dates to files 5203 5215 16 1.9 520.3 0.2X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------- -read timestamp text from files 1245 1251 7 8.0 124.5 1.0X -read timestamps from files 10059 10074 23 1.0 1005.9 0.1X -infer timestamps from files 20189 20223 36 0.5 2018.9 0.1X -read date text from files 1151 1167 24 8.7 115.1 1.1X -read date from files 10547 10568 25 0.9 1054.7 0.1X -infer date from files 21527 21540 11 0.5 2152.7 0.1X -timestamp strings 1355 1364 15 7.4 135.5 0.9X -parse timestamps from Dataset[String] 11522 11553 28 0.9 1152.2 0.1X -infer timestamps from Dataset[String] 21195 21203 10 0.5 2119.5 0.1X -date strings 1785 1788 5 5.6 178.5 0.7X -parse dates from Dataset[String] 12245 12288 44 0.8 1224.5 0.1X -from_csv(timestamp) 10102 10144 51 1.0 1010.2 0.1X -from_csv(date) 11329 11353 29 0.9 1132.9 0.1X -infer error timestamps from Dataset[String] with default format 12067 12091 36 0.8 1206.7 0.1X -infer error timestamps from Dataset[String] with user-provided format 12077 12093 24 0.8 1207.7 0.1X -infer error timestamps from Dataset[String] with legacy format 12047 12076 26 0.8 1204.7 0.1X +read timestamp text from files 1258 1260 3 7.9 125.8 1.0X +read timestamps from files 10113 10133 17 1.0 1011.3 0.1X +infer timestamps from files 19797 19926 134 0.5 1979.7 0.1X +read date text from files 1131 1133 3 8.8 113.1 1.1X +read date from files 10275 10283 8 1.0 1027.5 0.1X +infer date from files 20823 20856 29 0.5 2082.3 0.1X +timestamp strings 1330 1335 7 7.5 133.0 0.9X +parse timestamps from Dataset[String] 11297 11345 43 0.9 1129.7 0.1X +infer timestamps from Dataset[String] 20968 20999 41 0.5 2096.8 0.1X +date strings 1786 1793 7 5.6 178.6 0.7X +parse dates from Dataset[String] 11565 11595 34 0.9 1156.5 0.1X +from_csv(timestamp) 9920 9948 42 1.0 992.0 0.1X +from_csv(date) 10506 10512 5 1.0 1050.6 0.1X +infer error timestamps from Dataset[String] with default format 12344 12370 23 0.8 1234.4 0.1X +infer error timestamps from Dataset[String] with user-provided format 12351 12367 17 0.8 1235.1 0.1X +infer error timestamps from Dataset[String] with legacy format 12345 12366 18 0.8 1234.5 0.1X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure AMD EPYC 7763 64-Core Processor Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -w/o filters 4119 4139 17 0.0 41191.2 1.0X -pushdown disabled 4092 4110 16 0.0 40922.3 1.0X -w/ filters 691 702 13 0.1 6911.5 6.0X +w/o filters 4103 4108 5 0.0 41030.6 1.0X +pushdown disabled 4129 4132 4 0.0 41291.7 1.0X +w/ filters 769 775 6 0.1 7689.8 5.3X + +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1022-azure +AMD EPYC 7763 64-Core Processor +Interval: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Read as Intervals 6922 6964 44 0.4 2306.6 1.0X +Read Raw Strings 2360 2391 27 1.3 786.5 2.9X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala index e9cf35d9fab9c..524c222062150 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala @@ -371,6 +371,38 @@ object CSVBenchmark extends SqlBasedBenchmark { } } + private def intervalBenchmark(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark(s"Interval", rowsNum, output = output) + withTempPath { path => + spark + .range(rowsNum) + .map { i => + (s"${i % 1000}-${"%02d".format(i % 12)}", + s"${i % 1000} " + + s"${"%02d".format(i % 24)}:" + + s"${"%02d".format(i % 60)}:" + + s"${"%02d".format(i % 60)}.${i % 1000000}") + } + .toDF("ym", "ds") + .write + .option("header", true) + .mode("overwrite") + .csv(path.getAbsolutePath) + benchmark.addCase("Read as Intervals", numIters) { _ => + spark.read.option("header", true) + .schema("ym INTERVAL YEAR TO MONTH, ds INTERVAL DAY TO SECOND") + .csv(path.getAbsolutePath) + .noop() + } + benchmark.addCase("Read Raw Strings", numIters) { _ => + spark.read.option("header", true) + .csv(path.getAbsolutePath) + .noop() + } + benchmark.run() + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("Benchmark to measure CSV read/write performance") { val numIters = 3 @@ -379,6 +411,7 @@ object CSVBenchmark extends SqlBasedBenchmark { countBenchmark(rowsNum = 10 * 1000 * 1000, numIters) datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters) filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters) + intervalBenchmark(rowsNum = 300 * 1000, numIters) } } }