diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala b/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala index 79a2741a..1b00f303 100644 --- a/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala +++ b/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala @@ -29,13 +29,17 @@ private class RedisReceiver[T: ClassTag](keys: Array[String], redisConfig: RedisConfig, streamType: Class[T]) extends Receiver[T](storageLevel) { + + var jedisConnect: Jedis = null def onStart() { val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streaming") try { /* start a executor for each interested List */ - keys.foreach{ key => - executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key)) + keys.foreach{ key =>{ + jedisConnect = redisConfig.connectionForKey(key) + executorPool.submit(new MessageHandler(jedisConnect, key)) + } } } finally { executorPool.shutdown() @@ -43,6 +47,11 @@ private class RedisReceiver[T: ClassTag](keys: Array[String], } def onStop() { + /* quit the connect*/ + if (jedisConnect != null) { + jedisConnect.quit() + jedisConnect = null + } } private class MessageHandler(conn: Jedis, key: String) extends Runnable { @@ -50,7 +59,7 @@ private class RedisReceiver[T: ClassTag](keys: Array[String], try { while(!isStopped) { val response = conn.blpop(2, key) - if (response == null) { + if (response == null || response.isEmpty) { } else if (classTag[T] == classTag[String]) { store(response.get(1).asInstanceOf[T])