From bc21e20cfef90f3568986ffc5cc299997b00f389 Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 19 May 2020 00:54:06 -0700
Subject: [PATCH 1/6] Issue 207: Add Sentinels support for spark-redis library

---
 pom.xml                                       |  2 +-
 .../provider/redis/ConnectionPool.scala       | 62 +++++++++++++------
 .../provider/redis/RedisConfig.scala          | 16 +++--
 .../spark/sql/redis/RedisSourceRelation.scala |  6 +-
 4 files changed, 58 insertions(+), 28 deletions(-)

diff --git a/pom.xml b/pom.xml
index 311add45..cdd0168f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
         1.8
         2.11
         ${scala.major.version}.12
-        3.2.0
+        3.3.0
         2.4.1
         1.0
 
diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
index 322d8c5f..c91fdf32 100644
--- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
+++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -1,31 +1,58 @@
 package com.redislabs.provider.redis
 
-import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, JedisSentinelPool}
 import redis.clients.jedis.exceptions.JedisConnectionException
-
 import java.util.concurrent.ConcurrentHashMap
 
+import redis.clients.jedis.util.Pool
+
 import scala.collection.JavaConversions._
 
 
 object ConnectionPool {
-  @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
-    new ConcurrentHashMap[RedisEndpoint, JedisPool]()
+  @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, Pool[Jedis]] =
+    new ConcurrentHashMap[RedisEndpoint, Pool[Jedis]]()
+
+  private lazy val buildPoolConfig = {
+    val poolConfig: JedisPoolConfig = new JedisPoolConfig()
+    poolConfig.setMaxTotal(250)
+    poolConfig.setMaxIdle(32)
+    poolConfig.setTestOnBorrow(false)
+    poolConfig.setTestOnReturn(false)
+    poolConfig.setTestWhileIdle(false)
+    poolConfig.setMinEvictableIdleTimeMillis(60000)
+    poolConfig.setTimeBetweenEvictionRunsMillis(30000)
+    poolConfig.setNumTestsPerEvictionRun(-1)
+
+    poolConfig
+  }
+
   def connect(re: RedisEndpoint): Jedis = {
     val pool = pools.getOrElseUpdate(re,
      {
-        val poolConfig: JedisPoolConfig = new JedisPoolConfig();
-        poolConfig.setMaxTotal(250)
-        poolConfig.setMaxIdle(32)
-        poolConfig.setTestOnBorrow(false)
-        poolConfig.setTestOnReturn(false)
-        poolConfig.setTestWhileIdle(false)
-        poolConfig.setMinEvictableIdleTimeMillis(60000)
-        poolConfig.setTimeBetweenEvictionRunsMillis(30000)
-        poolConfig.setNumTestsPerEvictionRun(-1)
-
-        new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
+        val poolConfig = buildPoolConfig
+
+        if (null == re.master || re.master.trim.isEmpty) {
+          new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
+        } else {
+          val sentinels = re.host.split(",").map(x => x + ":" + re.port).toSet
+          new JedisSentinelPool(
+            re.master.trim, //masterName
+            sentinels, //set of sentinels
+            poolConfig, //initial poolConfig
+            re.timeout, //initial timeOut
+            2000, //initial socketTimeout
+            null, //initial user
+            re.auth, //initialPassword
+            re.dbNum, //initialDbNum
+            null, //clientName
+            2000, //SentinelConnTimeout
+            2000, //SentinelSocketTimeout
+            null, //SentinelUser
+            re.sentinelAuth, //SentinelPassword
+            null //SentinelClientName
+          )
+        }
       }
     )
     var sleepTime: Int = 4
@@ -35,15 +62,12 @@ object ConnectionPool {
         conn = pool.getResource
       }
       catch {
-        case e: JedisConnectionException if e.getCause.toString.
-          contains("ERR max number of clients reached") => {
+        case e: JedisConnectionException if e.getCause.toString.contains("ERR max number of clients reached") =>
           if (sleepTime < 500) sleepTime *= 2
           Thread.sleep(sleepTime)
-        }
         case e: Exception => throw e
       }
     }
     conn
   }
 }
-
diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
index 91e2f05e..541b9802 100644
--- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
+++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -24,7 +24,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
                          auth: String = null,
                          dbNum: Int = Protocol.DEFAULT_DATABASE,
                          timeout: Int = Protocol.DEFAULT_TIMEOUT,
-                         ssl: Boolean = false)
+                         ssl: Boolean = false,
+                         master: String = null,
+                         sentinelAuth: String = null)
   extends Serializable {
 
   /**
@@ -39,7 +41,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
       conf.get("spark.redis.auth", null),
       conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE),
       conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT),
-      conf.getBoolean("spark.redis.ssl", false)
+      conf.getBoolean("spark.redis.ssl", defaultValue = false),
+      conf.get("spark.redis.sentinel.master", null),
+      conf.get("spark.redis.sentinel.auth", null)
     )
   }
 
@@ -253,8 +257,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
       val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt
 
       //simply re-enter this function witht he master host/port
-      getNonClusterNodes(initialHost = new RedisEndpoint(host, port,
-        initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl))
+      getNonClusterNodes(initialHost = RedisEndpoint(host, port,
+        initialHost.auth, initialHost.dbNum, initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth))
 
     } else {
       //this is a master - take its slaves
@@ -270,7 +274,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
       val range = nodes.length
       (0 until range).map(i =>
         RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
-          initialHost.timeout, initialHost.ssl),
+          initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth),
           0, 16383, i, range)).toArray
     }
   }
@@ -300,7 +304,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
         val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
         val port = node.get(1).toString.toInt
         RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
-          initialHost.timeout, initialHost.ssl),
+          initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth),
           sPos,
           ePos,
           i,
diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
index f2c84911..0ab83ed4 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -40,8 +40,10 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
         val auth = parameters.getOrElse("auth", null)
         val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE)
         val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT)
-        val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false)
-        RedisEndpoint(host, port, auth, dbNum, timeout, ssl)
+        val ssl = parameters.get("ssl").exists(_.toBoolean)
+        val master = parameters.getOrElse("sentinel.master", null)
+        val sentinelAuth = parameters.getOrElse("sentinel.auth", null)
+        RedisEndpoint(host, port, auth, dbNum, timeout, ssl, master, sentinelAuth)
       }
     )
   }

From a623b0d5d8d256ec5dea0499d86b8b781de5ba02 Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 19 May 2020 00:55:52 -0700
Subject: [PATCH 2/6] Issue 207: Add Sentinels support for spark-redis library. Add docs

---
 doc/cluster.md | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/doc/cluster.md b/doc/cluster.md
index 8a3c6315..f278c703 100644
--- a/doc/cluster.md
+++ b/doc/cluster.md
@@ -17,3 +17,36 @@ def twoEndpointExample ( sc: SparkContext) = {
 }
 ```
 If you want to use multiple Redis clusters/instances, an implicit RedisConfig can be used in a code block to specify the target cluster/instance.
+
+### Connecting to Sentinels
+#### Using parameters
+```scala
+df.write.format("org.apache.spark.sql.redis")
+  .option("table", "table")
+  .option("key.column", "key")
+  .option("host", "host1,host2,host3")
+  .option("port", "6000")
+  .option("dbNum", "0")
+  .option("timeout", "2000")
+  .option("auth", "pwd")
+  .option("ssl", "true")
+  .option("sentinel.master", "mymaster")
+  .option("sentinel.auth", "sentinelPwd")
+```
+
+#### Using sparkContext
+```scala
+val spark = SparkSession
+  .builder()
+  .appName("myApp")
+  .master("local[*]")
+  .config("spark.redis.host", "host1,host2,host3")
+  .config("spark.redis.port", "6000")
+  .config("spark.redis.auth", "passwd")
+  .config("spark.redis.ssl", "true")
+  .config("spark.redis.sentinel.master", "mymaster")
+  .config("spark.redis.sentinel.auth", "sentinelPwd")
+  .getOrCreate()
+
+val sc = spark.sparkContext
+```

From 56e25407f0fc3b801d4090ad1e7266540df2a54f Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 19 May 2020 01:46:58 -0700
Subject: [PATCH 3/6] Issue 207: Add Sentinels support for spark-redis library. Add docs

---
 doc/configuration.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/doc/configuration.md b/doc/configuration.md
index 353f4e47..accf9849 100644
--- a/doc/configuration.md
+++ b/doc/configuration.md
@@ -11,6 +11,9 @@ topology from the initial node, so there is no need to provide the rest of the c
 * `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100.
 * `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100.
 * `spark.redis.ssl` - set to true to use tls
+* `spark.redis.sentinel.master` - master node name in Sentinel mode
+* `spark.redis.sentinel.auth` - the sentinel's password
+


From b73ddb070286d8648dbe394afbc896bc8a2881a9 Mon Sep 17 00:00:00 2001
From: comphead
Date: Wed, 20 May 2020 05:27:24 -0700
Subject: [PATCH 4/6] Issue 207: Add Sentinels support for spark-redis library. Add docs

---
 doc/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/configuration.md b/doc/configuration.md
index accf9849..38d878ac 100644
--- a/doc/configuration.md
+++ b/doc/configuration.md
@@ -3,7 +3,7 @@ The supported configuration parameters are:
 
 ## Spark Context configuration parameters
 * `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
-topology from the initial node, so there is no need to provide the rest of the cluster nodes.
+topology from the initial node, so there is no need to provide the rest of the cluster nodes. For Sentinel mode, all sentinels should be provided as a comma-separated list: `sentinel1,sentinel2,...`
 * `spark.redis.port` - the initial node's TCP redis port.
 * `spark.redis.auth` - the initial node's AUTH password
 * `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.

From b66a30cf97c14ce7a2fab36bbe84c116dfc305bc Mon Sep 17 00:00:00 2001
From: comphead
Date: Wed, 20 May 2020 05:28:03 -0700
Subject: [PATCH 5/6] Binary serde issue. Type inconsistency with Spark Catalyst

---
 .../provider/redis/util/Logging.scala         |  6 +++++
 .../provider/redis/util/SparkUtils.scala      | 17 ++++++++++++++
 .../sql/redis/BinaryRedisPersistence.scala    |  7 +++++-
 .../spark/sql/redis/RedisSourceRelation.scala | 22 +++++++++----------
 4 files changed, 39 insertions(+), 13 deletions(-)
 create mode 100644 src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala

diff --git a/src/main/scala/com/redislabs/provider/redis/util/Logging.scala b/src/main/scala/com/redislabs/provider/redis/util/Logging.scala
index 814586d8..620b565d 100644
--- a/src/main/scala/com/redislabs/provider/redis/util/Logging.scala
+++ b/src/main/scala/com/redislabs/provider/redis/util/Logging.scala
@@ -29,6 +29,12 @@ trait Logging {
     }
   }
 
+  def logWarn(msg: => String): Unit = {
+    if (logger.isWarnEnabled) {
+      _logger.warn(msg)
+    }
+  }
+
   def logDebug(msg: => String): Unit = {
     if (logger.isDebugEnabled) {
       _logger.debug(msg)
diff --git a/src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
new file mode 100644
index 00000000..3b6de2b5
--- /dev/null
+++ b/src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
@@ -0,0 +1,17 @@
+package com.redislabs.provider.redis.util
+
+import org.apache.spark.sql.types.StructType
+
+object SparkUtils {
+  /**
+    * Sets the schema column positions to the same order as in requiredColumns
+    * @param schema Current schema
+    * @param requiredColumns Column positions expected by Catalyst
+    */
+  def alignSchemaWithCatalyst(schema: StructType, requiredColumns: Seq[String]): StructType = {
+    val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
+    StructType(requiredColumns.map { c =>
+      fieldsMap(c)
+    })
+  }
+}
diff --git a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
index c9b0a981..be4d7572 100644
--- a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
@@ -2,6 +2,7 @@ package org.apache.spark.sql.redis
 
 import java.nio.charset.StandardCharsets.UTF_8
 
+import com.redislabs.provider.redis.util.SparkUtils
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -34,6 +35,10 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
   override def decodeRow(keyMap: (String, String), value: Array[Byte],
                          schema: StructType, requiredColumns: Seq[String]): Row = {
     val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
-    new GenericRowWithSchema(valuesArray, schema)
+    // Aligning column positions with what Catalyst expects
+    val alignedSchema = SparkUtils.alignSchemaWithCatalyst(schema, requiredColumns)
+    val names = schema.fieldNames
+    val alignedValuesArray = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
+    new GenericRowWithSchema(alignedValuesArray, alignedSchema)
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
index 0ab83ed4..8e43ec19 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -5,7 +5,7 @@ import java.util.{List => JList}
 
 import com.redislabs.provider.redis.rdd.Keys
 import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
-import com.redislabs.provider.redis.util.Logging
+import com.redislabs.provider.redis.util.{Logging, SparkUtils}
 import com.redislabs.provider.redis.util.PipelineUtils._
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, RedisEndpoint, RedisNode, toRedisContext}
 import org.apache.commons.lang3.SerializationUtils
@@ -161,19 +161,17 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
           new GenericRow(Array[Any]())
         }
       } else {
-        // filter schema columns, it should be in the same order as given 'requiredColumns'
-        val requiredSchema = {
-          val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
-          val requiredFields = requiredColumns.map { c =>
-            fieldsMap(c)
-          }
-          StructType(requiredFields)
-        }
-        val keyType =
+        /*
+          For binary it's crucial to have a schema, as we can't infer it and Catalyst requiredColumns doesn't guarantee
+          the same order. Thus the schema is the only place where we can read correct attribute positions for binary
+        */
+        val (keyType, requiredSchema) =
           if (persistenceModel == SqlOptionModelBinary) {
-            RedisDataTypeString
+            if (this.schema == null)
+              logWarn("Unable to identify the schema when reading a dataframe in Binary mode. It can cause type inconsistency!")
+            (RedisDataTypeString, this.schema)
           } else {
-            RedisDataTypeHash
+            (RedisDataTypeHash, SparkUtils.alignSchemaWithCatalyst(this.schema, requiredColumns))
           }
         keysRdd.mapPartitions { partition =>
           // grouped iterator to only allocate memory for a portion of rows

From 4a185aaed999588b7656a75573713596e1b234c3 Mon Sep 17 00:00:00 2001
From: comphead
Date: Wed, 10 Jun 2020 04:06:54 -0700
Subject: [PATCH 6/6] Revert "Binary serde issue. Type inconsistency with Spark Catalyst"

This reverts commit b66a30cf.

---
 .../provider/redis/util/Logging.scala         |  6 -----
 .../provider/redis/util/SparkUtils.scala      | 17 --------------
 .../sql/redis/BinaryRedisPersistence.scala    |  7 +-----
 .../spark/sql/redis/RedisSourceRelation.scala | 22 ++++++++++---------
 4 files changed, 13 insertions(+), 39 deletions(-)
 delete mode 100644 src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala

diff --git a/src/main/scala/com/redislabs/provider/redis/util/Logging.scala b/src/main/scala/com/redislabs/provider/redis/util/Logging.scala
index 620b565d..814586d8 100644
--- a/src/main/scala/com/redislabs/provider/redis/util/Logging.scala
+++ b/src/main/scala/com/redislabs/provider/redis/util/Logging.scala
@@ -29,12 +29,6 @@ trait Logging {
     }
   }
 
-  def logWarn(msg: => String): Unit = {
-    if (logger.isWarnEnabled) {
-      _logger.warn(msg)
-    }
-  }
-
   def logDebug(msg: => String): Unit = {
     if (logger.isDebugEnabled) {
       _logger.debug(msg)
diff --git a/src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
deleted file mode 100644
index 3b6de2b5..00000000
--- a/src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.redislabs.provider.redis.util
-
-import org.apache.spark.sql.types.StructType
-
-object SparkUtils {
-  /**
-    * Sets the schema column positions to the same order as in requiredColumns
-    * @param schema Current schema
-    * @param requiredColumns Column positions expected by Catalyst
-    */
-  def alignSchemaWithCatalyst(schema: StructType, requiredColumns: Seq[String]): StructType = {
-    val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
-    StructType(requiredColumns.map { c =>
-      fieldsMap(c)
-    })
-  }
-}
diff --git a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
index be4d7572..c9b0a981 100644
--- a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
@@ -2,7 +2,6 @@ package org.apache.spark.sql.redis
 
 import java.nio.charset.StandardCharsets.UTF_8
 
-import com.redislabs.provider.redis.util.SparkUtils
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -35,10 +34,6 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
   override def decodeRow(keyMap: (String, String), value: Array[Byte],
                          schema: StructType, requiredColumns: Seq[String]): Row = {
     val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
-    // Aligning column positions with what Catalyst expects
-    val alignedSchema = SparkUtils.alignSchemaWithCatalyst(schema, requiredColumns)
-    val names = schema.fieldNames
-    val alignedValuesArray = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
-    new GenericRowWithSchema(alignedValuesArray, alignedSchema)
+    new GenericRowWithSchema(valuesArray, schema)
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
index 8e43ec19..0ab83ed4 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -5,7 +5,7 @@ import java.util.{List => JList}
 
 import com.redislabs.provider.redis.rdd.Keys
 import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
-import com.redislabs.provider.redis.util.{Logging, SparkUtils}
+import com.redislabs.provider.redis.util.Logging
 import com.redislabs.provider.redis.util.PipelineUtils._
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, RedisEndpoint, RedisNode, toRedisContext}
 import org.apache.commons.lang3.SerializationUtils
@@ -161,17 +161,19 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
           new GenericRow(Array[Any]())
         }
       } else {
-        /*
-          For binary it's crucial to have a schema, as we can't infer it and Catalyst requiredColumns doesn't guarantee
-          the same order. Thus the schema is the only place where we can read correct attribute positions for binary
-        */
-        val (keyType, requiredSchema) =
+        // filter schema columns, it should be in the same order as given 'requiredColumns'
+        val requiredSchema = {
+          val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
+          val requiredFields = requiredColumns.map { c =>
+            fieldsMap(c)
+          }
+          StructType(requiredFields)
+        }
+        val keyType =
           if (persistenceModel == SqlOptionModelBinary) {
-            if (this.schema == null)
-              logWarn("Unable to identify the schema when reading a dataframe in Binary mode. It can cause type inconsistency!")
-            (RedisDataTypeString, this.schema)
+            RedisDataTypeString
           } else {
-            (RedisDataTypeHash, SparkUtils.alignSchemaWithCatalyst(this.schema, requiredColumns))
+            RedisDataTypeHash
           }
         keysRdd.mapPartitions { partition =>
           // grouped iterator to only allocate memory for a portion of rows