Skip to content

Commit 522e21b

Browse files
committed
NEW FEATURE: spark-streaming support
1 parent ed7b50f commit 522e21b

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package com.redislabs.provider.redis
22

3+
import com.redislabs.provider.redis.streaming.RedisInputDStream
34
import org.apache.spark.SparkContext
45
import org.apache.spark.rdd.RDD
56

67
import com.redislabs.provider.redis.rdd._
8+
import org.apache.spark.storage.StorageLevel
9+
import org.apache.spark.streaming.StreamingContext
10+
import redis.clients.util.RedisInputStream
711

812
/**
913
* RedisContext extends sparkContext's functionality with redis functions
14+
*
1015
* @param sc a spark context
1116
*/
1217
class RedisContext(@transient val sc: SparkContext) extends Serializable {
@@ -213,6 +218,12 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
213218
}
214219
}
215220

221+
def createRedisStream(ssc: StreamingContext, keys: Array[String], storageLevel: StorageLevel)
222+
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
223+
new RedisInputDStream(ssc, keys, redisConfig, storageLevel)
224+
}
225+
226+
216227

217228
/**
218229
* @param kvs Pair RDD of K/V
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.redislabs.provider.redis.streaming
2+
3+
import com.redislabs.provider.redis.RedisConfig
4+
import org.apache.curator.utils.ThreadUtils
5+
import org.apache.spark.storage.StorageLevel
6+
import org.apache.spark.streaming.StreamingContext
7+
import org.apache.spark.streaming.receiver.Receiver
8+
import org.apache.spark.streaming.dstream.ReceiverInputDStream
9+
10+
import redis.clients.jedis._
11+
12+
import scala.util.control.NonFatal
13+
14+
import scala.collection.JavaConversions._
15+
16+
class RedisInputDStream(_ssc: StreamingContext,
17+
keys: Array[String],
18+
redisConfig: RedisConfig,
19+
storageLevel: StorageLevel)
20+
extends ReceiverInputDStream[(String, String)](_ssc) {
21+
def getReceiver(): Receiver[(String, String)] = {
22+
new RedisReceiver(keys, storageLevel, redisConfig)
23+
}
24+
}
25+
26+
private class RedisReceiver(keys: Array[String],
27+
storageLevel: StorageLevel,
28+
redisConfig: RedisConfig)
29+
extends Receiver[(String, String)](storageLevel) {
30+
31+
def onStart() {
32+
val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streamming")
33+
try {
34+
keys.foreach{ key =>
35+
executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key))
36+
}
37+
} finally {
38+
executorPool.shutdown()
39+
}
40+
41+
}
42+
43+
def onStop() {
44+
}
45+
46+
private class MessageHandler(conn: Jedis, key: String) extends Runnable {
47+
def run() {
48+
val keys: Array[String] = Array(key)
49+
try {
50+
while(!isStopped) {
51+
conn.blpop(keys: _*).toList.foreach{ x =>
52+
store((key, x))
53+
}
54+
}
55+
} catch {
56+
case NonFatal(e) =>
57+
restart("Error receiving data", e)
58+
} finally {
59+
onStop()
60+
}
61+
}
62+
}
63+
}
64+

0 commit comments

Comments
 (0)