Skip to content

Commit 7b59c76

Browse files
committed
Use jedisPool instead of single jedis connection
1 parent cdec1bb commit 7b59c76

File tree

2 files changed

+37
-29
lines changed

2 files changed

+37
-29
lines changed

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

+36-28
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,58 @@ package com.redislabs.provider.redis
22

33
import java.net.URI
44

5+
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
56
import org.apache.spark.SparkConf
6-
import redis.clients.jedis.Jedis
7+
import redis.clients.jedis.{JedisPool, Jedis, Protocol}
78
import redis.clients.util.{JedisURIHelper, SafeEncoder, JedisClusterCRC16}
89
import scala.collection.JavaConversions._
910

1011

1112
/**
12-
* RedisEndpoint represents a redis connection endpoint info: host, port, auth password and db number
13+
* RedisEndpoint represents a redis connection endpoint info: host, port, auth password
14+
* db number, and timeout
15+
*
1316
* @param host the redis host or ip
1417
* @param port the redis port
1518
* @param auth the authentication password
1619
* @param dbNum database number (should be avoided in general)
1720
*/
18-
class RedisEndpoint(val host: String, val port: Int, val auth: String = "", val dbNum: Int = 0 )
21+
case class RedisEndpoint(val host: String = Protocol.DEFAULT_HOST,
22+
val port: Int = Protocol.DEFAULT_PORT,
23+
val auth: String = null,
24+
val dbNum: Int = Protocol.DEFAULT_DATABASE,
25+
val timeout: Int = Protocol.DEFAULT_TIMEOUT)
1926
extends Serializable {
2027

28+
@transient private var pool: JedisPool = null
29+
2130
/**
2231
* Constructor from spark config. set params with redis.host, redis.port, redis.auth and redis.db
32+
*
2333
* @param conf spark context config
2434
*/
2535
def this(conf: SparkConf) {
2636
this(
27-
conf.get("redis.host", "localhost"),
28-
conf.get("redis.port", "6379").toInt,
29-
conf.get("redis.auth", ""),
30-
conf.get("redis.db", "0").toInt
37+
conf.get("redis.host", Protocol.DEFAULT_HOST),
38+
conf.getInt("redis.port", Protocol.DEFAULT_PORT),
39+
conf.get("redis.auth", null),
40+
conf.getInt("redis.db", Protocol.DEFAULT_DATABASE),
41+
conf.getInt("redis.timeout", Protocol.DEFAULT_TIMEOUT)
3142
)
3243
}
3344

3445
/**
3546
* Constructor with Jedis URI
47+
*
3648
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]
3749
*/
38-
3950
def this(uri: URI) {
4051
this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri))
4152
}
4253

4354
/**
4455
* Constructor with Jedis URI from String
56+
*
4557
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]
4658
*/
4759
def this(uri :String) {
@@ -54,36 +66,32 @@ class RedisEndpoint(val host: String, val port: Int, val auth: String = "", val
5466
*
5567
* @return a new Jedis instance
5668
*/
57-
def connect():Jedis = {
58-
val client = new Jedis(this.host, this.port)
59-
60-
// if a password was set - auth
61-
if (!Option(auth).getOrElse("").isEmpty) {
62-
client.auth(auth)
69+
def connect(): Jedis = {
70+
if (pool == null) {
71+
pool = new JedisPool(new GenericObjectPoolConfig(), host, port, timeout, auth, dbNum)
6372
}
64-
65-
// if a db num was set, select it
66-
if (dbNum > 0) {
67-
client.select(dbNum)
68-
}
69-
70-
client
73+
pool.getResource
7174
}
7275
}
7376

7477
case class RedisNode(val endpoint: RedisEndpoint,
7578
val startSlot: Int,
7679
val endSlot: Int,
7780
val idx: Int,
78-
val total: Int)
81+
val total: Int) {
82+
def connect(): Jedis = {
83+
endpoint.connect
84+
}
85+
86+
}
7987

8088
/**
8189
* RedisConfig holds the state of the cluster nodes, and uses consistent hashing to map
8290
* keys to nodes
8391
*/
8492
class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
8593

86-
val currentAddr = initialHost.host
94+
val initialAddr = initialHost.host
8795

8896
val hosts = getHosts(initialHost)
8997
val nodes = getNodes(initialHost)
@@ -126,8 +134,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
126134
* @return jedis who is a connection for a given key
127135
*/
128136
def connectionForKey(key: String): Jedis = {
129-
val host = getHost(key).endpoint
130-
host.connect
137+
getHost(key).connect
131138
}
132139

133140
/**
@@ -194,7 +201,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
194201
val nodes = master +: slaves
195202
val range = nodes.size
196203
(0 until range).map(i =>
197-
RedisNode(new RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum),
204+
RedisNode(new RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
205+
initialHost.timeout),
198206
0, 16383, i, range)).toArray
199207
}
200208
}
@@ -205,7 +213,6 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
205213
*/
206214
private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
207215
val conn = initialHost.connect()
208-
val slots = conn.clusterSlots()
209216
val res = conn.clusterSlots().flatMap {
210217
slotInfoObj => {
211218
val slotInfo = slotInfoObj.asInstanceOf[java.util.List[java.lang.Object]]
@@ -224,7 +231,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
224231
val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
225232
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
226233
val port = node.get(1).toString.toInt
227-
RedisNode(new RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum),
234+
RedisNode(new RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
235+
initialHost.timeout),
228236
sPos,
229237
ePos,
230238
i,

src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class RedisKeysRDD(sc: SparkContext,
125125
extends RDD[String](sc, Seq.empty) with Logging with Keys {
126126

127127
override protected def getPreferredLocations(split: Partition): Seq[String] = {
128-
Seq(split.asInstanceOf[RedisPartition].redisConfig.currentAddr)
128+
Seq(split.asInstanceOf[RedisPartition].redisConfig.initialAddr)
129129
}
130130

131131
/**

0 commit comments

Comments
 (0)