diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala index d1e7a22c..fa8df8e9 100644 --- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala +++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala @@ -241,6 +241,14 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable { } } + /** + * @param kttls Pair RDD of K/TTL + */ + def toRedisEXPIRE(kttls: RDD[(String, Int)]) + (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) { + kttls.foreachPartition(partition => setExpire(partition, redisConfig)) + } + /** * @param kvs Pair RDD of K/V * @param ttl time to live @@ -360,6 +368,22 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable { object RedisContext extends Serializable { + /** + * @param arr k/ttl which should be set to expire in the target host + */ + def setExpire(arr: Iterator[(String, Int)], redisConfig: RedisConfig) { + arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1). + mapValues(a => a.map(p => p._2)).foreach { + x => { + val conn = x._1.endpoint.connect() + val pipeline = conn.pipelined + x._2.foreach(x => pipeline.expire(x._1, x._2)) + pipeline.sync + conn.close + } + } + } + /** * @param arr k/vs which should be saved in the target host * save all the k/vs to the target host