From 499eced1f301a57ee884463251d391281b725b64 Mon Sep 17 00:00:00 2001
From: Russell Spitzer
Date: Thu, 1 Feb 2018 13:17:53 -0800
Subject: [PATCH 1/2] SPARKC-527: Allow Split Count to be Passed to CassandraSourceRelation

Previously it was impossible to choose the target split count for a
CassandraSourceRelation (Spark SQL, DataFrames). This patch allows
case-insensitive parameter passing and adds a new `splitCount` parameter.
The extra lowerCase in `consolidateConfs` is there to make the forward
merge easier.
---
 .../connector/sql/CassandraDataFrameSpec.scala   | 12 ++++++++++++
 .../datastax/spark/connector/rdd/ReadConf.scala  | 15 ++++++++++++++-
 .../sql/cassandra/CassandraSourceRelation.scala  |  4 ++--
 .../spark/sql/cassandra/DefaultSource.scala      | 10 ++++++----
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala
index 39d10a80c..fbd2c866a 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala
@@ -10,6 +10,7 @@ import scala.concurrent.Future
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
 import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.rdd.CassandraTableScanRDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.cassandra._
 import org.joda.time.LocalDate
@@ -283,5 +284,16 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase with Eventuall
     firstRow should be((Byte.MinValue.toInt, Short.MinValue.toInt, "2016-08-03 00:00:00.0"))
   }
 
+  it should "be able to set splitCount" in {
+    val df = sqlContext
+      .read
+      .cassandraFormat("kv", ks)
+      .option("splitCount", "120")
+      .load
+
+    val rdd = df.rdd.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[CassandraTableScanRDD[_]]
+    rdd.readConf.splitCount should be (Some(120))
+  }
+
 }
 
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ReadConf.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ReadConf.scala
index 4ce719874..e9bff258f 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ReadConf.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ReadConf.scala
@@ -29,6 +29,17 @@ case class ReadConf(
 object ReadConf {
   val ReferenceSection = "Read Tuning Parameters"
 
+  val SplitCountParam = ConfigParameter[Option[Int]](
+    name = "splitCount",
+    section = ReferenceSection,
+    default = None,
+    description =
+      """Specify the number of Spark partitions to
+        |read the Cassandra table into. This parameter is
+        |used in SparkSql and DataFrame Options.
+      """.stripMargin
+  )
+
   val SplitSizeInMBParam = ConfigParameter[Int](
     name = "spark.cassandra.input.split.size_in_mb",
     section = ReferenceSection,
@@ -71,6 +82,7 @@ object ReadConf {
 
   // Whitelist for allowed Read environment variables
   val Properties = Set(
+    SplitCountParam,
     SplitSizeInMBParam,
     FetchSizeInRowsParam,
     ConsistencyLevelParam,
@@ -90,7 +102,8 @@ object ReadConf {
       taskMetricsEnabled = conf.getBoolean(TaskMetricParam.name, TaskMetricParam.default),
       throughputJoinQueryPerSec = conf.getLong(ThroughputJoinQueryPerSecParam.name,
         ThroughputJoinQueryPerSecParam.default),
-      parallelismLevel = conf.getInt(ParallelismLevelParam.name, ParallelismLevelParam.default)
+      parallelismLevel = conf.getInt(ParallelismLevelParam.name, ParallelismLevelParam.default),
+      splitCount = conf.getOption(SplitCountParam.name).map(_.toInt)
     )
   }
 
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala
index 8147e8952..c6f1ebe81 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala
@@ -1,7 +1,7 @@
 package org.apache.spark.sql.cassandra
 
 import java.net.InetAddress
-import java.util.UUID
+import java.util.{Locale, UUID}
 
 import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, Schema}
 import com.datastax.spark.connector.rdd.partitioner.CassandraPartitionGenerator._
@@ -313,7 +313,7 @@ object CassandraSourceRelation {
     //Keyspace/Cluster level settings
     for (prop <- DefaultSource.confProperties) {
       val value = Seq(
-        tableConf.get(prop),
+        tableConf.get(prop.toLowerCase(Locale.ROOT)),
         sqlConf.get(s"$cluster:$ks/$prop"),
         sqlConf.get(s"$cluster/$prop"),
         sqlConf.get(s"default/$prop")).flatten.headOption
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala
index 719ab67e0..2ed925b33 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala
@@ -1,13 +1,13 @@
 package org.apache.spark.sql.cassandra
 
-import scala.collection.mutable
+import java.util.Locale
 
+import scala.collection.mutable
 import org.apache.spark.sql.SaveMode._
 import org.apache.spark.sql.cassandra.DefaultSource._
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
-
 import com.datastax.spark.connector.util.Logging
 import com.datastax.spark.connector.cql.{AuthConfFactory, CassandraConnectorConf, DefaultAuthConfFactory}
 import com.datastax.spark.connector.rdd.ReadConf
@@ -138,10 +138,12 @@ object DefaultSource {
     DefaultAuthConfFactory.properties
 
   // Dot is not allowed in Options key for Spark SQL parsers, so convert . to _
-  // Map converted property to origin property name
+  // Map converted property to origin property name. Option keys are all
+  // lower-cased, so we need to lower-case our keys as well.
   // TODO check SPARK 1.4 it may be fixed
   private val propertiesMap : Map[String, String] = {
-    confProperties.map(prop => (prop.replace(".", "_"), prop)).toMap
+    confProperties.map(prop =>
+      (prop.replace(".", "_").toLowerCase(Locale.ROOT), prop.toLowerCase(Locale.ROOT))).toMap
   }
 
   /** Construct a map stores Cassandra Conf settings from options */

From 8f0294836dd2a1303b228576bcb9df51bf3b4436 Mon Sep 17 00:00:00 2001
From: Russell Spitzer
Date: Thu, 1 Feb 2018 22:40:16 -0800
Subject: [PATCH 2/2] SPARKC-527: Update Reference Docs

---
 doc/reference.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/doc/reference.md b/doc/reference.md
index e298f049b..9c973395e 100644
--- a/doc/reference.md
+++ b/doc/reference.md
@@ -236,6 +236,14 @@ OSS Cassandra this should never be used.
     <td>64</td>
     <td>Approx amount of data to be fetched into a Spark partition</td>
   </tr>
+  <tr>
+    <td><code>splitCount</code></td>
+    <td>None</td>
+    <td>Specify the number of Spark partitions to
+read the Cassandra table into. This parameter is
+used in SparkSql and DataFrame Options.</td>
+  </tr>
+
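
As a usage sketch (not part of the patches above), this is how the new option is
meant to be used from the DataFrame API, mirroring the test added in
CassandraDataFrameSpec. It assumes a SparkSession named `spark` and a Cassandra
table `test.kv`; both names are placeholders.

    import org.apache.spark.sql.cassandra._

    // Ask the connector to read the (placeholder) table test.kv into 120 Spark partitions.
    val df = spark.read
      .cassandraFormat("kv", "test")
      .option("splitCount", "120")
      .load()

Per the first commit message, option keys are matched case-insensitively, so a
key such as "splitcount" should be accepted as well.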
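
And a self-contained sketch of the key normalization behind that case
insensitivity, following the propertiesMap change in DefaultSource.scala; the
confProperties list below is a stand-in for the real one.

    import java.util.Locale

    // Dots are not allowed in Spark SQL option keys and option keys arrive
    // lower-cased, so each property gets a lookup key with '.' replaced by '_'
    // and both the key and the stored name lower-cased.
    val confProperties = Seq("spark.cassandra.input.split.size_in_mb", "splitCount")
    val propertiesMap: Map[String, String] =
      confProperties.map(prop =>
        (prop.replace(".", "_").toLowerCase(Locale.ROOT), prop.toLowerCase(Locale.ROOT))).toMap

    // propertiesMap("splitcount") == "splitcount"
    // propertiesMap("spark_cassandra_input_split_size_in_mb") == "spark.cassandra.input.split.size_in_mb"

Because the stored names are lower-cased here, the later lookup in
consolidateConfs uses prop.toLowerCase(Locale.ROOT) as well, which is the
"extra lowerCase" mentioned in the first commit message.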