-
Notifications
You must be signed in to change notification settings - Fork 370
/
Copy pathConnectionPool.scala
73 lines (64 loc) · 2.4 KB
/
ConnectionPool.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.redislabs.provider.redis
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, JedisSentinelPool}
import redis.clients.jedis.exceptions.JedisConnectionException
import java.util.concurrent.ConcurrentHashMap
import redis.clients.jedis.util.Pool
import scala.collection.JavaConversions._
/**
  * Process-wide cache of Jedis connection pools, one per [[RedisEndpoint]].
  *
  * Pools are created lazily on the first `connect` call for an endpoint and are
  * reused for every later call with an equal endpoint.
  */
object ConnectionPool {
  /** Pools created so far, keyed by the endpoint they connect to. */
  @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, Pool[Jedis]] =
    new ConcurrentHashMap[RedisEndpoint, Pool[Jedis]]()

  /** Pool settings shared by every pool this object creates. */
  private lazy val buildPoolConfig: JedisPoolConfig = {
    val poolConfig: JedisPoolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(250)
    poolConfig.setMaxIdle(32)
    poolConfig.setTestOnBorrow(false)
    poolConfig.setTestOnReturn(false)
    poolConfig.setTestWhileIdle(false)
    poolConfig.setMinEvictableIdleTimeMillis(60000)
    poolConfig.setTimeBetweenEvictionRunsMillis(30000)
    poolConfig.setNumTestsPerEvictionRun(-1)
    poolConfig
  }

  /** Text looked for in the cause of a connection failure to decide whether to retry. */
  private val MaxClientsError = "ERR max number of clients reached"

  /**
    * Builds a new pool for the endpoint: a plain [[JedisPool]] when `re.master` is
    * null or blank, otherwise a [[JedisSentinelPool]] where `re.host` is read as a
    * comma-separated list of sentinel hosts that all use `re.port`.
    */
  private def createPool(re: RedisEndpoint): Pool[Jedis] = {
    val poolConfig = buildPoolConfig
    if (null == re.master || re.master.trim.isEmpty) {
      new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
    } else {
      val sentinels = re.host.split(",").map(x => x + ":" + re.port).toSet
      new JedisSentinelPool(
        re.master.trim,   // master name
        sentinels,        // set of "host:port" sentinel addresses
        poolConfig,       // pool config
        re.timeout,       // connection timeout
        2000,             // socket timeout
        null,             // user
        re.auth,          // password
        re.dbNum,         // database number
        null,             // client name
        2000,             // sentinel connection timeout
        2000,             // sentinel socket timeout
        null,             // sentinel user
        re.sentinelAuth,  // sentinel password
        null              // sentinel client name
      )
    }
  }

  /**
    * Returns the cached pool for the endpoint, creating and registering it if absent.
    *
    * The pool is built outside the map and registered with `putIfAbsent`, so when
    * several threads race on the first use of an endpoint they all end up with the
    * same pool; a pool that lost the race is closed instead of being leaked.
    */
  private def poolFor(re: RedisEndpoint): Pool[Jedis] = {
    val cached = pools.get(re)
    if (cached != null) {
      cached
    } else {
      val created = createPool(re)
      val winner = pools.putIfAbsent(re, created)
      if (winner == null) {
        created
      } else {
        // Another thread registered a pool first: release ours and use theirs.
        created.close()
        winner
      }
    }
  }

  /** True when the exception has a cause whose text contains the max-clients error. */
  private def isMaxClientsError(e: JedisConnectionException): Boolean =
    Option(e.getCause).exists(_.toString.contains(MaxClientsError))

  /**
    * Borrows a connection from the pool associated with the endpoint.
    *
    * When borrowing fails because the server reports that its client limit is
    * reached, the call sleeps and retries indefinitely; the sleep doubles from 8 ms
    * up to a cap of 512 ms. Any other failure is propagated to the caller.
    *
    * @param re the endpoint to connect to
    * @return a Jedis connection borrowed from the endpoint's pool
    * @throws JedisConnectionException when borrowing fails for a reason other than
    *                                  the server's client limit
    */
  def connect(re: RedisEndpoint): Jedis = {
    val pool = poolFor(re)
    var sleepTime: Int = 4
    var conn: Jedis = null
    while (conn == null) {
      try {
        conn = pool.getResource
      } catch {
        // A null cause simply fails the guard, so the original exception propagates
        // instead of being replaced by a NullPointerException.
        case e: JedisConnectionException if isMaxClientsError(e) =>
          if (sleepTime < 500) sleepTime *= 2
          Thread.sleep(sleepTime)
      }
    }
    conn
  }
}