-
Notifications
You must be signed in to change notification settings - Fork 370
/
Copy pathRedisRddExtraSuite.scala
134 lines (112 loc) · 3.71 KB
/
RedisRddExtraSuite.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.redislabs.provider.redis.rdd
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
import org.scalatest.Matchers
import com.redislabs.provider.redis._
import com.redislabs.provider.redis.util.TestUtils
import redis.clients.jedis.exceptions.JedisConnectionException
import scala.collection.JavaConverters._
/**
  * Additional RDD tests: writing lists, hashes and sorted sets to Redis through the
  * `sc.toRedis*` helpers, and connecting with a username/password.
  *
  * The suite is abstract over the Redis deployment: a concrete suite supplies `redisConfig`.
  * `sc`, `redisHost`, `redisPort`, `user` and `userPassword` are not defined here and are
  * presumably inherited from the mixed-in traits.
  */
trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers {

  /** Redis configuration the tests and the verify helpers run against. */
  implicit val redisConfig: RedisConfig

  test("toRedisByteLISTs") {
    val list1 = Seq("a1", "b1", "c1")
    val list2 = Seq("a2", "b2", "c2")
    val keyValues = Seq(
      ("binary-list1", list1),
      ("binary-list2", list2)
    )
    // Same data as the string variant, but with keys and elements encoded as byte arrays.
    val keyValueBytes = keyValues.map { case (k, list) => (k.getBytes, list.map(_.getBytes())) }
    val rdd = sc.parallelize(keyValueBytes)
    sc.toRedisByteLISTs(rdd)

    verifyList("binary-list1", list1)
    verifyList("binary-list2", list2)
  }

  test("toRedisLISTs") {
    val list1 = Seq("a1", "b1", "c1")
    val list2 = Seq("a2", "b2", "c2")
    val keyValues = Seq(
      ("list1", list1),
      ("list2", list2)
    )
    val rdd = sc.parallelize(keyValues)
    sc.toRedisLISTs(rdd)

    verifyList("list1", list1)
    verifyList("list2", list2)
  }

  test("toRedisHASHes") {
    val map1 = Map("k1" -> "v1", "k2" -> "v2")
    val map2 = Map("k3" -> "v3", "k4" -> "v4")
    val hashes = Seq(
      ("hash1", map1),
      ("hash2", map2)
    )
    val rdd = sc.parallelize(hashes)
    sc.toRedisHASHes(rdd)

    verifyHash("hash1", map1)
    verifyHash("hash2", map2)
  }

  test("toRedisByteHASHes") {
    val map1 = Map("k1" -> "v1", "k2" -> "v2")
    val map2 = Map("k3" -> "v3", "k4" -> "v4")
    val hashes = Seq(
      ("hash1", map1),
      ("hash2", map2)
    )
    // Encode the hash key as well as every field name and value as byte arrays.
    val hashesBytes = hashes.map { case (k, hash) =>
      (k.getBytes, hash.map { case (mapKey, mapVal) => (mapKey.getBytes, mapVal.getBytes) })
    }
    val rdd = sc.parallelize(hashesBytes)
    sc.toRedisByteHASHes(rdd)

    verifyHash("hash1", map1)
    verifyHash("hash2", map2)
  }

  test("toRedisZSETs") {
    // Scores are passed as strings; verifyZSET compares them after converting to Double.
    val map1 = Map("k1" -> "3.14", "k2" -> "2.71")
    val map2 = Map("k3" -> "10", "k4" -> "12", "k5" -> "8", "k6" -> "2")
    val zsets = Seq(
      ("zset1", map1),
      ("zset2", map2)
    )
    val rdd = sc.parallelize(zsets)
    sc.toRedisZSETs(rdd)

    verifyZSET("zset1", map1)
    verifyZSET("zset2", map2)
  }

  test("connection fails with incorrect user/pass") {
    // Constructing the config is itself expected to throw when the credentials are rejected.
    assertThrows[JedisConnectionException] {
      new RedisConfig(RedisEndpoint(
        host = redisHost,
        port = redisPort,
        user = user,
        auth = "wrong_password"))
    }
  }

  test("connection with correct user/pass") {
    val userConfig = new RedisConfig(RedisEndpoint(
      host = redisHost,
      port = redisPort,
      user = user,
      auth = userPassword))

    val someKey = TestUtils.generateRandomKey()
    // Go through withConnection, like the verify helpers, instead of holding a raw connection
    // that is never released. NOTE(review): withConnection is assumed to close the connection
    // once the block finishes, including when an assertion fails - confirm in ConnectionUtils.
    withConnection(userConfig.connectionForKey(someKey)) { jedis =>
      jedis.set(someKey, "123")
      jedis.get(someKey) should be("123")
    }

    // test RDD operation
    // headOption turns an empty result into a matcher failure rather than an index error.
    sc.fromRedisKeyPattern(someKey)(redisConfig = userConfig)
      .collect().headOption should be(Some(someKey))
  }

  /**
    * Asserts that the Redis list stored under `list` holds exactly `vals`, in order.
    *
    * @param list key of the Redis list
    * @param vals expected elements, head first
    */
  def verifyList(list: String, vals: Seq[String]): Unit = {
    withConnection(redisConfig.getHost(list).endpoint.connect()) { conn =>
      // LRANGE's stop index is inclusive, so this requests up to vals.size + 1 elements;
      // an unexpected extra element therefore shows up and fails the comparison.
      conn.lrange(list, 0, vals.size).asScala should be(vals.toList)
    }
  }

  /**
    * Asserts that the Redis hash stored under `hash` has exactly the fields and values in `vals`.
    *
    * @param hash key of the Redis hash
    * @param vals expected field -> value pairs
    */
  def verifyHash(hash: String, vals: Map[String, String]): Unit = {
    withConnection(redisConfig.getHost(hash).endpoint.connect()) { conn =>
      conn.hgetAll(hash).asScala should be(vals)
    }
  }

  /**
    * Asserts that the sorted set stored under `zset` has exactly the members and scores in
    * `vals`. The set is read back through `sc.fromRedisZSetWithScore`, and both sides are
    * ordered by member name before comparing.
    *
    * @param zset key of the Redis sorted set
    * @param vals expected member -> score pairs, with each score given as a numeric string
    */
  def verifyZSET(zset: String, vals: Map[String, String]): Unit = {
    val zsetWithScore = sc.fromRedisZSetWithScore(zset).sortByKey().collect()
    // Strict map rather than mapValues, which is a lazy view on 2.12 and deprecated on 2.13.
    val expected = vals.map { case (member, score) => member -> score.toDouble }.toArray.sortBy(_._1)
    zsetWithScore should be(expected)
  }
}