Skip to content

Commit cdec1bb

Browse files
committed
support expiration
1 parent 08e1a99 commit cdec1bb

File tree

2 files changed

+91
-29
lines changed

2 files changed

+91
-29
lines changed

Diff for: src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

+44-29
Original file line numberDiff line numberDiff line change
@@ -41,55 +41,56 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
4141

4242
/**
4343
* @param kvs Pair RDD of K/V
44+
* @param ttl time to live
4445
*/
45-
def toRedisKV(kvs: RDD[(String, String)])
46+
def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0)
4647
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
4748

48-
kvs.foreachPartition(partition => setKVs(partition, redisConfig))
49+
kvs.foreachPartition(partition => setKVs(partition, ttl, redisConfig))
4950
}
5051

5152
/**
5253
* @param kvs Pair RDD of K/V
5354
* @param hashName target hash's name which hold all the kvs
55+
* @param ttl time to live
5456
*/
55-
def toRedisHASH(kvs: RDD[(String, String)],
56-
hashName: String)
57+
def toRedisHASH(kvs: RDD[(String, String)], hashName: String, ttl: Int = 0)
5758
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
5859

59-
kvs.foreachPartition(partition => setHash(hashName, partition, redisConfig))
60+
kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig))
6061
}
6162

6263
/**
6364
* @param kvs Pair RDD of K/V
6465
* @param zsetName target zset's name which hold all the kvs
66+
* @param ttl time to live
6567
*/
66-
def toRedisZSET(kvs: RDD[(String, String)],
67-
zsetName: String)
68+
def toRedisZSET(kvs: RDD[(String, String)], zsetName: String, ttl: Int = 0)
6869
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
6970

70-
kvs.foreachPartition(partition => setZset(zsetName, partition, redisConfig))
71+
kvs.foreachPartition(partition => setZset(zsetName, partition, ttl, redisConfig))
7172
}
7273

7374
/**
7475
* @param vs RDD of values
7576
* @param setName target set's name which hold all the vs
77+
* @param ttl time to live
7678
*/
77-
def toRedisSET(vs: RDD[String],
78-
setName: String)
79+
def toRedisSET(vs: RDD[String], setName: String, ttl: Int = 0)
7980
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
8081

81-
vs.foreachPartition(partition => setSet(setName, partition, redisConfig))
82+
vs.foreachPartition(partition => setSet(setName, partition, ttl, redisConfig))
8283
}
8384

8485
/**
8586
* @param vs RDD of values
8687
* @param listName target list's name which hold all the vs
88+
* @param ttl time to live
8789
*/
88-
def toRedisLIST(vs: RDD[String],
89-
listName: String)
90+
def toRedisLIST(vs: RDD[String], listName: String, ttl: Int = 0)
9091
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
9192

92-
vs.foreachPartition(partition => setList(listName, partition, redisConfig))
93+
vs.foreachPartition(partition => setList(listName, partition, ttl, redisConfig))
9394
}
9495

9596
/**
@@ -114,15 +115,21 @@ object RedisContext extends Serializable {
114115
/**
115116
* @param arr k/vs which should be saved in the target host
116117
* save all the k/vs to the target host
118+
* @param ttl time to live
117119
*/
118-
def setKVs(arr: Iterator[(String, String)], redisConfig: RedisConfig) {
120+
def setKVs(arr: Iterator[(String, String)], ttl: Int, redisConfig: RedisConfig) {
119121

120122
arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1).
121123
mapValues(a => a.map(p => p._2)).foreach {
122124
x => {
123125
val conn = x._1.endpoint.connect()
124126
val pipeline = x._1.endpoint.connect.pipelined
125-
x._2.foreach(x => pipeline.set(x._1, x._2))
127+
if (ttl <= 0) {
128+
x._2.foreach(x => pipeline.set(x._1, x._2))
129+
}
130+
else {
131+
x._2.foreach(x => pipeline.setex(x._1, ttl, x._2))
132+
}
126133
pipeline.sync
127134
conn.close
128135
}
@@ -131,43 +138,49 @@ object RedisContext extends Serializable {
131138

132139

133140
/**
134-
* @param key
141+
* @param hashName
135142
* @param arr k/vs which should be saved in the target host
136143
* save all the k/vs to hashName(list type) to the target host
144+
* @param ttl time to live
137145
*/
138-
def setHash(key: String, arr: Iterator[(String, String)], redisConfig: RedisConfig) {
146+
def setHash(hashName: String, arr: Iterator[(String, String)], ttl: Int, redisConfig: RedisConfig) {
139147

140-
val conn = redisConfig.connectionForKey(key)
148+
val conn = redisConfig.connectionForKey(hashName)
141149
val pipeline = conn.pipelined
142-
arr.foreach(x => pipeline.hset(key, x._1, x._2))
150+
arr.foreach(x => pipeline.hset(hashName, x._1, x._2))
151+
if (ttl > 0) pipeline.expire(hashName, ttl)
143152
pipeline.sync
144153
conn.close
145154
}
146155

147156
/**
148-
* @param key
157+
* @param zsetName
149158
* @param arr k/vs which should be saved in the target host
150159
* save all the k/vs to zsetName(zset type) to the target host
160+
* @param ttl time to live
151161
*/
152-
def setZset(key: String, arr: Iterator[(String, String)], redisConfig: RedisConfig) {
162+
def setZset(zsetName: String, arr: Iterator[(String, String)], ttl: Int, redisConfig: RedisConfig) {
153163

154-
val jedis = redisConfig.connectionForKey(key)
164+
val jedis = redisConfig.connectionForKey(zsetName)
155165
val pipeline = jedis.pipelined
156-
arr.foreach(x => pipeline.zadd(key, x._2.toDouble, x._1))
166+
arr.foreach(x => pipeline.zadd(zsetName, x._2.toDouble, x._1))
167+
if (ttl > 0) pipeline.expire(zsetName, ttl)
157168
pipeline.sync
158169
jedis.close
159170
}
160171

161172
/**
162-
* @param key
173+
* @param setName
163174
* @param arr values which should be saved in the target host
164175
* save all the values to setName(set type) to the target host
176+
* @param ttl time to live
165177
*/
166-
def setSet(key: String, arr: Iterator[String], redisConfig: RedisConfig) {
178+
def setSet(setName: String, arr: Iterator[String], ttl: Int, redisConfig: RedisConfig) {
167179

168-
val jedis = redisConfig.connectionForKey(key)
180+
val jedis = redisConfig.connectionForKey(setName)
169181
val pipeline = jedis.pipelined
170-
arr.foreach(pipeline.sadd(key, _))
182+
arr.foreach(pipeline.sadd(setName, _))
183+
if (ttl > 0) pipeline.expire(setName, ttl)
171184
pipeline.sync
172185
jedis.close
173186
}
@@ -176,12 +189,14 @@ object RedisContext extends Serializable {
176189
* @param listName
177190
* @param arr values which should be saved in the target host
178191
* save all the values to listName(list type) to the target host
192+
* @param ttl time to live
179193
*/
180-
def setList(listName: String, arr: Iterator[String], redisConfig: RedisConfig) {
194+
def setList(listName: String, arr: Iterator[String], ttl: Int, redisConfig: RedisConfig) {
181195

182196
val jedis = redisConfig.connectionForKey(listName)
183197
val pipeline = jedis.pipelined
184198
arr.foreach(pipeline.rpush(listName, _))
199+
if (ttl > 0) pipeline.expire(listName, ttl)
185200
pipeline.sync
186201
jedis.close
187202
}

Diff for: src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDSuite.scala

+47
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,53 @@ class RedisRDDSuite extends FunSuite with ENV with BeforeAndAfterAll with Should
174174
setContents should be (ws)
175175
}
176176

177+
test("Expire - default(standalone)") {
178+
val expireTime = 1
179+
val prefix = s"#expire in ${expireTime}#:"
180+
val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
181+
reduceByKey(_ + _).map(x => (prefix + x._1, x._2.toString))
182+
val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
183+
sc.toRedisKV(wcnts, expireTime)
184+
sc.toRedisZSET(wcnts, prefix + "all:words:cnt:sortedset", expireTime)
185+
sc.toRedisHASH(wcnts, prefix + "all:words:cnt:hash", expireTime)
186+
sc.toRedisLIST(wds, prefix + "all:words:list", expireTime)
187+
sc.toRedisSET(wds, prefix + "all:words:set", expireTime)
188+
Thread.sleep(expireTime * 1000 + 1)
189+
sc.fromRedisKeyPattern(prefix + "*").count should be (0)
190+
}
191+
192+
test("Expire - standalone") {
193+
val expireTime = 1
194+
val prefix = s"#expire in ${expireTime}#:"
195+
implicit val c: RedisConfig = redisConfigStandalone
196+
val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
197+
reduceByKey(_ + _).map(x => (prefix + x._1, x._2.toString))
198+
val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
199+
sc.toRedisKV(wcnts, expireTime)
200+
sc.toRedisZSET(wcnts, prefix + "all:words:cnt:sortedset", expireTime)
201+
sc.toRedisHASH(wcnts, prefix + "all:words:cnt:hash", expireTime)
202+
sc.toRedisLIST(wds, prefix + "all:words:list", expireTime)
203+
sc.toRedisSET(wds, prefix + "all:words:set", expireTime)
204+
Thread.sleep(expireTime * 1000 + 1)
205+
sc.fromRedisKeyPattern(prefix + "*").count should be (0)
206+
}
207+
208+
test("Expire - cluster") {
209+
val expireTime = 1
210+
val prefix = s"#expire in ${expireTime}#:"
211+
implicit val c: RedisConfig = redisConfigCluster
212+
val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
213+
reduceByKey(_ + _).map(x => (prefix + x._1, x._2.toString))
214+
val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
215+
sc.toRedisKV(wcnts, expireTime)
216+
sc.toRedisZSET(wcnts, prefix + "all:words:cnt:sortedset", expireTime)
217+
sc.toRedisHASH(wcnts, prefix + "all:words:cnt:hash", expireTime)
218+
sc.toRedisLIST(wds, prefix + "all:words:list", expireTime)
219+
sc.toRedisSET(wds, prefix + "all:words:set", expireTime)
220+
Thread.sleep(expireTime * 1000 + 1)
221+
sc.fromRedisKeyPattern(prefix + "*").count should be (0)
222+
}
223+
177224
override def afterAll(): Unit = {
178225
sc.stop
179226
System.clearProperty("spark.driver.port")

0 commit comments

Comments
 (0)