diff --git a/doc/cluster.md b/doc/cluster.md
index 8a3c6315..f278c703 100644
--- a/doc/cluster.md
+++ b/doc/cluster.md
@@ -17,3 +17,38 @@ def twoEndpointExample ( sc: SparkContext) = {
 }
 ```
 If you want to use multiple Redis clusters/instances, an implicit RedisConfig can be used in a code block to specify the target cluster/instance.
+
+### Connecting to Sentinels
+#### Using parameters
+```scala
+df.write
+  .format("org.apache.spark.sql.redis")
+  .option("table", "table")
+  .option("key.column", "key")
+  .option("host", "host1,host2,host3")
+  .option("port", "6000")
+  .option("dbNum", "0")
+  .option("timeout", "2000")
+  .option("auth", "pwd")
+  .option("ssl", "true")
+  .option("sentinel.master", "mymaster")
+  .option("sentinel.auth", "sentinelPwd")
+  .save()
+```
+
+#### Using sparkContext
+```scala
+val spark = SparkSession
+  .builder()
+  .appName("myApp")
+  .master("local[*]")
+  .config("spark.redis.host", "host1,host2,host3")
+  .config("spark.redis.port", "6000")
+  .config("spark.redis.auth", "passwd")
+  .config("spark.redis.ssl", "true")
+  .config("spark.redis.sentinel.master", "mymaster")
+  .config("spark.redis.sentinel.auth", "sentinelPwd")
+  .getOrCreate()
+
+val sc = spark.sparkContext
+```
diff --git a/doc/configuration.md b/doc/configuration.md
index 353f4e47..38d878ac 100644
--- a/doc/configuration.md
+++ b/doc/configuration.md
@@ -3,7 +3,7 @@ The supported configuration parameters are:
 
 ## Spark Context configuration parameters
 * `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
-topology from the initial node, so there is no need to provide the rest of the cluster nodes.
+topology from the initial node, so there is no need to provide the rest of the cluster nodes. For Sentinel mode, all sentinels should be added comma separated (`sentinel1,sentinel2,...`); they must all listen on the same `spark.redis.port`.
 * `spark.redis.port` - the initial node's TCP redis port.
 * `spark.redis.auth` - the initial node's AUTH password
 * `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
@@ -11,6 +11,9 @@ topology from the initial node, so there is no need to provide the rest of the cluster nodes.
 * `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100.
 * `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100.
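For reference, a minimal sketch (not part of the patch; host names and passwords are placeholders) of how these Sentinel properties map onto the `RedisEndpoint` fields introduced in `RedisConfig.scala` below — `host` carries the comma-separated sentinel list and `port` the one TCP port shared by all of them:

```scala
import com.redislabs.provider.redis.RedisEndpoint

// Sentinel-mode endpoint; the remaining fields (dbNum, timeout, ssl) keep their defaults.
val sentinelEndpoint = RedisEndpoint(
  host = "sentinel1,sentinel2,sentinel3", // spark.redis.host
  port = 26379,                           // spark.redis.port (shared by all sentinels)
  auth = "passwd",                        // spark.redis.auth (password of the Redis nodes)
  master = "mymaster",                    // spark.redis.sentinel.master
  sentinelAuth = "sentinelPwd"            // spark.redis.sentinel.auth
)
```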
 * `spark.redis.ssl` - set to true to use tls
+* `spark.redis.sentinel.master` - master node name in Sentinel mode
+* `spark.redis.sentinel.auth` - the sentinel's password
+
diff --git a/pom.xml b/pom.xml
index 59dce265..81498413 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
         1.8
         2.11
         ${scala.major.version}.12
-        3.2.0
+        3.3.0
         2.4.1
         1.0
diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
index 322d8c5f..c91fdf32 100644
--- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
+++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -1,31 +1,58 @@
 package com.redislabs.provider.redis
 
-import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, JedisSentinelPool}
 import redis.clients.jedis.exceptions.JedisConnectionException
-
 import java.util.concurrent.ConcurrentHashMap
+import redis.clients.jedis.util.Pool
+
 import scala.collection.JavaConversions._
 
 object ConnectionPool {
-  @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
-    new ConcurrentHashMap[RedisEndpoint, JedisPool]()
+  @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, Pool[Jedis]] =
+    new ConcurrentHashMap[RedisEndpoint, Pool[Jedis]]()
+
+  private lazy val buildPoolConfig = {
+    val poolConfig: JedisPoolConfig = new JedisPoolConfig()
+    poolConfig.setMaxTotal(250)
+    poolConfig.setMaxIdle(32)
+    poolConfig.setTestOnBorrow(false)
+    poolConfig.setTestOnReturn(false)
+    poolConfig.setTestWhileIdle(false)
+    poolConfig.setMinEvictableIdleTimeMillis(60000)
+    poolConfig.setTimeBetweenEvictionRunsMillis(30000)
+    poolConfig.setNumTestsPerEvictionRun(-1)
+
+    poolConfig
+  }
+
   def connect(re: RedisEndpoint): Jedis = {
     val pool = pools.getOrElseUpdate(re, {
-      val poolConfig: JedisPoolConfig = new JedisPoolConfig();
-      poolConfig.setMaxTotal(250)
-      poolConfig.setMaxIdle(32)
-      poolConfig.setTestOnBorrow(false)
-      poolConfig.setTestOnReturn(false)
-      poolConfig.setTestWhileIdle(false)
-      poolConfig.setMinEvictableIdleTimeMillis(60000)
-      poolConfig.setTimeBetweenEvictionRunsMillis(30000)
-      poolConfig.setNumTestsPerEvictionRun(-1)
-
-      new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
+      val poolConfig = buildPoolConfig
+
+      if (null == re.master || re.master.trim.isEmpty) {
+        new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
+      } else {
+        val sentinels = re.host.split(",").map(x => x + ":" + re.port).toSet
+        new JedisSentinelPool(
+          re.master.trim,  // master name
+          sentinels,       // set of sentinel host:port addresses
+          poolConfig,      // pool config
+          re.timeout,      // initial connection timeout
+          2000,            // initial socket timeout
+          null,            // initial user
+          re.auth,         // initial password
+          re.dbNum,        // initial DB number
+          null,            // client name
+          2000,            // sentinel connection timeout
+          2000,            // sentinel socket timeout
+          null,            // sentinel user
+          re.sentinelAuth, // sentinel password
+          null             // sentinel client name
+        )
+      }
     }
     )
     var sleepTime: Int = 4
@@ -35,15 +62,12 @@ object ConnectionPool {
         conn = pool.getResource
       }
       catch {
-        case e: JedisConnectionException if e.getCause.toString.
-          contains("ERR max number of clients reached") => {
+        case e: JedisConnectionException if e.getCause.toString.contains("ERR max number of clients reached") =>
           if (sleepTime < 500) sleepTime *= 2
           Thread.sleep(sleepTime)
-        }
         case e: Exception => throw e
       }
     }
     conn
   }
 }
-
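To illustrate the branch in `connect` above, a hedged usage sketch (host names and passwords are placeholders): an endpoint without a master name still yields a plain `JedisPool`, while a Sentinel endpoint yields a `JedisSentinelPool` built from one `host:port` pair per sentinel, all sharing the endpoint's port:

```scala
import com.redislabs.provider.redis.{ConnectionPool, RedisEndpoint}

// master is null => connect() builds a JedisPool against this single node.
val standalone = RedisEndpoint(host = "redis-host", port = 6379)

// Non-empty master => connect() builds a JedisSentinelPool from
// sentinel1:26379, sentinel2:26379 and sentinel3:26379.
val sentinel = RedisEndpoint(host = "sentinel1,sentinel2,sentinel3", port = 26379,
  master = "mymaster", sentinelAuth = "sentinelPwd")

val conn = ConnectionPool.connect(sentinel) // Jedis handle to the current master
try println(conn.ping()) // expect PONG
finally conn.close()     // returns the connection to the pool
```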
- contains("ERR max number of clients reached") => { + case e: JedisConnectionException if e.getCause.toString.contains("ERR max number of clients reached") => if (sleepTime < 500) sleepTime *= 2 Thread.sleep(sleepTime) - } case e: Exception => throw e } } conn } } - diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index 91e2f05e..541b9802 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -24,7 +24,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, auth: String = null, dbNum: Int = Protocol.DEFAULT_DATABASE, timeout: Int = Protocol.DEFAULT_TIMEOUT, - ssl: Boolean = false) + ssl: Boolean = false, + master: String = null, + sentinelAuth: String = null) extends Serializable { /** @@ -39,7 +41,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, conf.get("spark.redis.auth", null), conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE), conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT), - conf.getBoolean("spark.redis.ssl", false) + conf.getBoolean("spark.redis.ssl", defaultValue = false), + conf.get("spark.redis.sentinel.master", null), + conf.get("spark.redis.sentinel.auth", null) ) } @@ -253,8 +257,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt //simply re-enter this function witht he master host/port - getNonClusterNodes(initialHost = new RedisEndpoint(host, port, - initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl)) + getNonClusterNodes(initialHost = RedisEndpoint(host, port, + initialHost.auth, initialHost.dbNum, initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth)) } else { //this is a master - take its slaves @@ -270,7 +274,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val range = nodes.length (0 until range).map(i => RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum, - initialHost.timeout, initialHost.ssl), + initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth), 0, 16383, i, range)).toArray } } @@ -300,7 +304,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]]) val port = node.get(1).toString.toInt RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum, - initialHost.timeout, initialHost.ssl), + initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth), sPos, ePos, i, diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala index f2c84911..0ab83ed4 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala @@ -40,8 +40,10 @@ class RedisSourceRelation(override val sqlContext: SQLContext, val auth = parameters.getOrElse("auth", null) val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE) val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT) - val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false) - RedisEndpoint(host, port, auth, dbNum, timeout, ssl) + val ssl = parameters.get("ssl").exists(_.toBoolean) + val 
diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
index f2c84911..0ab83ed4 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -40,8 +40,10 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
       val auth = parameters.getOrElse("auth", null)
       val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE)
       val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT)
-      val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false)
-      RedisEndpoint(host, port, auth, dbNum, timeout, ssl)
+      val ssl = parameters.get("ssl").exists(_.toBoolean)
+      val master = parameters.getOrElse("sentinel.master", null)
+      val sentinelAuth = parameters.getOrElse("sentinel.auth", null)
+      RedisEndpoint(host, port, auth, dbNum, timeout, ssl, master, sentinelAuth)
     }
   )
 }
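Putting it together, an end-to-end sketch of the new DataFrame options parsed in `RedisSourceRelation` above (hosts, table name and sample data are placeholders; `org.apache.spark.sql.redis` is the connector's data source format):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sentinelWriteExample")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._
val people = Seq(("john", 30), ("peter", 45)).toDF("name", "age")

people.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .option("host", "sentinel1,sentinel2,sentinel3") // the sentinels
  .option("port", "26379")                         // port shared by all sentinels
  .option("sentinel.master", "mymaster")           // monitored master name
  .option("sentinel.auth", "sentinelPwd")          // password for the sentinels
  .save()
```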