Skip to content

Commit bd5efa9

Browse files
authored
add the receiver stop function
When application is stoping , it will pop last a data from redis. It throws exception because it can't store the last data,and this data will lose. This commition has solved this bug in a way that stop the connect. It has a deficiencies.When stopd the connection ,it will throws ConnectException.
1 parent a1aa20e commit bd5efa9

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala

+12-3
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,37 @@ private class RedisReceiver[T: ClassTag](keys: Array[String],
2929
redisConfig: RedisConfig,
3030
streamType: Class[T])
3131
extends Receiver[T](storageLevel) {
32+
33+
var jedisConnect: Jedis = null
3234

3335
def onStart() {
3436
val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streaming")
3537
try {
3638
/* start a executor for each interested List */
37-
keys.foreach{ key =>
38-
executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key))
39+
keys.foreach{ key =>{
40+
jedisConnect = redisConfig.connectionForKey(key)
41+
executorPool.submit(new MessageHandler(jedisConnect, key))
42+
}
3943
}
4044
} finally {
4145
executorPool.shutdown()
4246
}
4347
}
4448

4549
def onStop() {
50+
/* quit the connect*/
51+
if (jedisConnect != null) {
52+
jedisConnect.quit()
53+
jedisConnect = null
54+
}
4655
}
4756

4857
private class MessageHandler(conn: Jedis, key: String) extends Runnable {
4958
def run() {
5059
try {
5160
while(!isStopped) {
5261
val response = conn.blpop(2, key)
53-
if (response == null) {
62+
if (response == null || response.isEmpty) {
5463

5564
} else if (classTag[T] == classTag[String]) {
5665
store(response.get(1).asInstanceOf[T])

0 commit comments

Comments
 (0)