Issue 207: Add Sentinels support for spark-redis library #245

Open · wants to merge 9 commits into base: master
33 changes: 33 additions & 0 deletions doc/cluster.md
@@ -17,3 +17,36 @@ def twoEndpointExample(sc: SparkContext) = {
}
```
If you want to use multiple Redis clusters/instances, an implicit RedisConfig can be used in a code block to specify the target cluster/instance.
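For example, a minimal sketch of that pattern (hostnames are illustrative; `fromRedisKV` comes from the connector's implicit `toRedisContext`):

```scala
import com.redislabs.provider.redis._
import org.apache.spark.SparkContext

def multiInstanceExample(sc: SparkContext): Unit = {
  // each RedisConfig points to a different Redis instance
  val configA = new RedisConfig(RedisEndpoint(host = "redis-a", port = 6379))
  val configB = new RedisConfig(RedisEndpoint(host = "redis-b", port = 6379))

  val keysFromA = {
    implicit val cfg: RedisConfig = configA
    sc.fromRedisKV("foo*") // reads key/value pairs from redis-a
  }
  val keysFromB = {
    implicit val cfg: RedisConfig = configB
    sc.fromRedisKV("foo*") // reads key/value pairs from redis-b
  }
}
```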

### Connecting to Sentinels
#### Using parameters
```scala
// df is an existing DataFrame; `host` lists the sentinels and `port` is the sentinel port
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "table")
  .option("key.column", "key")
  .option("host", "host1,host2,host3")
  .option("port", "6000")
  .option("dbNum", "0")
  .option("timeout", "2000")
  .option("auth", "pwd")
  .option("ssl", "true")
  .option("sentinel.master", "mymaster")
  .option("sentinel.auth", "sentinelPwd")
  .save()
```

#### Using sparkContext
```scala
val spark = SparkSession
.builder()
.appName("myApp")
.master("local[*]")
.config("spark.redis.host", "host1,host2,host3")
.config("spark.redis.port", "6000")
.config("spark.redis.auth", "passwd")
.config("spark.redis.ssl", "true")
.config("spark.redis.sentinel.master", "mymaster")
.config("spark.redis.sentinel.auth", "sentinelPwd")
.getOrCreate()

val sc = spark.sparkContext
```
5 changes: 4 additions & 1 deletion doc/configuration.md
@@ -3,14 +3,17 @@ The supported configuration parameters are:
## Spark Context configuration parameters

* `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
topology from the initial node, so there is no need to provide the rest of the cluster nodes.
topology from the initial node, so there is no need to provide the rest of the cluster nodes. For Sentinel mode, all sentinels should be added comma-separated: `sentinel1,sentinel2,...` (see the example after this list)
* `spark.redis.port` - the initial node's TCP redis port.
* `spark.redis.auth` - the initial node's AUTH password
* `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
* `spark.redis.timeout` - connection timeout in ms, 2000 ms by default
* `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100.
* `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100.
* `spark.redis.ssl` - set to true to use TLS
* `spark.redis.sentinel.master` - master node name in Sentinel mode
* `spark.redis.sentinel.auth` - the sentinel's password
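
A minimal sketch of how these keys fit together in Sentinel mode (hostnames and passwords are illustrative):

```scala
import org.apache.spark.SparkConf

// Sentinel mode: spark.redis.host lists the sentinels and spark.redis.port is the sentinel port
val conf = new SparkConf()
  .setAppName("sentinel-example")
  .set("spark.redis.host", "sentinel1,sentinel2,sentinel3")
  .set("spark.redis.port", "26379")
  .set("spark.redis.auth", "passwd")                // password of the Redis master/replicas
  .set("spark.redis.sentinel.master", "mymaster")   // master name registered in the sentinels
  .set("spark.redis.sentinel.auth", "sentinelPwd")  // password of the sentinels themselves
```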




2 changes: 1 addition & 1 deletion pom.xml
@@ -49,7 +49,7 @@
<java.version>1.8</java.version>
<scala.major.version>2.11</scala.major.version>
<scala.complete.version>${scala.major.version}.12</scala.complete.version>
<jedis.version>3.2.0</jedis.version>
<jedis.version>3.3.0</jedis.version>
<spark.version>2.4.1</spark.version>
<plugins.scalatest.version>1.0</plugins.scalatest.version>
</properties>
62 changes: 43 additions & 19 deletions src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -1,31 +1,58 @@
package com.redislabs.provider.redis

import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, JedisSentinelPool}
import redis.clients.jedis.exceptions.JedisConnectionException

import java.util.concurrent.ConcurrentHashMap

import redis.clients.jedis.util.Pool

import scala.collection.JavaConversions._


object ConnectionPool {
@transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
new ConcurrentHashMap[RedisEndpoint, JedisPool]()
@transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, Pool[Jedis]] =
new ConcurrentHashMap[RedisEndpoint, Pool[Jedis]]()

private lazy val buildPoolConfig = {
val poolConfig: JedisPoolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(250)
poolConfig.setMaxIdle(32)
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
poolConfig.setMinEvictableIdleTimeMillis(60000)
poolConfig.setTimeBetweenEvictionRunsMillis(30000)
poolConfig.setNumTestsPerEvictionRun(-1)

poolConfig
}

def connect(re: RedisEndpoint): Jedis = {
val pool = pools.getOrElseUpdate(re,
{
val poolConfig: JedisPoolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(250)
poolConfig.setMaxIdle(32)
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
poolConfig.setMinEvictableIdleTimeMillis(60000)
poolConfig.setTimeBetweenEvictionRunsMillis(30000)
poolConfig.setNumTestsPerEvictionRun(-1)

new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
val poolConfig = buildPoolConfig

if (null == re.master || re.master.trim.isEmpty) {
new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
} else {
val sentinels = re.host.split(",").map(x => x + ":" + re.port).toSet
new JedisSentinelPool(
re.master.trim, // masterName
sentinels, // set of sentinels
poolConfig, // initial poolConfig
re.timeout, // initial timeout
2000, // initial socketTimeout
null, // initial user
re.auth, // initial password
re.dbNum, // initial dbNum
null, // clientName
2000, // sentinel connTimeout
2000, // sentinel socketTimeout
null, // sentinel user
re.sentinelAuth, // sentinel password
null // sentinel clientName
)
}
}
)
var sleepTime: Int = 4
@@ -35,15 +62,12 @@ object ConnectionPool {
conn = pool.getResource
}
catch {
case e: JedisConnectionException if e.getCause.toString.
contains("ERR max number of clients reached") => {
case e: JedisConnectionException if e.getCause.toString.contains("ERR max number of clients reached") =>
if (sleepTime < 500) sleepTime *= 2
Thread.sleep(sleepTime)
}
case e: Exception => throw e
}
}
conn
}
}
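
To illustrate the pool selection above, a hypothetical call site (endpoint values are illustrative, not part of this change):

```scala
// An endpoint with `master` set routes through JedisSentinelPool;
// one without it falls back to a plain JedisPool.
val sentinelEndpoint = RedisEndpoint(
  host = "sentinel1,sentinel2,sentinel3", // comma-separated sentinels
  port = 26379,
  auth = "passwd",
  master = "mymaster",
  sentinelAuth = "sentinelPwd")

val standaloneEndpoint = RedisEndpoint(host = "redis-host", port = 6379)

val jedisViaSentinel = ConnectionPool.connect(sentinelEndpoint) // JedisSentinelPool branch
val jedisDirect = ConnectionPool.connect(standaloneEndpoint)    // JedisPool branch
```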

16 changes: 10 additions & 6 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -24,7 +24,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
auth: String = null,
dbNum: Int = Protocol.DEFAULT_DATABASE,
timeout: Int = Protocol.DEFAULT_TIMEOUT,
ssl: Boolean = false)
ssl: Boolean = false,
master: String = null,
sentinelAuth: String = null)
extends Serializable {

/**
@@ -39,7 +41,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
conf.get("spark.redis.auth", null),
conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE),
conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT),
conf.getBoolean("spark.redis.ssl", false)
conf.getBoolean("spark.redis.ssl", defaultValue = false),
conf.get("spark.redis.sentinel.master", null),
conf.get("spark.redis.sentinel.auth", null)
)
}

@@ -253,8 +257,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt

// simply re-enter this function with the master host/port
getNonClusterNodes(initialHost = new RedisEndpoint(host, port,
initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl))
getNonClusterNodes(initialHost = RedisEndpoint(host, port,
initialHost.auth, initialHost.dbNum, initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth))

} else {
//this is a master - take its slaves
@@ -270,7 +274,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val range = nodes.length
(0 until range).map(i =>
RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
initialHost.timeout, initialHost.ssl),
initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth),
0, 16383, i, range)).toArray
}
}
@@ -300,7 +304,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
val port = node.get(1).toString.toInt
RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
initialHost.timeout, initialHost.ssl),
initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth),
sPos,
ePos,
i,
src/main/scala/com/redislabs/provider/redis/util/Logging.scala
@@ -29,6 +29,12 @@ trait Logging {
}
}

def logWarn(msg: => String): Unit = {
if (logger.isWarnEnabled) {
_logger.warn(msg)
}
}

def logDebug(msg: => String): Unit = {
if (logger.isDebugEnabled) {
_logger.debug(msg)
17 changes: 17 additions & 0 deletions src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
@@ -0,0 +1,17 @@
package com.redislabs.provider.redis.util

import org.apache.spark.sql.types.StructType

object SparkUtils {
/**
* Sets the schema column positions to the same order as in requiredColumns.
* @param schema Current schema
* @param requiredColumns Column order expected by Catalyst
*/
def alignSchemaWithCatalyst(schema: StructType, requiredColumns: Seq[String]): StructType = {

Contributor: Looks like these changes are not related to Sentinels support. Could you please create a separate PR for that?

Author: Added another PR #252

val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
StructType(requiredColumns.map { c =>
fieldsMap(c)
})
}
}
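
A quick sketch of the intended behaviour (field names are illustrative):

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("age", IntegerType)))

// Catalyst asks only for "age" and "id", in that order
val aligned = SparkUtils.alignSchemaWithCatalyst(schema, Seq("age", "id"))
// aligned.fieldNames == Array("age", "id")
```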
src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
@@ -2,6 +2,7 @@ package org.apache.spark.sql.redis

import java.nio.charset.StandardCharsets.UTF_8

import com.redislabs.provider.redis.util.SparkUtils
import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -34,6 +35,10 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
override def decodeRow(keyMap: (String, String), value: Array[Byte], schema: StructType,
requiredColumns: Seq[String]): Row = {
val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
new GenericRowWithSchema(valuesArray, schema)
// Align column positions to match what Catalyst expects
val alignedSchema = SparkUtils.alignSchemaWithCatalyst(schema, requiredColumns)
val names = schema.fieldNames
val alignedValuesArray = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
new GenericRowWithSchema(alignedValuesArray, alignedSchema)
}
}
28 changes: 14 additions & 14 deletions src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -5,7 +5,7 @@ import java.util.{List => JList}

import com.redislabs.provider.redis.rdd.Keys
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
import com.redislabs.provider.redis.util.Logging
import com.redislabs.provider.redis.util.{Logging, SparkUtils}
import com.redislabs.provider.redis.util.PipelineUtils._
import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, RedisEndpoint, RedisNode, toRedisContext}
import org.apache.commons.lang3.SerializationUtils
@@ -40,8 +40,10 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
val auth = parameters.getOrElse("auth", null)
val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE)
val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT)
val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false)
RedisEndpoint(host, port, auth, dbNum, timeout, ssl)
val ssl = parameters.get("ssl").exists(_.toBoolean)
val master = parameters.getOrElse("sentinel.master", null)
val sentinelAuth = parameters.getOrElse("sentinel.auth", null)
RedisEndpoint(host, port, auth, dbNum, timeout, ssl, master, sentinelAuth)
}
)
}
@@ -159,19 +161,17 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
new GenericRow(Array[Any]())
}
} else {
// filter schema columns, it should be in the same order as given 'requiredColumns'
val requiredSchema = {
val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
val requiredFields = requiredColumns.map { c =>
fieldsMap(c)
}
StructType(requiredFields)
}
val keyType =
/*
For binary it's crucial to have a schema, since we can't infer it and Catalyst's requiredColumns doesn't guarantee
the same order. The schema is therefore the only place where we can read the correct attribute positions for binary.
*/
val (keyType, requiredSchema) =
if (persistenceModel == SqlOptionModelBinary) {
RedisDataTypeString
if (this.schema == null)
logWarn("Unable to identify the schema when reading a dataframe in Binary mode. It can cause type inconsistency!")
(RedisDataTypeString, this.schema)
} else {
RedisDataTypeHash
(RedisDataTypeHash, SparkUtils.alignSchemaWithCatalyst(this.schema, requiredColumns))
}
keysRdd.mapPartitions { partition =>
// grouped iterator to only allocate memory for a portion of rows
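
Since binary mode cannot infer the schema, a read would typically supply it explicitly. A hedged sketch, assuming an existing SparkSession `spark` and the data source's `model`/`binary` option for binary persistence (table name and schema are illustrative):

```scala
import org.apache.spark.sql.types._

// Providing the schema up front keeps attribute positions independent of requiredColumns
val personSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

val persons = spark.read
  .format("org.apache.spark.sql.redis")
  .schema(personSchema)
  .option("table", "person")
  .option("model", "binary")
  .load()
```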