Skip to content

Commit 7a2637f

Browse files
committed
split the streaming API into two separated ones
1 parent 129db3a commit 7a2637f

File tree

3 files changed

+38
-13
lines changed

3 files changed

+38
-13
lines changed

Diff for: README.md

+21-3
Original file line numberDiff line numberDiff line change
@@ -251,15 +251,33 @@ sc.toRedisZSET(zsetRDD, zsetName)
251251
The above example demonstrates storing data in Redis in a Sorted Set. The `zsetRDD` in the example should contain pairs of members and their scores, whereas `zsetName` is the name for that key.
252252

253253
### Streaming
254-
Spark-Redis support streaming data from Redis instance/cluster, currently streaming data are fetched from Redis' List by the `blpop` command. Users are required to provide an array which stores all the List names they are interested in. The [storageLevel](http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization) is `MEMORY_AND_DISK_SER_2` by default, you can change it on your demand. The stream is a `(listName, value)` stream by default, but if you don't care about which list feeds the value, set the `withStreamName` as `false` and you will get the only `value` stream.
254+
Spark-Redis supports streaming data from a Redis instance/cluster. Currently, streaming data is fetched from Redis Lists with the `blpop` command. Users are required to provide an array containing the names of all the Lists they are interested in. The [storageLevel](http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization) is `MEMORY_AND_DISK_2` by default; you can change it as needed.
255+
`createRedisStream` creates a `(listName, value)` stream. If you don't care which list a value came from, use `createRedisStreamWithoutListname` to get a stream of only the `value`s.
256+
257+
Use the following to get a `(listName, value)` stream from the `foo` and `bar` lists:
258+
259+
```
260+
import org.apache.spark.streaming.{Seconds, StreamingContext}
261+
import org.apache.spark.storage.StorageLevel
262+
import com.redislabs.provider.redis._
263+
val ssc = new StreamingContext(sc, Seconds(1))
264+
val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
265+
redisStream.print
266+
ssc.awaitTermination()
267+
```
268+
269+
270+
Use the following to get a `value` stream from the `foo` and `bar` lists:
271+
255272
```
256273
import org.apache.spark.streaming.{Seconds, StreamingContext}
257274
import org.apache.spark.storage.StorageLevel
258275
import com.redislabs.provider.redis._
259276
val ssc = new StreamingContext(sc, Seconds(1))
260-
val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2, withStreamName = true)
277+
val redisStream = ssc.createRedisStreamWithoutListname(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
278+
redisStream.print
279+
ssc.awaitTermination()
261280
```
262-
The above example will get a (listName, value) stream from `foo` list and `bar` list
263281

264282

265283
### Connecting to Multiple Redis Clusters/Instances

Diff for: src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

+14-9
Original file line numberDiff line numberDiff line change
@@ -394,19 +394,24 @@ class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serial
394394
/**
395395
* @param keys an Array[String] containing the names of all the Lists we want to listen to
396396
* @param storageLevel the receiver's storage strategy for received data, defaults to MEMORY_AND_DISK_2
397-
* @param withStreamName if it is true the streaming will be (listName, value),
398-
* else the streaming will be only (value)
399-
* default as true
397+
* @return a stream of (listname, value)
400398
*/
401399
def createRedisStream(keys: Array[String],
402-
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
403-
withStreamName: Boolean = true)
400+
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
404401
(implicit redisConfig: RedisConfig = new RedisConfig(new
405402
RedisEndpoint(ssc.sparkContext.getConf))) = {
406-
withStreamName match {
407-
case true => new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
408-
case false => new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[String])
409-
}
403+
new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
404+
}
405+
/**
406+
* @param keys an Array[String] containing the names of all the Lists we want to listen to
407+
* @param storageLevel the receiver's storage strategy for received data, defaults to MEMORY_AND_DISK_2
408+
* @return a stream of (value)
409+
*/
410+
def createRedisStreamWithoutListname(keys: Array[String],
411+
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
412+
(implicit redisConfig: RedisConfig = new RedisConfig(new
413+
RedisEndpoint(ssc.sparkContext.getConf))) = {
414+
new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[String])
410415
}
411416
}
412417

Diff for: src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package com.redislabs.provider.redis.streaming
22

3+
import com.redislabs.provider.redis.RedisConfig
34
import org.apache.curator.utils.ThreadUtils
45
import org.apache.spark.storage.StorageLevel
56
import org.apache.spark.streaming.StreamingContext
67
import org.apache.spark.streaming.receiver.Receiver
78
import org.apache.spark.streaming.dstream.ReceiverInputDStream
9+
810
import redis.clients.jedis._
11+
912
import scala.reflect.{ClassTag, classTag}
1013
import scala.util.control.NonFatal
11-
import com.redislabs.provider.redis.RedisConfig
1214

1315
class RedisInputDStream[T: ClassTag](_ssc: StreamingContext,
1416
keys: Array[String],

0 commit comments

Comments
 (0)