Skip to content

Commit 16d0754

Browse files
committed
update comments for the RedisNode refactor
1 parent 405fa0b commit 16d0754

File tree

3 files changed

+67
-73
lines changed

3 files changed

+67
-73
lines changed

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package com.redislabs.provider.redis
33
import java.net.URI
44

55
import org.apache.spark.SparkConf
6-
import redis.clients.jedis.{JedisFactory, Jedis}
6+
import redis.clients.jedis.Jedis
77
import redis.clients.util.{JedisURIHelper, SafeEncoder, JedisClusterCRC16}
88
import scala.collection.JavaConversions._
99

@@ -18,7 +18,6 @@ import scala.collection.JavaConversions._
1818
class RedisEndpoint(val host: String, val port: Int, val auth: String = "", val dbNum: Int = 0 )
1919
extends Serializable {
2020

21-
2221
/**
2322
* Constructor from spark config. set params with redis.host, redis.port, redis.auth and redis.db
2423
* @param conf spark context config
@@ -32,7 +31,6 @@ class RedisEndpoint(val host: String, val port: Int, val auth: String = "", val
3231
)
3332
}
3433

35-
3634
/**
3735
* Constructor with Jedis URI
3836
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]
@@ -50,11 +48,10 @@ class RedisEndpoint(val host: String, val port: Int, val auth: String = "", val
5048
this(URI.create(uri))
5149
}
5250

53-
5451
/**
5552
* Connect tries to open a connection to the redis endpoint,
5653
* optionally authenticating and selecting a db
57-
*
54+
*
5855
* @return a new Jedis instance
5956
*/
6057
def connect():Jedis = {
@@ -71,7 +68,6 @@ class RedisEndpoint(val host: String, val port: Int, val auth: String = "", val
7168
}
7269

7370
client
74-
7571
}
7672
}
7773

@@ -87,16 +83,21 @@ case class RedisNode(val endpoint: RedisEndpoint,
8783
*/
8884
class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
8985

90-
9186
val currentAddr = initialHost.host
9287

9388
val hosts = getHosts(initialHost)
9489
val nodes = getNodes(initialHost)
9590

91+
/**
92+
* @return initialHost's auth
93+
*/
9694
def getAuth: String = {
9795
initialHost.auth
9896
}
9997

98+
/**
99+
* @return selected db number
100+
*/
100101
def getDB :Int = {
101102
initialHost.dbNum
102103
}
@@ -106,6 +107,11 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
106107
hosts(rnd)
107108
}
108109

110+
/**
111+
* @param sPos start slot number
112+
* @param ePos end slot number
113+
* @return a list of RedisNode whose slot range intersects [sPos, ePos]
114+
*/
109115
def getNodesBySlots(sPos: Int, ePos: Int): Array[RedisNode] = {
110116
def inter(sPos1: Int, ePos1: Int, sPos2: Int, ePos2: Int) =
111117
if (sPos1 <= sPos2) ePos1 >= sPos2 else ePos2 >= sPos1
@@ -114,14 +120,17 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
114120
filter(_.idx == 0) //master only now
115121
}
116122

117-
/** Get a jedis connection for a given key */
123+
/**
124+
* @param key
125+
* @return a Jedis connection for the given key
126+
*/
118127
def connectionForKey(key: String): Jedis = {
119128
val host = getHost(key).endpoint
120129
host.connect
121130
}
122131

123132
/**
124-
* @param initialHost any addr and port of a cluster or a single server
133+
* @param initialHost any redis endpoint of a cluster or a single server
125134
* @return true if the target server is in cluster mode
126135
*/
127136
private def clusterEnabled(initialHost: RedisEndpoint): Boolean = {
@@ -144,16 +153,16 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
144153

145154

146155
/**
147-
* @param initialHost any addr and port of a cluster or a single server
148-
* @return list of hosts(addr, port, startSlot, endSlot)
156+
* @param initialHost any redis endpoint of a cluster or a single server
157+
* @return list of host nodes
149158
*/
150159
private def getHosts(initialHost: RedisEndpoint): Array[RedisNode] = {
151160
getNodes(initialHost).filter(_.idx == 0)
152161
}
153162

154163
/**
155-
* @param initialHost any addr and port of a single server
156-
* @return list of nodes(addr, port, index, range, startSlot, endSlot)
164+
* @param initialHost any redis endpoint of a single server
165+
* @return list of nodes
157166
*/
158167
private def getNonClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
159168
val master = (initialHost.host, initialHost.port)
@@ -190,8 +199,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
190199
}
191200

192201
/**
193-
* @param initialHost any addr and port of a cluster server
194-
* @return list of nodes(addr, port, index, range, startSlot, endSlot)
202+
* @param initialHost any redis endpoint of a cluster server
203+
* @return list of nodes
195204
*/
196205
private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
197206
val conn = initialHost.connect()
@@ -218,8 +227,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
218227
}
219228

220229
/**
221-
* @param initialHost any addr and port of a cluster or a single server
222-
* @return list of nodes(addr, port, index, range, startSlot, endSlot)
230+
* @param initialHost any redis endpoint of a cluster or a single server
231+
* @return list of nodes
223232
*/
224233
def getNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
225234
if (clusterEnabled(initialHost)) {

src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala

Lines changed: 36 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.redislabs.provider.redis.rdd
22

33
import java.util
44

5-
import com.redislabs.provider.redis.{RedisNode, RedisEndpoint, RedisConfig}
5+
import com.redislabs.provider.redis.{RedisNode, RedisConfig}
66
import org.apache.spark.rdd.RDD
77
import org.apache.spark._
88
import redis.clients.jedis._
@@ -33,9 +33,7 @@ class RedisKVRDD(prev: RDD[String],
3333
}
3434
}
3535

36-
def getKV(nodes: Array[RedisNode],
37-
keys: Iterator[String]): Iterator[(String, String)] = {
38-
36+
def getKV(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = {
3937
groupKeysByNode(nodes, keys).flatMap {
4038
x =>
4139
{
@@ -50,8 +48,7 @@ class RedisKVRDD(prev: RDD[String],
5048
}
5149
}.iterator
5250
}
53-
def getHASH(nodes: Array[RedisNode],
54-
keys: Iterator[String]): Iterator[(String, String)] = {
51+
def getHASH(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = {
5552
groupKeysByNode(nodes, keys).flatMap {
5653
x =>
5754
{
@@ -63,8 +60,7 @@ class RedisKVRDD(prev: RDD[String],
6360
}
6461
}.iterator
6562
}
66-
def getZSET(nodes: Array[RedisNode],
67-
keys: Iterator[String]): Iterator[(String, String)] = {
63+
def getZSET(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = {
6864
groupKeysByNode(nodes, keys).flatMap {
6965
x =>
7066
{
@@ -79,9 +75,7 @@ class RedisKVRDD(prev: RDD[String],
7975
}
8076
}
8177

82-
class RedisListRDD(prev: RDD[String],
83-
val rddType: String)
84-
extends RDD[String](prev) with Keys {
78+
class RedisListRDD(prev: RDD[String], val rddType: String) extends RDD[String](prev) with Keys {
8579

8680
override def getPartitions: Array[Partition] = prev.partitions
8781

@@ -97,8 +91,7 @@ class RedisListRDD(prev: RDD[String],
9791
}
9892
}
9993

100-
def getSET(nodes: Array[RedisNode],
101-
keys: Iterator[String]): Iterator[String] = {
94+
def getSET(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[String] = {
10295
groupKeysByNode(nodes, keys).flatMap {
10396
x =>
10497
{
@@ -110,8 +103,7 @@ class RedisListRDD(prev: RDD[String],
110103
}
111104
}.iterator
112105
}
113-
def getLIST(nodes: Array[RedisNode],
114-
keys: Iterator[String]): Iterator[String] = {
106+
def getLIST(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[String] = {
115107
groupKeysByNode(nodes, keys).flatMap {
116108
x =>
117109
{
@@ -132,17 +124,16 @@ class RedisKeysRDD(sc: SparkContext,
132124
val keys: Array[String] = null)
133125
extends RDD[String](sc, Seq.empty) with Logging with Keys {
134126

135-
136127
override protected def getPreferredLocations(split: Partition): Seq[String] = {
137128
Seq(split.asInstanceOf[RedisPartition].redisConfig.currentAddr)
138129
}
139130

140131
/**
141-
* hosts(ip:String, port:Int, startSlot:Int, endSlot:Int) are generated by the redis-cluster's
142-
* hash tragedy and partitionNum to divied the cluster to partitionNum
143-
*
144-
* @return hosts
145-
*/
132+
* hosts(ip:String, port:Int, startSlot:Int, endSlot:Int) are generated by the redis-cluster's
133+
* hash slot strategy, then split by partitionNum to divide the cluster into partitionNum partitions
134+
*
135+
* @return hosts
136+
*/
146137
private def scaleHostsWithPartitionNum(): Seq[(String, Int, Int, Int)] = {
147138
def split(host: RedisNode, cnt: Int) = {
148139
val endpoint = host.endpoint
@@ -205,7 +196,7 @@ class RedisKeysRDD(sc: SparkContext,
205196
if (Option(this.keys).isDefined) {
206197
this.keys.iterator
207198
} else {
208-
getKeys(nodes, sPos, ePos, keyPattern, partition.redisConfig).iterator
199+
getKeys(nodes, sPos, ePos, keyPattern).iterator
209200
}
210201

211202
}
@@ -228,9 +219,9 @@ class RedisKeysRDD(sc: SparkContext,
228219

229220
trait Keys {
230221
/**
231-
* @param key
232-
* @return true if the key is a RedisRegex
233-
*/
222+
* @param key
223+
* @return true if the key is a RedisRegex
224+
*/
234225
private def isRedisRegex(key: String) = {
235226
def judge(key: String, escape: Boolean): Boolean = {
236227
if (key.length == 0) {
@@ -252,10 +243,10 @@ trait Keys {
252243
}
253244

254245
/**
255-
* @param jedis
256-
* @param params
257-
* @return keys of params pattern in jedis
258-
*/
246+
* @param jedis
247+
* @param params
248+
* @return keys of params pattern in jedis
249+
*/
259250
private def scanKeys(jedis: Jedis, params: ScanParams): util.HashSet[String] = {
260251
val keys = new util.HashSet[String]
261252
var cursor = "0"
@@ -268,17 +259,16 @@ trait Keys {
268259
}
269260

270261
/**
271-
* @param nodes list of (IP:String, port:Int, index:Int, range:Int, startSlot:Int, endSlot:Int)
272-
* @param sPos start position of slots
273-
* @param ePos end position of slots
274-
* @param keyPattern
275-
* return keys whose slot is in [sPos, ePos]
276-
*/
262+
* @param nodes list of RedisNode
263+
* @param sPos start position of slots
264+
* @param ePos end position of slots
265+
* @param keyPattern
266+
* return keys whose slot is in [sPos, ePos]
267+
*/
277268
def getKeys(nodes: Array[RedisNode],
278269
sPos: Int,
279270
ePos: Int,
280-
keyPattern: String,
281-
config: RedisConfig): util.HashSet[String] = {
271+
keyPattern: String): util.HashSet[String] = {
282272
val keys = new util.HashSet[String]()
283273
if (isRedisRegex(keyPattern)) {
284274
nodes.foreach(node => {
@@ -299,10 +289,10 @@ trait Keys {
299289
}
300290

301291
/**
302-
* @param nodes list of (IP:String, port:Int, index:Int, range:Int, startSlot:Int, endSlot:Int)
303-
* @param keys list of keys
304-
* return (node: (key1, key2, ...), node2: (key3, key4,...), ...)
305-
*/
292+
* @param nodes list of RedisNode
293+
* @param keys list of keys
294+
* return (node: (key1, key2, ...), node2: (key3, key4,...), ...)
295+
*/
306296
def groupKeysByNode(nodes: Array[RedisNode], keys: Iterator[String]):
307297
Array[(RedisNode, Array[String])] = {
308298
def getNode(key: String): RedisNode = {
@@ -315,11 +305,11 @@ trait Keys {
315305
}
316306

317307
/**
318-
* @param conn
319-
* @param keys
320-
* keys are guaranteed that they belongs with the server jedis connected to.
321-
* Filter all the keys of "t" type.
322-
*/
308+
* @param conn
309+
* @param keys
310+
* keys are guaranteed to belong to the server that the given Jedis connection points to.
311+
* @return keys of "t" type
312+
*/
323313
def filterKeysByType(conn: Jedis, keys:Array[String], t:String): Array[String] = {
324314
val pipeline = conn.pipelined
325315
keys.foreach(pipeline.`type`)

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package com.redislabs.provider.redis
22

33
import org.apache.spark.SparkContext
44
import org.apache.spark.rdd.RDD
5-
import redis.clients.jedis.Jedis
65

7-
import redis.clients.util.{SafeEncoder, JedisClusterCRC16}
86
import com.redislabs.provider.redis.rdd._
97

108
/**
@@ -26,17 +24,19 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
2624
RedisKeysRDD = {
2725

2826
new RedisKeysRDD(sc, redisConfig, keyPattern, partitionNum, null);
29-
3027
}
3128

32-
29+
/**
30+
* @param keys an array of keys
31+
* @param partitionNum number of partitions
32+
* @return RedisKeysRDD of simple Keys stored in redis server
33+
*/
3334
def fromRedisKeys(keys: Array[String],
3435
partitionNum: Int = 3)
3536
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
3637
RedisKeysRDD = {
3738

3839
new RedisKeysRDD(sc, redisConfig, "", partitionNum, keys);
39-
4040
}
4141

4242
/**
@@ -117,7 +117,6 @@ object RedisContext extends Serializable {
117117
*/
118118
def setKVs(arr: Iterator[(String, String)], redisConfig: RedisConfig) {
119119

120-
121120
arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1).
122121
mapValues(a => a.map(p => p._2)).foreach {
123122
x => {
@@ -138,7 +137,6 @@ object RedisContext extends Serializable {
138137
*/
139138
def setHash(key: String, arr: Iterator[(String, String)], redisConfig: RedisConfig) {
140139

141-
142140
val conn = redisConfig.connectionForKey(key)
143141
val pipeline = conn.pipelined
144142
arr.foreach(x => pipeline.hset(key, x._1, x._2))
@@ -167,7 +165,6 @@ object RedisContext extends Serializable {
167165
*/
168166
def setSet(key: String, arr: Iterator[String], redisConfig: RedisConfig) {
169167

170-
171168
val jedis = redisConfig.connectionForKey(key)
172169
val pipeline = jedis.pipelined
173170
arr.foreach(pipeline.sadd(key, _))
@@ -207,8 +204,6 @@ object RedisContext extends Serializable {
207204
pipeline.sync
208205
jedis.close
209206
}
210-
211-
212207
}
213208

214209
trait RedisFunctions {

0 commit comments

Comments
 (0)