Skip to content

Commit 129db3a

Browse files
committed
Streaming: add value only stream and update readme
1 parent 03027de commit 129db3a

File tree

4 files changed

+68
-36
lines changed

4 files changed

+68
-36
lines changed

README.md

+18-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ A library for reading and writing data from and to [Redis](http://redis.io) with
55

66
Spark-Redis provides access to all of Redis' data structures - String, Hash, List, Set and Sorted Set - from Spark as RDDs. The library can be used both with Redis stand-alone as well as clustered databases. When used with Redis cluster, Spark-Redis is aware of its partitioning scheme and adjusts in response to resharding and node failure events.
77

8+
Spark-Redis also provides Spark-Streaming support.
9+
810
## Minimal requirements
911
You'll need the following to use Spark-Redis:
1012

@@ -15,7 +17,7 @@ You'll need the the following to use Spark-Redis:
1517

1618
## Known limitations
1719

18-
* Java, Python and R API bindings are not provided at this time
20+
* Java, Python and R API bindings are not provided at this time
1921
* The package was only tested with the following stack:
2022
- Apache Spark v1.4.0
2123
- Scala v2.10.4
@@ -72,13 +74,13 @@ import com.redislabs.provider.redis._
7274
sc = new SparkContext(new SparkConf()
7375
.setMaster("local")
7476
.setAppName("myApp")
75-
77+
7678
// initial redis host - can be any node in cluster mode
7779
.set("redis.host", "localhost")
78-
80+
7981
// initial redis port
8082
.set("redis.port", "6379")
81-
83+
8284
// optional redis AUTH password
8385
.set("redis.auth", "")
8486
)
@@ -248,6 +250,18 @@ sc.toRedisZSET(zsetRDD, zsetName)
248250

249251
The above example demonstrates storing data in Redis in a Sorted Set. The `zsetRDD` in the example should contain pairs of members and their scores, whereas `zsetName` is the name for that key.
250252

253+
### Streaming
254+
Spark-Redis supports streaming data from a Redis instance/cluster; currently, streaming data is fetched from Redis Lists with the `blpop` command. Users are required to provide an array containing the names of all the Lists they are interested in. The [storageLevel](http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization) is `MEMORY_AND_DISK_2` by default; you can change it as needed. The stream is a `(listName, value)` stream by default, but if you don't care which list feeds a value, set `withStreamName` to `false` and you will get a value-only stream.
255+
```
256+
import org.apache.spark.streaming.{Seconds, StreamingContext}
257+
import org.apache.spark.storage.StorageLevel
258+
import com.redislabs.provider.redis._
259+
val ssc = new StreamingContext(sc, Seconds(1))
260+
val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2, withStreamName = true)
261+
```
262+
The above example will get a `(listName, value)` stream from the `foo` list and the `bar` list.
263+
264+
251265
### Connecting to Multiple Redis Clusters/Instances
252266

253267
```

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

+20-2
Original file line numberDiff line numberDiff line change
@@ -385,13 +385,31 @@ object RedisContext extends Serializable {
385385
}
386386
}
387387

388+
/**
389+
* RedisStreamingContext extends StreamingContext's functionality with Redis
390+
*
391+
* @param ssc a spark StreamingContext
392+
*/
388393
class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable {
389-
def createRedisStream(keys: Array[String], storageLevel: StorageLevel)
394+
/**
395+
* @param keys an Array[String] containing the names of all the Lists we want to listen to
396+
* @param storageLevel the receiver's storage strategy for received data, defaults to MEMORY_AND_DISK_2
397+
* @param withStreamName if true, the stream will consist of (listName, value) pairs;
398+
* otherwise the stream will contain only the values;
399+
* defaults to true
400+
*/
401+
def createRedisStream(keys: Array[String],
402+
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
403+
withStreamName: Boolean = true)
390404
(implicit redisConfig: RedisConfig = new RedisConfig(new
391405
RedisEndpoint(ssc.sparkContext.getConf))) = {
392-
new RedisInputDStream(ssc, keys, redisConfig, storageLevel)
406+
withStreamName match {
407+
case true => new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
408+
case false => new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[String])
409+
}
393410
}
394411
}
412+
395413
trait RedisFunctions {
396414
implicit def toRedisContext(sc: SparkContext): RedisContext = new RedisContext(sc)
397415
implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc)

src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala

+30-22
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,65 @@
11
package com.redislabs.provider.redis.streaming
22

3-
import com.redislabs.provider.redis.RedisConfig
43
import org.apache.curator.utils.ThreadUtils
54
import org.apache.spark.storage.StorageLevel
65
import org.apache.spark.streaming.StreamingContext
76
import org.apache.spark.streaming.receiver.Receiver
87
import org.apache.spark.streaming.dstream.ReceiverInputDStream
9-
108
import redis.clients.jedis._
11-
9+
import scala.reflect.{ClassTag, classTag}
1210
import scala.util.control.NonFatal
11+
import com.redislabs.provider.redis.RedisConfig
1312

14-
class RedisInputDStream(_ssc: StreamingContext,
15-
keys: Array[String],
16-
redisConfig: RedisConfig,
17-
storageLevel: StorageLevel)
18-
extends ReceiverInputDStream[(String, String)](_ssc) {
19-
def getReceiver(): Receiver[(String, String)] = {
20-
new RedisReceiver(keys, storageLevel, redisConfig)
13+
class RedisInputDStream[T: ClassTag](_ssc: StreamingContext,
14+
keys: Array[String],
15+
storageLevel: StorageLevel,
16+
redisConfig: RedisConfig,
17+
streamType: Class[T])
18+
extends ReceiverInputDStream[T](_ssc) {
19+
def getReceiver(): Receiver[T] = {
20+
new RedisReceiver(keys, storageLevel, redisConfig, streamType)
2121
}
2222
}
2323

24-
private class RedisReceiver(keys: Array[String],
25-
storageLevel: StorageLevel,
26-
redisConfig: RedisConfig)
27-
extends Receiver[(String, String)](storageLevel) {
24+
25+
private class RedisReceiver[T: ClassTag](keys: Array[String],
26+
storageLevel: StorageLevel,
27+
redisConfig: RedisConfig,
28+
streamType: Class[T])
29+
extends Receiver[T](storageLevel) {
2830

2931
def onStart() {
30-
val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streamming")
32+
val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streaming")
3133
try {
34+
/* start an executor for each List of interest */
3235
keys.foreach{ key =>
3336
executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key))
3437
}
3538
} finally {
3639
executorPool.shutdown()
3740
}
38-
3941
}
4042

4143
def onStop() {
4244
}
4345

4446
private class MessageHandler(conn: Jedis, key: String) extends Runnable {
4547
def run() {
46-
val keys: Array[String] = Array(key)
4748
try {
4849
while(!isStopped) {
49-
val res = conn.blpop(Integer.MAX_VALUE, keys:_*)
50-
res match {
51-
case null => "Really?"
52-
case _ => store((res.get(0), res.get(1)))
50+
val res = {
51+
val response = conn.blpop(0, key)
52+
if (classTag[T] == classTag[String]) {
53+
response.get(1)
54+
}
55+
else if (classTag[T] == classTag[(String, String)]) {
56+
(response.get(0), response.get(1))
57+
}
58+
else {
59+
throw new scala.Exception("Unknown Redis Streaming type")
60+
}
5361
}
62+
store(res.asInstanceOf[T])
5463
}
5564
} catch {
5665
case NonFatal(e) =>
@@ -61,4 +70,3 @@ private class RedisReceiver(keys: Array[String],
6170
}
6271
}
6372
}
64-

src/main/scala/com/redislabs/provider/redis/streaming/RedisUtils.scala

-8
This file was deleted.

0 commit comments

Comments
 (0)