
Merge pull request #1161 from datastax/SPARKC-527
SPARKC-527: Allow Split Count to be Passed to CassandraSourceRelation
RussellSpitzer authored Feb 5, 2018
2 parents 8ec266c + 8f02948 commit 37f4d61
Showing 5 changed files with 42 additions and 7 deletions.
8 changes: 8 additions & 0 deletions doc/reference.md
@@ -236,6 +236,14 @@ OSS Cassandra this should never be used.</td>
<td>64</td>
<td>Approx amount of data to be fetched into a Spark partition</td>
</tr>
<tr>
<td><code>splitCount</code></td>
<td>None</td>
<td>Specifies the number of Spark partitions to
read the Cassandra table into. This parameter is
used in Spark SQL and DataFrame options; see the
example below.
</td>
</tr>
</table>
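
For example, a minimal sketch of setting the option on a DataFrame read (the table and keyspace names below are placeholders):

    import org.apache.spark.sql.cassandra._

    val df = sqlContext
      .read
      .cassandraFormat("kv", "test_ks")  // placeholder table and keyspace
      .option("splitCount", "120")       // ask for roughly 120 Spark partitions
      .load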


@@ -10,6 +10,7 @@ import scala.concurrent.Future
import com.datastax.spark.connector._
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.CassandraTableScanRDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._
import org.joda.time.LocalDate
@@ -283,5 +284,16 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase with Eventually
firstRow should be((Byte.MinValue.toInt, Short.MinValue.toInt, "2016-08-03 00:00:00.0"))
}

it should "be able to set splitCount" in {
val df = sqlContext
.read
.cassandraFormat("kv", ks)
.option("splitCount", "120")
.load

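    // Walk the RDD lineage (two mapping layers in this plan) down to the
    // underlying CassandraTableScanRDD so its ReadConf can be inspected.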
val rdd = df.rdd.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[CassandraTableScanRDD[_]]
rdd.readConf.splitCount should be (Some(120))
}


}
@@ -29,6 +29,17 @@ case class ReadConf(
object ReadConf {
val ReferenceSection = "Read Tuning Parameters"

val SplitCountParam = ConfigParameter[Option[Int]](
name = "splitCount",
section = ReferenceSection,
default = None,
description =
"""Specifies the number of Spark partitions to
|read the Cassandra table into. This parameter is
|used in Spark SQL and DataFrame options.
""".stripMargin
)
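// The same setting surfaces as an OPTIONS key in Spark SQL, e.g. (a sketch;
// the view, table, and keyspace names are illustrative):
//   CREATE TEMPORARY VIEW kv
//   USING org.apache.spark.sql.cassandra
//   OPTIONS (table "kv", keyspace "ks", splitCount "120")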

val SplitSizeInMBParam = ConfigParameter[Int](
name = "spark.cassandra.input.split.size_in_mb",
section = ReferenceSection,
@@ -71,6 +82,7 @@ object ReadConf {

// Whitelist for allowed Read environment variables
val Properties = Set(
SplitCountParam,
SplitSizeInMBParam,
FetchSizeInRowsParam,
ConsistencyLevelParam,
@@ -90,7 +102,8 @@
taskMetricsEnabled = conf.getBoolean(TaskMetricParam.name, TaskMetricParam.default),
throughputJoinQueryPerSec = conf.getLong(ThroughputJoinQueryPerSecParam.name,
ThroughputJoinQueryPerSecParam.default),
-  parallelismLevel = conf.getInt(ParallelismLevelParam.name, ParallelismLevelParam.default)
+  parallelismLevel = conf.getInt(ParallelismLevelParam.name, ParallelismLevelParam.default),
+  splitCount = conf.getOption(SplitCountParam.name).map(_.toInt)
)
}
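
// A minimal sketch of the new parsing (the value is illustrative):
//   val conf = new SparkConf().set("splitCount", "120")
//   ReadConf.fromSparkConf(conf).splitCount  // Some(120)
// An absent key yields None: getOption finds nothing for map(_.toInt) to parse.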

@@ -1,7 +1,7 @@
package org.apache.spark.sql.cassandra

import java.net.InetAddress
-import java.util.UUID
+import java.util.{Locale, UUID}

import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, Schema}
import com.datastax.spark.connector.rdd.partitioner.CassandraPartitionGenerator._
@@ -313,7 +313,7 @@ object CassandraSourceRelation {
// Keyspace/cluster-level settings
for (prop <- DefaultSource.confProperties) {
val value = Seq(
-      tableConf.get(prop),
+      tableConf.get(prop.toLowerCase(Locale.ROOT)),
sqlConf.get(s"$cluster:$ks/$prop"),
sqlConf.get(s"$cluster/$prop"),
sqlConf.get(s"default/$prop")).flatten.headOption
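      // Resolution order (first defined wins): the per-table option, then the
      // scoped SQL conf keys, e.g. "cluster1:ks/<prop>", "cluster1/<prop>",
      // "default/<prop>" (the cluster and keyspace names here are illustrative).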
@@ -1,13 +1,13 @@
package org.apache.spark.sql.cassandra

-import scala.collection.mutable
+import java.util.Locale

+import scala.collection.mutable
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.cassandra.DefaultSource._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

import com.datastax.spark.connector.util.Logging
import com.datastax.spark.connector.cql.{AuthConfFactory, CassandraConnectorConf, DefaultAuthConfFactory}
import com.datastax.spark.connector.rdd.ReadConf
@@ -138,10 +138,12 @@ object DefaultSource {
DefaultAuthConfFactory.properties

// Dot is not allowed in Options key for Spark SQL parsers, so convert . to _
-  // Map converted property to origin property name
+  // Map converted property to origin property name. Options are all going to
+  // lower case so we need to convert them as well.
  // TODO: check Spark 1.4; this may be fixed there
private val propertiesMap : Map[String, String] = {
-    confProperties.map(prop => (prop.replace(".", "_"), prop)).toMap
+    confProperties.map(prop =>
+      (prop.replace(".", "_").toLowerCase(Locale.ROOT), prop.toLowerCase(Locale.ROOT))).toMap
}
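
  // For example, with one of the real conf keys:
  //   "spark.cassandra.input.split.size_in_mb".replace(".", "_").toLowerCase(Locale.ROOT)
  //     == "spark_cassandra_input_split_size_in_mb"
  // so the underscore form that Spark SQL hands us maps back to the
  // (lower-cased) original property name.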

/** Construct a map stores Cassandra Conf settings from options */