
Commit 03027de

Streaming: do some cleaning
1 parent 522e21b commit 03027de

4 files changed: +21 -13 lines changed

src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala

Lines changed: 1 addition & 1 deletion

@@ -191,7 +191,7 @@ class RedisKeysRDD(sc: SparkContext,
                    val keyPattern: String = "*",
                    val partitionNum: Int = 3,
                    val keys: Array[String] = null)
-    extends RDD[String](sc, Seq.empty) with Logging with Keys {
+    extends RDD[String](sc, Seq.empty) with Keys {
 
   override protected def getPreferredLocations(split: Partition): Seq[String] = {
     Seq(split.asInstanceOf[RedisPartition].redisConfig.initialAddr)

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Lines changed: 8 additions & 8 deletions

@@ -7,7 +7,6 @@ import org.apache.spark.rdd.RDD
 import com.redislabs.provider.redis.rdd._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import redis.clients.util.RedisInputStream
 
 /**
   * RedisContext extends sparkContext's functionality with redis functions
@@ -218,13 +217,6 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     }
   }
 
-  def createRedisStream(ssc: StreamingContext, keys: Array[String], storageLevel: StorageLevel)
-    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
-    new RedisInputDStream(ssc, keys, redisConfig, storageLevel)
-  }
-
-
-
   /**
     * @param kvs Pair RDD of K/V
     * @param ttl time to live
@@ -393,7 +385,15 @@ object RedisContext extends Serializable {
   }
 }
 
+class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable {
+  def createRedisStream(keys: Array[String], storageLevel: StorageLevel)
+    (implicit redisConfig: RedisConfig = new RedisConfig(new
+      RedisEndpoint(ssc.sparkContext.getConf))) = {
+    new RedisInputDStream(ssc, keys, redisConfig, storageLevel)
+  }
+}
 
 trait RedisFunctions {
   implicit def toRedisContext(sc: SparkContext): RedisContext = new RedisContext(sc)
+  implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc)
 }
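
The net effect of this file's change is that createRedisStream moves off RedisContext and onto the new RedisStreamingContext, reached through the added implicit conversion on StreamingContext, with the RedisConfig defaulting to one built from the SparkConf. A minimal usage sketch of the new call site follows; the app name, batch interval, key names and storage level are placeholders, and it assumes that importing com.redislabs.provider.redis._ brings the RedisFunctions implicits into scope and that the returned RedisInputDStream behaves as an ordinary DStream of (key, value) pairs:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.redislabs.provider.redis._  // assumed to expose the RedisFunctions implicits

object RedisStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("redis-stream-sketch")  // placeholder app name
    val ssc = new StreamingContext(conf, Seconds(1))              // placeholder batch interval

    // Resolved via the new implicit toRedisStreamingContext; the receiver pops elements
    // from the given Redis list keys and emits them as (key, value) pairs.
    val stream = ssc.createRedisStream(Array("queue:a", "queue:b"), StorageLevel.MEMORY_AND_DISK_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}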

src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala

Lines changed: 4 additions & 4 deletions

@@ -11,8 +11,6 @@ import redis.clients.jedis._
 
 import scala.util.control.NonFatal
 
-import scala.collection.JavaConversions._
-
 class RedisInputDStream(_ssc: StreamingContext,
                         keys: Array[String],
                         redisConfig: RedisConfig,
@@ -48,8 +46,10 @@ private class RedisReceiver(keys: Array[String],
       val keys: Array[String] = Array(key)
       try {
        while(!isStopped) {
-          conn.blpop(keys: _*).toList.foreach{ x =>
-            store((key, x))
+          val res = conn.blpop(Integer.MAX_VALUE, keys:_*)
+          res match {
+            case null => "Really?"
+            case _ => store((res.get(0), res.get(1)))
          }
        }
      } catch {
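
For context on the new call: Jedis's blocking blpop(timeout, keys...) returns a two-element list of [poppedKey, value], and yields null when nothing arrives before the timeout, which is what the case null branch guards against and why the receiver now stores (res.get(0), res.get(1)). A standalone sketch of the same pattern against a plain Jedis connection (host, port and key name are illustrative only):

import redis.clients.jedis.Jedis

object BlpopSketch {
  def main(args: Array[String]): Unit = {
    val conn = new Jedis("localhost", 6379)  // illustrative endpoint
    // Block for up to 5 seconds waiting for an element on the list "queue:a".
    val res = conn.blpop(5, "queue:a")
    res match {
      case null => println("timed out, nothing popped")             // null signals a timeout
      case kv   => println(s"key=${kv.get(0)} value=${kv.get(1)}")  // [poppedKey, value]
    }
    conn.close()
  }
}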
Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+package com.redislabs.provider.redis.streaming
+
+/**
+  * Created by sunheehnus on 16/3/29.
+  */
+class RedisUtils {
+
+}

0 commit comments
