Skip to content

Commit 12fd30d

Browse files
committed
Support ZSET Rdds from sorted sets
related to RedisLabs#13
1 parent 7b59c76 commit 12fd30d

File tree

2 files changed

+199
-28
lines changed

2 files changed

+199
-28
lines changed

scalastyle-config.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ var notAtAllAMagicNumber = 1234
7272
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
7373
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
7474
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
75-
<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
76-
<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
75+
<!--<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>-->
76+
<!--<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>-->
7777
<check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
7878
<check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
7979
<check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>

src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala

+197-26
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package com.redislabs.provider.redis.rdd
22

3+
import redis.clients.jedis._
4+
import redis.clients.util.JedisClusterCRC16
5+
6+
import scala.collection.JavaConversions._
7+
import scala.collection.JavaConverters._
8+
39
import java.util
10+
import java.util.concurrent.ConcurrentHashMap
411

512
import com.redislabs.provider.redis.{RedisNode, RedisConfig}
13+
import com.redislabs.provider.redis.partitioner._
14+
615
import org.apache.spark.rdd.RDD
716
import org.apache.spark._
8-
import redis.clients.jedis._
9-
import redis.clients.util.JedisClusterCRC16
1017

11-
import scala.collection.JavaConversions._
12-
import com.redislabs.provider.redis.partitioner._
18+
import scala.reflect.ClassTag
1319

1420

1521
class RedisKVRDD(prev: RDD[String],
@@ -29,7 +35,6 @@ class RedisKVRDD(prev: RDD[String],
2935
rddType match {
3036
case "kv" => getKV(nodes, keys)
3137
case "hash" => getHASH(nodes, keys)
32-
case "zset" => getZSET(nodes, keys)
3338
}
3439
}
3540

@@ -60,19 +65,6 @@ class RedisKVRDD(prev: RDD[String],
6065
}
6166
}.iterator
6267
}
63-
def getZSET(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = {
64-
groupKeysByNode(nodes, keys).flatMap {
65-
x =>
66-
{
67-
val conn = x._1.endpoint.connect()
68-
val zsetKeys = filterKeysByType(conn, x._2, "zset")
69-
val res = zsetKeys.flatMap(k => conn.zrangeWithScores(k, 0, -1)).
70-
map(tup => (tup.getElement, tup.getScore.toString)).iterator
71-
conn.close
72-
res
73-
}
74-
}.iterator
75-
}
7668
}
7769

7870
class RedisListRDD(prev: RDD[String], val rddType: String) extends RDD[String](prev) with Keys {
@@ -117,6 +109,78 @@ class RedisListRDD(prev: RDD[String], val rddType: String) extends RDD[String](p
117109
}
118110
}
119111

112+
class RedisZSetRDD[K](prev: RDD[String],
113+
zsetConf: RedisZSetConf,
114+
rddType: K)
115+
(implicit val kClassTag: ClassTag[K])
116+
extends RDD[K](prev) with Keys {
117+
118+
override def getPartitions: Array[Partition] = prev.partitions
119+
120+
override def compute(split: Partition, context: TaskContext): Iterator[K] = {
121+
val partition: RedisPartition = split.asInstanceOf[RedisPartition]
122+
val sPos = partition.slots._1
123+
val ePos = partition.slots._2
124+
val nodes = partition.redisConfig.getNodesBySlots(sPos, ePos)
125+
val keys = firstParent[String].iterator(split, context)
126+
val auth = partition.redisConfig.getAuth
127+
val db = partition.redisConfig.getDB
128+
zsetConf.getType match {
129+
case "byRange" => getZSetByRange(nodes, keys, zsetConf.getStartPos, zsetConf.getEndPos).
130+
asInstanceOf[Iterator[K]]
131+
case "byScore" => getZSetByScore(nodes, keys, zsetConf.getMinScore, zsetConf.getMaxScore).
132+
asInstanceOf[Iterator[K]]
133+
}
134+
}
135+
136+
private def getZSetByRange(nodes: Array[RedisNode],
137+
keys: Iterator[String],
138+
startPos: Long,
139+
endPos: Long) = {
140+
groupKeysByNode(nodes, keys).flatMap {
141+
x =>
142+
{
143+
val conn = x._1.endpoint.connect()
144+
val zsetKeys = filterKeysByType(conn, x._2, "zset")
145+
val res = {
146+
if (zsetConf.getWithScore) {
147+
zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos)).
148+
map(tup => (tup.getElement, tup.getScore)).iterator
149+
}
150+
else {
151+
zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos)).iterator
152+
}
153+
}
154+
conn.close
155+
res
156+
}
157+
}.iterator
158+
}
159+
160+
private def getZSetByScore(nodes: Array[RedisNode],
161+
keys: Iterator[String],
162+
startScore: Double,
163+
endScore: Double) = {
164+
groupKeysByNode(nodes, keys).flatMap {
165+
x =>
166+
{
167+
val conn = x._1.endpoint.connect()
168+
val zsetKeys = filterKeysByType(conn, x._2, "zset")
169+
val res = {
170+
if (zsetConf.getWithScore) {
171+
zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore)).
172+
map(tup => (tup.getElement, tup.getScore)).iterator
173+
} else {
174+
zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore)).iterator
175+
}
176+
}
177+
conn.close
178+
res
179+
}
180+
}.iterator
181+
}
182+
}
183+
120184
class RedisKeysRDD(sc: SparkContext,
121185
val redisConfig: RedisConfig,
122186
val keyPattern: String = "*",
@@ -212,11 +276,118 @@ class RedisKeysRDD(sc: SparkContext,
212276
def getHash(): RDD[(String, String)] = {
213277
new RedisKVRDD(this, "hash")
214278
}
215-
def getZSet(): RDD[(String, String)] = {
216-
new RedisKVRDD(this, "zset")
279+
def getZSet(): RDD[(String, Double)] = {
280+
val zsetConf: RedisZSetConf = new RedisZSetConf().
281+
set("withScore", "true").
282+
set("type", "byRange").
283+
set("startPos", "0").
284+
set("endPos", "-1")
285+
new RedisZSetRDD(this, zsetConf, ("String", 0.1))
286+
}
287+
def getZSetByRange(startPos: Long, endPos: Long, withScore: Boolean) = {
288+
val zsetConf: RedisZSetConf = new RedisZSetConf().
289+
set("withScore", withScore.toString).
290+
set("type", "byRange").
291+
set("startPos", startPos.toString).
292+
set("endPos", endPos.toString)
293+
// new RedisZSetRDD(this, zsetConf, ("String", 0.1))
294+
new RedisZSetRDD(this, zsetConf, if (withScore) ("String", 0.1) else "String")
295+
}
296+
def getZSetByScore(min: Double, max: Double, withScore: Boolean) = {
297+
val zsetConf: RedisZSetConf = new RedisZSetConf().
298+
set("withScore", withScore.toString).
299+
set("type", "byScore").
300+
set("minScore", min.toString).
301+
set("maxScore", max.toString)
302+
// new RedisZSetRDD(this, zsetConf, ("String", 0.1))
303+
new RedisZSetRDD(this, zsetConf, if (withScore) ("String", 0.1) else "String")
304+
}
305+
}
306+
307+
308+
class RedisZSetConf() extends Serializable {
309+
310+
private val settings = new ConcurrentHashMap[String, String]()
311+
312+
def set(key: String, value: String): RedisZSetConf = {
313+
if (key == null) {
314+
throw new NullPointerException("null key")
315+
}
316+
if (value == null) {
317+
throw new NullPointerException("null value for " + key)
318+
}
319+
settings.put(key, value)
320+
this
321+
}
322+
323+
def remove(key: String): RedisZSetConf = {
324+
settings.remove(key)
325+
this
326+
}
327+
328+
def contains(key: String): Boolean = {
329+
settings.containsKey(key)
330+
}
331+
332+
def get(key: String): String = {
333+
Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
334+
}
335+
336+
def get(key: String, defaultValue: String): String = {
337+
Option(settings.get(key)).getOrElse(defaultValue)
338+
}
339+
340+
def getInt(key: String): Int = {
341+
get(key).toInt
342+
}
343+
344+
def getInt(key: String, defaultValue: Int): Int = {
345+
get(key, defaultValue.toString).toInt
346+
}
347+
348+
def getLong(key: String): Long = {
349+
get(key).toLong
350+
}
351+
352+
def getLong(key: String, defaultValue: Long): Long = {
353+
get(key, defaultValue.toString).toLong
354+
}
355+
356+
def getDouble(key: String): Double = {
357+
get(key).toDouble
358+
}
359+
360+
def getDouble(key: String, defaultValue: Double): Double = {
361+
get(key, defaultValue.toString).toDouble
362+
}
363+
364+
def getBoolean(key: String): Boolean = {
365+
get(key).toBoolean
366+
}
367+
368+
def getBoolean(key: String, defaultValue: Boolean): Boolean = {
369+
get(key, defaultValue.toString).toBoolean
370+
}
371+
372+
def getAll: Array[(String, String)] = {
373+
settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
217374
}
375+
376+
def getType: String = get("type")
377+
378+
def getWithScore = getBoolean("withScore")
379+
380+
def getStartPos: Long = getLong("startPos", 0)
381+
382+
def getEndPos: Long = getLong("endPos", -1)
383+
384+
def getMinScore: Double = getDouble("minScore")
385+
386+
def getMaxScore: Double = getDouble("maxScore")
218387
}
219388

389+
390+
220391
trait Keys {
221392
/**
222393
* @param key
@@ -230,12 +401,12 @@ trait Keys {
230401
escape match {
231402
case true => judge(key.substring(1), false)
232403
case false => key.charAt(0) match {
233-
case '*' => true
234-
case '?' => true
235-
case '[' => true
236-
case '\\' => judge(key.substring(1), true)
237-
case _ => judge(key.substring(1), false)
238-
}
404+
case '*' => true
405+
case '?' => true
406+
case '[' => true
407+
case '\\' => judge(key.substring(1), true)
408+
case _ => judge(key.substring(1), false)
409+
}
239410
}
240411
}
241412
}

0 commit comments

Comments
 (0)