diff --git a/.gitignore b/.gitignore index d1366cd5..dcb7bdbf 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,9 @@ target/ *.pyc sbt/*.jar +# test artifacts +checkpoint-test + # sbt specific .cache/ .history/ diff --git a/README.md b/README.md index 7c1cda22..85305b00 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ The master branch contains the recent development for the next release. | Spark-Redis | Spark | Redis | Supported Scala Versions | |---------------------------------------------------------------------------|-------| ---------------- | ------------------------ | | [master](https://github.com/RedisLabs/spark-redis/) | 3.2.x | >=2.9.0 | 2.12 | + | [4.0](https://github.com/RedisLabs/spark-redis/tree/branch-4.0) | 4.0.x | >=2.9.0 | 2.13 | | [3.0](https://github.com/RedisLabs/spark-redis/tree/branch-3.0) | 3.0.x | >=2.9.0 | 2.12 | | [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 | | [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 | diff --git a/doc/getting-started.md b/doc/getting-started.md index acd14e93..78e792e1 100644 --- a/doc/getting-started.md +++ b/doc/getting-started.md @@ -6,28 +6,17 @@ com.redislabs - spark-redis_2.11 - 2.4.2 + spark-redis_2.13 + 4.0.0 ``` -Or - -```xml - - - com.redislabs - spark-redis_2.12 - 2.4.2 - - -``` ### SBT ```scala -libraryDependencies += "com.redislabs" %% "spark-redis" % "2.4.2" +libraryDependencies += "com.redislabs" %% "spark-redis" % "4.0.0" ``` ### Build form source @@ -115,3 +104,9 @@ val ssc = new StreamingContext(sc, Seconds(1)) val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2) ``` + +or + +```scala +// add example with StructuredStreaming +``` diff --git a/pom.xml b/pom.xml index 80663f35..ef4e7c7d 100644 --- a/pom.xml +++ b/pom.xml @@ -2,8 +2,8 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.redislabs - spark-redis_2.12 - 3.1.0-SNAPSHOT + spark-redis_2.13 + 4.0.0-SNAPSHOT Spark-Redis A Spark library for Redis http://github.com/RedisLabs/spark-redis @@ -46,14 +46,39 @@ UTF-8 UTF-8 - 1.8 - 2.12 - ${scala.major.version}.0 - 3.9.0 - 3.2.1 + 17 + 2.13 + ${scala.major.version}.15 + 5.2.0 + 4.0.0-preview2 1.0 + + -XX:+IgnoreUnrecognizedVMOptions + -Djavax.net.ssl.trustStorePassword=password + -Djavax.net.ssl.trustStore=./src/test/resources/tls/clientkeystore + -Djavax.net.ssl.trustStoreType=jceks + -Djdk.reflect.useDirectMethodHandle=false + -Dio.netty.tryReflectionSetAccessible=true + --add-modules=jdk.incubator.vector + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + + + ossrh @@ -89,7 +114,7 @@ net.alchim31.maven scala-maven-plugin - 3.4.6 + 4.9.1 ${scala.complete.version} @@ -125,7 +150,7 @@ org.scalastyle scalastyle-maven-plugin - 0.7.0 + 1.0.0 false 
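
Editorial note on the getting-started.md hunk above: the added Structured Streaming snippet is currently only a `// add example with StructuredStreaming` placeholder. Below is a minimal sketch of what such an example could look like, assuming the `redis` Structured Streaming source and its `stream.keys` option that spark-redis exposes (option names, the stream key `sensors`, and the schema are illustrative and should be checked against doc/structured-streaming.md):

```scala
// Minimal sketch, not part of the diff: reads a Redis stream through the
// spark-redis Structured Streaming source. The key "sensors" and the schema
// are illustrative placeholders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("redis-structured-streaming-example")
  .master("local[*]")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .getOrCreate()

val sensors = spark.readStream
  .format("redis")                      // spark-redis streaming source
  .option("stream.keys", "sensors")     // Redis stream key(s) to consume
  .schema(StructType(Array(
    StructField("sensor-id", StringType),
    StructField("temperature", StringType)
  )))
  .load()

// Write each micro-batch to the console for demonstration purposes.
val query = sensors.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
```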
false @@ -157,7 +182,7 @@ org.apache.maven.plugins maven-assembly-plugin - 2.5.4 + 3.7.1 jar-with-dependencies @@ -245,7 +270,7 @@ ${project.build.directory}/surefire-reports . WDF TestSuite.txt - -XX:MaxPermSize=256m -Xmx2g -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.trustStore=./src/test/resources/tls/clientkeystore -Djavax.net.ssl.trustStoreType=jceks + -ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:+IgnoreUnrecognizedVMOptions -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.trustStore=./src/test/resources/tls/clientkeystore -Djavax.net.ssl.trustStoreType=jceks -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED com.redislabs.provider.redis.util.BenchmarkTest @@ -373,7 +398,7 @@ org.apache.maven.plugins maven-gpg-plugin - 3.0.1 + 3.2.7 --pinentry-mode diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala index a4a2d61f..d8422d9a 100644 --- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala +++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala @@ -5,7 +5,7 @@ import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} import java.time.Duration import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ object ConnectionPool { @@ -13,15 +13,16 @@ object ConnectionPool { new ConcurrentHashMap[RedisEndpoint, JedisPool]() def connect(re: RedisEndpoint): Jedis = { - val pool = pools.getOrElseUpdate(re, + + val pool = pools.asScala.getOrElseUpdate(re, { - val poolConfig: JedisPoolConfig = new JedisPoolConfig(); + val poolConfig: JedisPoolConfig = new JedisPoolConfig() poolConfig.setMaxTotal(250) poolConfig.setMaxIdle(32) poolConfig.setTestOnBorrow(false) poolConfig.setTestOnReturn(false) poolConfig.setTestWhileIdle(false) - poolConfig.setSoftMinEvictableIdleTime(Duration.ofMinutes(1)) + poolConfig.setSoftMinEvictableIdleDuration(Duration.ofMinutes(1)) poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30)) poolConfig.setNumTestsPerEvictionRun(-1) @@ -36,10 +37,9 @@ object ConnectionPool { } catch { case e: JedisConnectionException if e.getCause.toString. 
- contains("ERR max number of clients reached") => { + contains("ERR max number of clients reached") => if (sleepTime < 500) sleepTime *= 2 Thread.sleep(sleepTime) - } case e: Exception => throw e } } diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index 34f41944..34c8144c 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -1,12 +1,13 @@ package com.redislabs.provider.redis import java.net.URI - import org.apache.spark.SparkConf +import redis.clients.jedis.resps.{ClusterShardInfo, ClusterShardNodeInfo} import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder} import redis.clients.jedis.{Jedis, Protocol} -import scala.collection.JavaConversions._ +import java.nio.charset.Charset +import scala.jdk.CollectionConverters._ /** @@ -71,7 +72,8 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, * @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL */ def this(uri: URI) { - this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri), + this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), + JedisURIHelper.getDBIndex(uri), Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme) } @@ -130,7 +132,8 @@ object ReadWriteConfig { val RddWriteIteratorGroupingSizeKey = "spark.redis.rdd.write.iterator.grouping.size" val RddWriteIteratorGroupingSizeDefault = 1000 - val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault, RddWriteIteratorGroupingSizeDefault) + val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault, + RddWriteIteratorGroupingSizeDefault) def fromSparkConf(conf: SparkConf): ReadWriteConfig = { ReadWriteConfig( @@ -325,11 +328,14 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { */ private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = { val conn = initialHost.connect() - val res = conn.clusterSlots().flatMap { - slotInfoObj => { - val slotInfo = slotInfoObj.asInstanceOf[java.util.List[java.lang.Object]] - val sPos = slotInfo.get(0).toString.toInt - val ePos = slotInfo.get(1).toString.toInt + + val res = conn.clusterShards().asScala.flatMap { + shardInfoObj: ClusterShardInfo => { + val slotInfo = shardInfoObj.getSlots + + // todo: Can we have more than 1 node per ClusterShard? + val nodeInfo = shardInfoObj.getNodes.get(0) + /* * We will get all the nodes with the slots range [sPos, ePos], * and create RedisNode for each nodes, the total field of all @@ -339,10 +345,11 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { * And the idx of a master is always 0, we rely on this fact to * filter master. 
*/ - (0 until (slotInfo.size - 2)).map(i => { - val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]] - val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]]) - val port = node.get(1).toString.toInt + (0 until (slotInfo.size)).map(i => { + val host = SafeEncoder.encode(nodeInfo.getIp.getBytes(Charset.forName("UTF8"))) + val port = nodeInfo.getPort.toInt + val slotStart = slotInfo.get(i).get(0).toInt + val slotEnd = slotInfo.get(i).get(1).toInt val endpoint = RedisEndpoint( host = host, port = port, @@ -351,7 +358,9 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { dbNum = initialHost.dbNum, timeout = initialHost.timeout, ssl = initialHost.ssl) - RedisNode(endpoint, sPos, ePos, i, slotInfo.size - 2) + val role = nodeInfo.getRole + val idx = if (role == "master") 0 else i + RedisNode(endpoint, slotStart, slotEnd, idx, slotInfo.size) }) } }.toArray diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 40417451..b2e8cfc0 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -8,10 +8,11 @@ import com.redislabs.provider.redis.util.PipelineUtils.mapWithPipeline import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisNode} import org.apache.spark._ import org.apache.spark.rdd.RDD -import redis.clients.jedis.{Jedis, ScanParams} +import redis.clients.jedis.Jedis +import redis.clients.jedis.params.ScanParams import redis.clients.jedis.util.JedisClusterCRC16 -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.reflect.{ClassTag, classTag} import scala.util.{Failure, Success, Try} @@ -62,7 +63,7 @@ class RedisKVRDD(prev: RDD[String], groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() val res = nodeKeys.flatMap{k => - ignoreJedisWrongTypeException(Try(conn.hgetAll(k).toMap)).get + ignoreJedisWrongTypeException(Try(conn.hgetAll(k).asScala)).get }.flatten.iterator conn.close() res @@ -92,7 +93,7 @@ class RedisListRDD(prev: RDD[String], groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() val res: Iterator[String] = nodeKeys.flatMap{k => - ignoreJedisWrongTypeException(Try(conn.smembers(k).toSet)).get + ignoreJedisWrongTypeException(Try(conn.smembers(k).asScala)).get }.flatten .iterator conn.close() @@ -104,7 +105,7 @@ class RedisListRDD(prev: RDD[String], groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() val res = nodeKeys.flatMap{ k => - ignoreJedisWrongTypeException(Try(conn.lrange(k, 0, -1))).get + ignoreJedisWrongTypeException(Try(conn.lrange(k, 0, -1).asScala)).get }.flatten.iterator conn.close() res @@ -150,12 +151,12 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], val res = { if (classTag[T] == classTag[(String, Double)]) { nodeKeys.flatMap{k => - ignoreJedisWrongTypeException(Try(conn.zrangeWithScores(k, startPos, endPos))).get + ignoreJedisWrongTypeException(Try(conn.zrangeWithScores(k, startPos, endPos).asScala)).get }.flatten .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { nodeKeys.flatMap{k => - ignoreJedisWrongTypeException(Try(conn.zrange(k, startPos, endPos))).get + ignoreJedisWrongTypeException(Try(conn.zrange(k, startPos, endPos).asScala)).get }.flatten.iterator } else { throw new 
scala.Exception("Unknown RedisZSetRDD type") @@ -175,13 +176,13 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], val res = { if (classTag[T] == classTag[(String, Double)]) { nodeKeys.flatMap{k => - ignoreJedisWrongTypeException(Try(conn.zrangeByScoreWithScores(k, startScore, endScore))).get + ignoreJedisWrongTypeException(Try(conn.zrangeByScoreWithScores(k, startScore, endScore).asScala)).get }. flatten .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { nodeKeys.flatMap{ k => - ignoreJedisWrongTypeException(Try(conn.zrangeByScore(k, startScore, endScore))).get + ignoreJedisWrongTypeException(Try(conn.zrangeByScore(k, startScore, endScore).asScala)).get }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") @@ -227,14 +228,16 @@ class RedisKeysRDD(sc: SparkContext, val hosts = redisConfig.hosts.sortBy(_.startSlot) - if (hosts.size == partitionNum) { + if (hosts.length == partitionNum) { hosts.map(x => (x.endpoint.host, x.endpoint.port, x.startSlot, x.endSlot)) - } else if (hosts.size < partitionNum) { - val presExtCnt = partitionNum / hosts.size - val lastExtCnt = if (presExtCnt * hosts.size < partitionNum) (presExtCnt + partitionNum % hosts.size) else presExtCnt + } else if (hosts.length < partitionNum) { + val presExtCnt = partitionNum / hosts.length + val lastExtCnt = if (presExtCnt * hosts.length < partitionNum) { + presExtCnt + partitionNum % hosts.length + } else { presExtCnt } hosts.zipWithIndex.flatMap { case (host, idx) => { - split(host, if (idx == hosts.size - 1) lastExtCnt else presExtCnt) + split(host, if (idx == hosts.length - 1) lastExtCnt else presExtCnt) } } } else { @@ -449,17 +452,17 @@ trait Keys { val endpoints = nodes.map(_.endpoint).distinct if (isRedisRegex(keyPattern)) { - endpoints.iterator.map { endpoint => + endpoints.iterator.flatMap { endpoint => val keys = new util.HashSet[String]() val conn = endpoint.connect() val params = new ScanParams().`match`(keyPattern).count(readWriteConfig.scanCount) - keys.addAll(scanKeys(conn, params).filter { key => + keys.addAll(scanKeys(conn, params).asScala.filter { key => val slot = JedisClusterCRC16.getSlot(key) slot >= sPos && slot <= ePos - }) + }.asJava) conn.close() - keys.iterator() - }.flatten + keys.iterator().asScala + } } else { val slot = JedisClusterCRC16.getSlot(keyPattern) if (slot >= sPos && slot <= ePos) Iterator(keyPattern) else Iterator() diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala index b8c17706..21e34300 100644 --- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala +++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala @@ -5,7 +5,9 @@ import com.redislabs.provider.redis.util.ConnectionUtils.withConnection import com.redislabs.provider.redis.util.PipelineUtils._ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import scala.collection.JavaConversions.mapAsJavaMap + +import scala.jdk.CollectionConverters._ +import scala.language.implicitConversions /** * RedisContext extends sparkContext's functionality with redis functions @@ -447,7 +449,7 @@ object RedisContext extends Serializable { withConnection(node.endpoint.connect()) { conn => foreachWithPipeline(conn, arr) { (pipeline, a) => val (key, hashFields) = a._2 - pipeline.hmset(key, hashFields) + pipeline.hmset(key, hashFields.asJava) if (ttl > 0) pipeline.expire(key, ttl.toLong) } } @@ -477,7 +479,7 @@ object RedisContext 
extends Serializable { withConnection(node.endpoint.connect()) { conn => foreachWithPipeline(conn, arr) { (pipeline, a) => val (key, hashFields) = a._2 - pipeline.hmset(key, hashFields) + pipeline.hmset(key, hashFields.asJava) if (ttl > 0) pipeline.expire(key, ttl.toLong) } } diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala b/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala deleted file mode 100644 index 4eb8e9c3..00000000 --- a/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala +++ /dev/null @@ -1,74 +0,0 @@ -package com.redislabs.provider.redis.streaming - -import com.redislabs.provider.redis.RedisConfig -import org.apache.curator.utils.ThreadUtils -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.streaming.dstream.ReceiverInputDStream - -import redis.clients.jedis._ - -import scala.reflect.{ClassTag, classTag} -import scala.util.control.NonFatal - -/** - * Receives messages from Redis List - */ -class RedisInputDStream[T: ClassTag](_ssc: StreamingContext, - keys: Array[String], - storageLevel: StorageLevel, - redisConfig: RedisConfig, - streamType: Class[T]) - extends ReceiverInputDStream[T](_ssc) { - def getReceiver(): Receiver[T] = { - new RedisReceiver(keys, storageLevel, redisConfig, streamType) - } -} - - -private class RedisReceiver[T: ClassTag](keys: Array[String], - storageLevel: StorageLevel, - redisConfig: RedisConfig, - streamType: Class[T]) - extends Receiver[T](storageLevel) { - - def onStart() { - val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streaming") - try { - /* start a executor for each interested List */ - keys.foreach{ key => - executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key)) - } - } finally { - executorPool.shutdown() - } - } - - def onStop() { - } - - private class MessageHandler(conn: Jedis, key: String) extends Runnable { - def run() { - try { - while(!isStopped) { - val response = conn.blpop(2, key) - if (response == null || response.isEmpty) { - // no-op - } else if (classTag[T] == classTag[String]) { - store(response.get(1).asInstanceOf[T]) - } else if (classTag[T] == classTag[(String, String)]) { - store((response.get(0), response.get(1)).asInstanceOf[T]) - } else { - throw new scala.Exception("Unknown Redis Streaming type") - } - } - } catch { - case NonFatal(e) => - restart("Error receiving data", e) - } finally { - onStop() - } - } - } -} diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala b/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala deleted file mode 100644 index 9f8dbd9f..00000000 --- a/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala +++ /dev/null @@ -1,195 +0,0 @@ -package com.redislabs.provider.redis.streaming - -import java.util.AbstractMap.SimpleEntry -import com.redislabs.provider.redis.util.PipelineUtils.foreachWithPipeline -import com.redislabs.provider.redis.util.{Logging, StreamUtils} -import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig} -import org.apache.curator.utils.ThreadUtils -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.receiver.Receiver -import org.sparkproject.guava.util.concurrent.RateLimiter -import redis.clients.jedis.{Jedis, StreamEntry, StreamEntryID} - -import scala.collection.JavaConversions._ 
- -/** - * Receives messages from Redis Stream - */ -class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig], - redisConfig: RedisConfig, - readWriteConfig: ReadWriteConfig, - storageLevel: StorageLevel) - extends Receiver[StreamItem](storageLevel) with Logging { - - override def onStart(): Unit = { - logInfo("Starting Redis Stream Receiver") - val executorPool = ThreadUtils.newFixedThreadPool(consumersConfig.size, "RedisStreamMessageHandler") - try { - // start consumers in separate threads - for (c <- consumersConfig) { - executorPool.submit(new MessageHandler(c, redisConfig, readWriteConfig)) - } - } finally { - // terminate threads after the work is done - executorPool.shutdown() - } - } - - override def onStop(): Unit = { - } - - private class MessageHandler(conf: ConsumerConfig, - redisConfig: RedisConfig, - implicit val readWriteConfig: ReadWriteConfig) extends Runnable { - - val jedis: Jedis = redisConfig.connectionForKey(conf.streamKey) - val rateLimiterOpt: Option[RateLimiter] = conf.rateLimitPerConsumer.map(r => RateLimiter.create(r)) - - override def run(): Unit = { - logInfo(s"Starting MessageHandler $conf") - try { - createConsumerGroupIfNotExist() - receiveUnacknowledged() - receiveNewMessages() - } catch { - case e: Exception => - restart("Error handling message. Restarting.", e) - } - } - - def createConsumerGroupIfNotExist(): Unit = { - val entryId = conf.offset match { - case Earliest => new StreamEntryID(0, 0) - case Latest => StreamEntryID.LAST_ENTRY - case IdOffset(v1, v2) => new StreamEntryID(v1, v2) - } - StreamUtils.createConsumerGroupIfNotExist(jedis, conf.streamKey, conf.groupName, entryId) - } - - def receiveUnacknowledged(): Unit = { - logInfo(s"Starting receiving unacknowledged messages for key ${conf.streamKey}") - var continue = true - val unackId = new SimpleEntry(conf.streamKey, new StreamEntryID(0, 0)) - - while (!isStopped && continue) { - val response = jedis.xreadGroup( - conf.groupName, - conf.consumerName, - conf.batchSize, - conf.block, - false, - unackId) - - val unackMessagesMap = response.map(e => (e.getKey, e.getValue)).toMap - val entries = unackMessagesMap(conf.streamKey) - if (entries.isEmpty) { - continue = false - } - storeAndAck(conf.streamKey, entries) - } - } - - def receiveNewMessages(): Unit = { - logInfo(s"Starting receiving new messages for key ${conf.streamKey}") - val newMessId = new SimpleEntry(conf.streamKey, StreamEntryID.UNRECEIVED_ENTRY) - - while (!isStopped) { - val response = jedis.xreadGroup( - conf.groupName, - conf.consumerName, - conf.batchSize, - conf.block, - false, - newMessId) - - if (response != null) { - for (streamMessages <- response) { - val key = streamMessages.getKey - val entries = streamMessages.getValue - storeAndAck(key, entries) - } - } - } - } - - def storeAndAck(streamKey: String, entries: Seq[StreamEntry]): Unit = { - if (entries.nonEmpty) { - // limit the rate if it's enabled - rateLimiterOpt.foreach(_.acquire(entries.size)) - val streamItems = entriesToItems(streamKey, entries) - // call store(multiple-records) to reliably store in Spark memory - store(streamItems.iterator) - // ack redis - foreachWithPipeline(jedis, entries) { (pipeline, entry) => - pipeline.xack(streamKey, conf.groupName, entry.getID) - } - } - } - - def entriesToItems(key: String, entries: Seq[StreamEntry]): Seq[StreamItem] = { - entries.map { e => - val itemId = ItemId(e.getID.getTime, e.getID.getSequence) - StreamItem(key, itemId, e.getFields.toMap) - } - } - } - -} - -/** - * @param streamKey redis stream key - * @param 
groupName consumer group name - * @param consumerName consumer name - * @param offset stream offset - * @param rateLimitPerConsumer maximum retrieved messages per second per single consumer - * @param batchSize maximum number of pulled items in a read API call - * @param block time in milliseconds to wait for data in a blocking read API call - */ -case class ConsumerConfig(streamKey: String, - groupName: String, - consumerName: String, - offset: Offset = Latest, - rateLimitPerConsumer: Option[Int] = None, - batchSize: Int = 100, - block: Long = 500) - -/** - * Represents an offset in the stream - */ -sealed trait Offset - -/** - * Latest offset, known as a '$' special id - */ -case object Latest extends Offset - -/** - * Earliest offset, '0-0' id - */ -case object Earliest extends Offset - -/** - * Specific id in the form of 'v1-v2' - * - * @param v1 first token of the id - * @param v2 second token of the id - */ -case class IdOffset(v1: Long, v2: Long) extends Offset - -/** - * Item id in the form of 'v1-v2' - * - * @param v1 first token of the id - * @param v2 second token of the id - */ -case class ItemId(v1: Long, v2: Long) - -/** - * Represent an item in the stream - * - * @param streamKey stream key - * @param id item(entry) id - * @param fields key/value map of item fields - */ -case class StreamItem(streamKey: String, id: ItemId, fields: Map[String, String]) - diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/package.scala b/src/main/scala/com/redislabs/provider/redis/streaming/package.scala deleted file mode 100644 index d3fbbdf6..00000000 --- a/src/main/scala/com/redislabs/provider/redis/streaming/package.scala +++ /dev/null @@ -1,5 +0,0 @@ -package com.redislabs.provider.redis - -package object streaming extends RedisStreamingFunctions { - -} diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/redisStreamingFunctions.scala b/src/main/scala/com/redislabs/provider/redis/streaming/redisStreamingFunctions.scala deleted file mode 100644 index a2bd81e4..00000000 --- a/src/main/scala/com/redislabs/provider/redis/streaming/redisStreamingFunctions.scala +++ /dev/null @@ -1,56 +0,0 @@ -package com.redislabs.provider.redis.streaming - -import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream.InputDStream - -/** - * RedisStreamingContext extends StreamingContext's functionality with Redis - * - * @param ssc a spark StreamingContext - */ -class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable { - /** - * @param keys an Array[String] which consists all the Lists we want to listen to - * @param storageLevel the receiver' storage tragedy of received data, default as MEMORY_AND_DISK_2 - * @return a stream of (listname, value) - */ - def createRedisStream(keys: Array[String], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2) - (implicit - redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)): - RedisInputDStream[(String, String)] = { - new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)]) - } - - /** - * @param keys an Array[String] which consists all the Lists we want to listen to - * @param storageLevel the receiver' storage tragedy of received data, default as MEMORY_AND_DISK_2 - * @return a stream of (value) - */ - def createRedisStreamWithoutListname(keys: Array[String], - storageLevel: StorageLevel = 
StorageLevel.MEMORY_AND_DISK_2) - (implicit - redisConf: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)): - RedisInputDStream[String] = { - new RedisInputDStream(ssc, keys, storageLevel, redisConf, classOf[String]) - } - - def createRedisXStream(consumersConfig: Seq[ConsumerConfig], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2) - (implicit - redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)): - InputDStream[StreamItem] = { - val readWriteConfig = ReadWriteConfig.fromSparkConf(ssc.sparkContext.getConf) - val receiver = new RedisStreamReceiver(consumersConfig, redisConfig, readWriteConfig, storageLevel) - ssc.receiverStream(receiver) - } -} - -trait RedisStreamingFunctions { - - implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc) - -} - diff --git a/src/main/scala/com/redislabs/provider/redis/util/CollectionUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/CollectionUtils.scala index 5b375e9e..27eed24c 100644 --- a/src/main/scala/com/redislabs/provider/redis/util/CollectionUtils.scala +++ b/src/main/scala/com/redislabs/provider/redis/util/CollectionUtils.scala @@ -1,16 +1,16 @@ package com.redislabs.provider.redis.util -import scala.collection.IterableLike -import scala.collection.generic.CanBuildFrom +import scala.collection.IterableOps +import scala.collection.BuildFrom /** * @author The Viet Nguyen */ object CollectionUtils { - implicit class RichCollection[A, Repr](val xs: IterableLike[A, Repr]) extends AnyVal { + implicit class RichCollection[A, Repr](val xs: IterableOps[A, Iterable, Repr]) extends AnyVal { - def distinctBy[B, That](f: A => B)(implicit cbf: CanBuildFrom[Repr, A, That]): That = { + def distinctBy[B, That](f: A => B)(implicit cbf: BuildFrom[Repr, A, That]): That = { val builder = cbf(xs.repr) val iterator = xs.iterator var set = Set[B]() diff --git a/src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala index 21abbc31..77b4da3f 100644 --- a/src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala +++ b/src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala @@ -8,7 +8,7 @@ import redis.clients.jedis.Jedis import redis.clients.jedis.commands.ProtocolCommand import redis.clients.jedis.util.SafeEncoder -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** * @author The Viet Nguyen @@ -35,13 +35,18 @@ object ConnectionUtils { def xinfo(command: String, args: String*): Map[String, Any] = { val client = jedis.getClient val combinedArgs = command +: args + client.sendCommand(XINFO, combinedArgs: _*) - val response = asList(client.getOne).asScala + // just an object + val response = asList(client.getOne) command match { case SubCommandStream => - asMap(response) + asMap(response.asScala.toSeq) case SubCommandGroups => - response.map(m => asList(m)).map(_.asScala).map(asMap) + response.asScala.toList + .map(m => asList(m)) + .map(_.asScala.toSeq) + .map(asMap) .map(m => String.valueOf(m("name")) -> m).toMap } } diff --git a/src/main/scala/com/redislabs/provider/redis/util/PipelineUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/PipelineUtils.scala index 900d0d88..5918cea2 100644 --- a/src/main/scala/com/redislabs/provider/redis/util/PipelineUtils.scala +++ b/src/main/scala/com/redislabs/provider/redis/util/PipelineUtils.scala @@ -5,7 +5,7 @@ import java.util.{List => JList} 
import com.redislabs.provider.redis.ReadWriteConfig import redis.clients.jedis.{Jedis, Pipeline} -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.collection.{TraversableOnce, mutable} object PipelineUtils { @@ -22,7 +22,7 @@ object PipelineUtils { * @param f function to applied for each item in the sequence * @return response from the server */ - def mapWithPipeline[A](conn: Jedis, items: TraversableOnce[A])(f: (Pipeline, A) => Unit) + def mapWithPipeline[A](conn: Jedis, items: IterableOnce[A])(f: (Pipeline, A) => Unit) (implicit readWriteConfig: ReadWriteConfig): Seq[AnyRef] = { val totalResp = mutable.ListBuffer[JList[AnyRef]]() @@ -45,7 +45,7 @@ object PipelineUtils { totalResp += resp } - totalResp.flatMap(_.asScala) + totalResp.flatMap(_.asScala).toSeq } /** diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala index 52105106..edfacf4b 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala @@ -2,12 +2,12 @@ package org.apache.spark.sql.redis import java.util.UUID import java.util.{List => JList} - import com.redislabs.provider.redis.rdd.Keys import com.redislabs.provider.redis.util.ConnectionUtils.withConnection import com.redislabs.provider.redis.util.Logging import com.redislabs.provider.redis.util.PipelineUtils._ -import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, RedisEndpoint, RedisNode, toRedisContext} +import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, + RedisNode, toRedisContext} import org.apache.commons.lang3.SerializationUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.GenericRow @@ -15,10 +15,9 @@ import org.apache.spark.sql.redis.RedisSourceRelation._ import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan} import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import redis.clients.jedis.{PipelineBase, Protocol} +import redis.clients.jedis.{AbstractPipeline, Protocol} -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ class RedisSourceRelation(override val sqlContext: SQLContext, parameters: Map[String, String], @@ -61,7 +60,7 @@ class RedisSourceRelation(override val sqlContext: SQLContext, private val keysPatternOpt: Option[String] = parameters.get(SqlOptionKeysPattern) private val numPartitions = parameters.get(SqlOptionNumPartitions).map(_.toInt) .getOrElse(SqlOptionNumPartitionsDefault) - private val persistenceModel = parameters.getOrDefault(SqlOptionModel, SqlOptionModelHash) + private val persistenceModel = parameters.asJava.getOrDefault(SqlOptionModel, SqlOptionModelHash) private val persistence = RedisPersistence(persistenceModel) private val tableNameOpt: Option[String] = parameters.get(SqlOptionTableName) private val ttl = parameters.get(SqlOptionTTL).map(_.toInt).getOrElse(0) @@ -112,7 +111,7 @@ class RedisSourceRelation(override val sqlContext: SQLContext, groupKeysByNode(redisConfig.hosts, partition).foreach { case (node, keys) => val conn = node.connect() foreachWithPipeline(conn, keys) { (pipeline, key) => - (pipeline: PipelineBase).del(key) // fix ambiguous reference to 
overloaded definition + (pipeline: AbstractPipeline).del(key) // fix ambiguous reference to overloaded definition } conn.close() } @@ -238,7 +237,8 @@ class RedisSourceRelation(override val sqlContext: SQLContext, logInfo(s"saving schema $key") val schemaNode = getMasterNode(redisConfig.hosts, key) val conn = schemaNode.connect() - val schemaBytes = SerializationUtils.serialize(schema) + val schemaBytes: Array[Byte] = SerializationUtils.serialize(schema) + conn.set(key.getBytes, schemaBytes) conn.close() schema @@ -290,7 +290,7 @@ class RedisSourceRelation(override val sqlContext: SQLContext, } else { keysAndValues.filter { case (_, null) => false // binary model - case (_, value: JList[_]) if value.forall(_ == null) => false // hash model + case (_, value: JList[_]) if value.asScala.forall(_ == null) => false // hash model case _ => true } } diff --git a/src/main/scala/org/apache/spark/sql/redis/stream/RedisSourceTypes.scala b/src/main/scala/org/apache/spark/sql/redis/stream/RedisSourceTypes.scala index af87a9a1..21b9dc67 100644 --- a/src/main/scala/org/apache/spark/sql/redis/stream/RedisSourceTypes.scala +++ b/src/main/scala/org/apache/spark/sql/redis/stream/RedisSourceTypes.scala @@ -1,8 +1,8 @@ package org.apache.spark.sql.redis.stream import java.util.{List => JList, Map => JMap} - -import redis.clients.jedis.{StreamEntryID, StreamEntry => JStreamEntry} +import redis.clients.jedis.resps.{StreamEntry => JStreamEntry} +import redis.clients.jedis.StreamEntryID /** * @author The Viet Nguyen diff --git a/src/main/scala/org/apache/spark/sql/redis/stream/RedisStreamReader.scala b/src/main/scala/org/apache/spark/sql/redis/stream/RedisStreamReader.scala index 86751628..cbc82f2b 100644 --- a/src/main/scala/org/apache/spark/sql/redis/stream/RedisStreamReader.scala +++ b/src/main/scala/org/apache/spark/sql/redis/stream/RedisStreamReader.scala @@ -2,14 +2,14 @@ package org.apache.spark.sql.redis.stream import java.util.AbstractMap.SimpleEntry import java.util.{Map => JMap} - import com.redislabs.provider.redis.RedisConfig import com.redislabs.provider.redis.util.ConnectionUtils.withConnection import com.redislabs.provider.redis.util.Logging import org.apache.spark.sql.redis.stream.RedisSourceTypes.{StreamEntry, StreamEntryBatch, StreamEntryBatches} import redis.clients.jedis.StreamEntryID +import redis.clients.jedis.params.XReadGroupParams -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.math.Ordering.Implicits._ /** @@ -26,7 +26,7 @@ class RedisStreamReader(redisConfig: RedisConfig) extends Logging with Serializa ) val res = filterStreamEntries(offsetRange) { - val startEntryOffset = new SimpleEntry(config.streamKey, StreamEntryID.UNRECEIVED_ENTRY) + val startEntryOffset = new SimpleEntry(config.streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY) Iterator.continually { readStreamEntryBatches(offsetRange, startEntryOffset) } @@ -37,15 +37,21 @@ class RedisStreamReader(redisConfig: RedisConfig) extends Logging with Serializa private def readStreamEntryBatches(offsetRange: RedisSourceOffsetRange, startEntryOffset: JMap.Entry[String, StreamEntryID]): StreamEntryBatches = { val config = offsetRange.config + + // we don't need acknowledgement, if spark processing fails, it will request the same batch again + val xReadGroupParams = XReadGroupParams.xReadGroupParams() + .block(config.block) + .count(config.batchSize) + .noAck() + + + withConnection(redisConfig.connectionForKey(config.streamKey)) { conn => - // we don't need acknowledgement, if spark 
processing fails, it will request the same batch again - val noAck = true + val response = conn.xreadGroup(config.groupName, config.consumerName, - config.batchSize, - config.block, - noAck, - startEntryOffset) + xReadGroupParams, + Map[String, StreamEntryID]((startEntryOffset.getKey, startEntryOffset.getValue)).asJava) logDebug(s"Got entries: $response") response } diff --git a/src/test/scala/com/redislabs/provider/redis/SparkRedisSuite.scala b/src/test/scala/com/redislabs/provider/redis/SparkRedisSuite.scala index fa52c1f6..f80e323d 100644 --- a/src/test/scala/com/redislabs/provider/redis/SparkRedisSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/SparkRedisSuite.scala @@ -23,7 +23,9 @@ trait SparkRedisSuite extends FunSuite with Env with Keys with BeforeAndAfterAll object TestSqlImplicits extends SQLImplicits { - override protected def _sqlContext: SQLContext = spark.sqlContext + protected def _sqlContext: SQLContext = spark.sqlContext + + override protected def session: SparkSession = spark } } diff --git a/src/test/scala/com/redislabs/provider/redis/SparkStreamingRedisSuite.scala b/src/test/scala/com/redislabs/provider/redis/SparkStreamingRedisSuite.scala deleted file mode 100644 index 1218d577..00000000 --- a/src/test/scala/com/redislabs/provider/redis/SparkStreamingRedisSuite.scala +++ /dev/null @@ -1,28 +0,0 @@ -package com.redislabs.provider.redis - -import com.redislabs.provider.redis.env.Env -import com.redislabs.provider.redis.util.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.scalatest.{BeforeAndAfterEach, FunSuite} - -/** - * For spark streaming test we have to create spark and streaming context for each test - */ -trait SparkStreamingRedisSuite extends FunSuite with Env with BeforeAndAfterEach with Logging { - - override protected def beforeEach(): Unit = { - super.beforeEach() - spark = SparkSession.builder().config(conf).getOrCreate() - sc = spark.sparkContext - ssc = new StreamingContext(sc, Seconds(1)) - } - - override protected def afterEach(): Unit = { - ssc.stop() - spark.stop - System.clearProperty("spark.driver.port") - super.afterEach() - } - -} diff --git a/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala index 86982b5d..f7b5c073 100644 --- a/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala @@ -4,7 +4,7 @@ import com.redislabs.provider.redis.util.Person.{TableNamePrefix, data} import com.redislabs.provider.redis.util.TestUtils.{generateTableName, interceptSparkErr} import org.apache.spark.sql.redis.{RedisFormat, SqlOptionTableName} import org.scalatest.Matchers -import redis.clients.jedis.exceptions.JedisConnectionException +import redis.clients.jedis.exceptions.{JedisAccessControlException, JedisConnectionException} /** * Basic dataframe test with user/password authentication @@ -25,7 +25,7 @@ trait AclDataframeSuite extends RedisDataframeSuite with Matchers { } test("incorrect password in dataframe options") { - interceptSparkErr[JedisConnectionException] { + interceptSparkErr[JedisAccessControlException] { val tableName = generateTableName(TableNamePrefix) val df = spark.createDataFrame(data) df.write.format(RedisFormat) diff --git a/src/test/scala/com/redislabs/provider/redis/env/Env.scala b/src/test/scala/com/redislabs/provider/redis/env/Env.scala index 20fcd166..49e22939 100644 
--- a/src/test/scala/com/redislabs/provider/redis/env/Env.scala +++ b/src/test/scala/com/redislabs/provider/redis/env/Env.scala @@ -2,7 +2,6 @@ package com.redislabs.provider.redis.env import com.redislabs.provider.redis.RedisConfig import org.apache.spark.sql.SparkSession -import org.apache.spark.streaming.StreamingContext import org.apache.spark.{SparkConf, SparkContext} trait Env { @@ -10,7 +9,6 @@ trait Env { val conf: SparkConf var spark: SparkSession = _ var sc: SparkContext = _ - var ssc: StreamingContext = _ val redisHost = "127.0.0.1" val redisPort = 6379 diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala index 17102052..9879c6af 100644 --- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala @@ -4,9 +4,9 @@ import com.redislabs.provider.redis.util.ConnectionUtils.withConnection import org.scalatest.Matchers import com.redislabs.provider.redis._ import com.redislabs.provider.redis.util.TestUtils -import redis.clients.jedis.exceptions.JedisConnectionException +import redis.clients.jedis.exceptions.JedisAccessControlException -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** * More RDD tests @@ -65,7 +65,12 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers { ("hash1", map1), ("hash2", map2) ) - val hashesBytes = hashes.map { case (k, hash) => (k.getBytes, hash.map { case (mapKey, mapVal) => (mapKey.getBytes, mapVal.getBytes) }) } + val hashesBytes = hashes.map { + case (k, hash) => (k.getBytes, hash.map { + case (mapKey, mapVal) => (mapKey.getBytes, mapVal.getBytes) + }) + } + val rdd = sc.parallelize(hashesBytes) sc.toRedisByteHASHes(rdd) @@ -74,7 +79,7 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers { } test("connection fails with incorrect user/pass") { - assertThrows[JedisConnectionException] { + assertThrows[JedisAccessControlException] { new RedisConfig(RedisEndpoint( host = redisHost, port = redisPort, diff --git a/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala b/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala deleted file mode 100644 index 1cbef3b5..00000000 --- a/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala +++ /dev/null @@ -1,170 +0,0 @@ -package com.redislabs.provider.redis.stream - -import com.redislabs.provider.redis.streaming.{ConsumerConfig, Earliest} -import com.redislabs.provider.redis.util.ConnectionUtils.withConnection -import com.redislabs.provider.redis.util.TestUtils -import com.redislabs.provider.redis.SparkStreamingRedisSuite -import com.redislabs.provider.redis.streaming._ -import org.apache.spark.storage.StorageLevel -import org.scalatest.Matchers -import org.scalatest.concurrent.Eventually._ -import org.scalatest.time.{Millis, Span} -import redis.clients.jedis.StreamEntryID - -import scala.collection.JavaConversions._ - -// scalastyle:off multiple.string.literals -trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers { - - // timeout for eventually function - implicit val patienceConfig = PatienceConfig(timeout = scaled(Span(5000, Millis))) - - test("createRedisXStream, 1 stream, 1 consumer") { - val streamKey = TestUtils.generateRandomKey() - - // the data can be written to the stream earlier than we start receiver, so set offset to Earliest - val stream = 
ssc.createRedisXStream(Seq(ConsumerConfig(streamKey, "g1", "c1", Earliest)), StorageLevel.MEMORY_ONLY) - - val _redisConfig = redisConfig // to make closure serializable - - // iterate over items and save to redis list - // repartition to 1 to avoid concurrent write issues - stream.repartition(1).foreachRDD { rdd => - rdd.foreachPartition { partition => - for (item <- partition) { - val listKey = s"${item.streamKey}:list" - withConnection(_redisConfig.connectionForKey(listKey)) { conn => - conn.rpush(listKey, s"${item.id.v1}-${item.id.v2} " + item.fields.mkString(" ")) - } - } - } - } - - // write to stream - withConnection(redisConfig.connectionForKey(streamKey)) { conn => - conn.xadd(streamKey, new StreamEntryID(1, 0), Map("a" -> "1", "z" -> "4")) - conn.xadd(streamKey, new StreamEntryID(1, 1), Map("b" -> "2")) - conn.xadd(streamKey, new StreamEntryID(2, 0), Map("c" -> "3")) - } - - ssc.start() - - // eventually there should be items in the list - val listKey = s"$streamKey:list" - withConnection(redisConfig.connectionForKey(listKey)) { conn => - eventually { - conn.llen(listKey) shouldBe 3 - conn.lpop(listKey) should be("1-0 a -> 1 z -> 4") - conn.lpop(listKey) should be("1-1 b -> 2") - conn.lpop(listKey) should be("2-0 c -> 3") - } - } - } - - test("createRedisXStream, 1 stream, 2 consumers") { - val streamKey = TestUtils.generateRandomKey() - - // the data can be written to the stream earlier than we start receiver, so set offset to Earliest - val stream = ssc.createRedisXStream(Seq( - ConsumerConfig(streamKey, "g1", "c1", Earliest, batchSize = 1), - ConsumerConfig(streamKey, "g1", "c2", Earliest, batchSize = 1) - ), StorageLevel.MEMORY_ONLY) - - val _redisConfig = redisConfig // to make closure serializable - - // iterate over items and save to redis list - // repartition to 1 to avoid concurrent write issues - stream.repartition(1).foreachRDD { rdd => - rdd.foreachPartition { partition => - for (item <- partition) { - val listKey = s"${item.streamKey}:list" - withConnection(_redisConfig.connectionForKey(listKey)) { conn => - conn.rpush(listKey, s"${item.id.v1}-${item.id.v2} " + item.fields.mkString(" ")) - } - } - } - } - - // write to stream - withConnection(redisConfig.connectionForKey(streamKey)) { conn => - conn.xadd(streamKey, new StreamEntryID(1, 0), Map("a" -> "1", "z" -> "4")) - conn.xadd(streamKey, new StreamEntryID(1, 1), Map("b" -> "2")) - conn.xadd(streamKey, new StreamEntryID(2, 0), Map("c" -> "3")) - } - - ssc.start() - - // eventually there should be items in the list, the ordering is not deterministic - val listKey = s"$streamKey:list" - withConnection(redisConfig.connectionForKey(listKey)) { conn => - eventually { - conn.llen(listKey) shouldBe 3 - (1 to 3).map(_ => conn.lpop(listKey)).toSet shouldBe Set( - "1-0 a -> 1 z -> 4", - "1-1 b -> 2", - "2-0 c -> 3" - ) - } - } - } - - test("createRedisXStream, 2 streams, 2 consumers") { - val stream1Key = TestUtils.generateRandomKey() - val stream2Key = TestUtils.generateRandomKey() - - logInfo("stream1Key " + stream1Key) - logInfo("stream2Key " + stream2Key) - - // the data can be written to the stream earlier than we start receiver, so set offset to Earliest - val stream = ssc.createRedisXStream(Seq( - ConsumerConfig(stream1Key, "g1", "c1", Earliest, batchSize = 1), - ConsumerConfig(stream2Key, "g1", "c2", Earliest, batchSize = 1) - ), StorageLevel.MEMORY_ONLY) - - val _redisConfig = redisConfig // to make closure serializable - - // iterate over items and save to redis list - // repartition to 1 to avoid concurrent write 
issues - stream.repartition(1).foreachRDD { rdd => - rdd.foreachPartition { partition => - for (item <- partition) { - val listKey = s"${item.streamKey}:list" - withConnection(_redisConfig.connectionForKey(listKey)) { conn => - conn.rpush(listKey, s"${item.id.v1}-${item.id.v2} " + item.fields.mkString(" ")) - } - } - } - } - - // write to stream - withConnection(redisConfig.connectionForKey(stream1Key)) { conn => - conn.xadd(stream1Key, new StreamEntryID(1, 0), Map("a" -> "1", "z" -> "4")) - } - withConnection(redisConfig.connectionForKey(stream2Key)) { conn => - conn.xadd(stream2Key, new StreamEntryID(1, 1), Map("b" -> "2")) - conn.xadd(stream2Key, new StreamEntryID(2, 0), Map("c" -> "3")) - } - - ssc.start() - - // eventually there should be items in the list - val list1Key = s"$stream1Key:list" - withConnection(redisConfig.connectionForKey(list1Key)) { conn => - eventually { - conn.llen(list1Key) shouldBe 1 - conn.lpop(list1Key) should be("1-0 a -> 1 z -> 4") - } - } - - val list2Key = s"$stream2Key:list" - withConnection(redisConfig.connectionForKey(list2Key)) { conn => - eventually { - conn.llen(list2Key) shouldBe 2 - conn.lpop(list2Key) should be("1-1 b -> 2") - conn.lpop(list2Key) should be("2-0 c -> 3") - } - } - - } - -} diff --git a/src/test/scala/com/redislabs/provider/redis/stream/cluster/RedisXStreamClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/stream/cluster/RedisXStreamClusterSuite.scala deleted file mode 100644 index 9cce78d0..00000000 --- a/src/test/scala/com/redislabs/provider/redis/stream/cluster/RedisXStreamClusterSuite.scala +++ /dev/null @@ -1,6 +0,0 @@ -package com.redislabs.provider.redis.stream.cluster - -import com.redislabs.provider.redis.env.RedisClusterEnv -import com.redislabs.provider.redis.stream.RedisXStreamSuite - -class RedisXStreamClusterSuite extends RedisXStreamSuite with RedisClusterEnv diff --git a/src/test/scala/com/redislabs/provider/redis/stream/standalone/RedisXStreamStandaloneSuite.scala b/src/test/scala/com/redislabs/provider/redis/stream/standalone/RedisXStreamStandaloneSuite.scala deleted file mode 100644 index ba8c5004..00000000 --- a/src/test/scala/com/redislabs/provider/redis/stream/standalone/RedisXStreamStandaloneSuite.scala +++ /dev/null @@ -1,6 +0,0 @@ -package com.redislabs.provider.redis.stream.standalone - -import com.redislabs.provider.redis.env.RedisStandaloneEnv -import com.redislabs.provider.redis.stream.RedisXStreamSuite - -class RedisXStreamStandaloneSuite extends RedisXStreamSuite with RedisStandaloneEnv diff --git a/src/test/scala/com/redislabs/provider/redis/util/ConnectionUtilsTest.scala b/src/test/scala/com/redislabs/provider/redis/util/ConnectionUtilsTest.scala index 60f2663d..2e3b7cb3 100644 --- a/src/test/scala/com/redislabs/provider/redis/util/ConnectionUtilsTest.scala +++ b/src/test/scala/com/redislabs/provider/redis/util/ConnectionUtilsTest.scala @@ -5,7 +5,7 @@ import com.redislabs.provider.redis.util.ConnectionUtils.{JedisExt, XINFO} import org.scalatest.{FunSuite, Matchers} import redis.clients.jedis.StreamEntryID -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** * @author The Viet Nguyen diff --git a/src/test/scala/org/apache/spark/sql/redis/stream/RedisStreamSourceSuite.scala b/src/test/scala/org/apache/spark/sql/redis/stream/RedisStreamSourceSuite.scala index cd85e825..c04c64a7 100644 --- a/src/test/scala/org/apache/spark/sql/redis/stream/RedisStreamSourceSuite.scala +++ b/src/test/scala/org/apache/spark/sql/redis/stream/RedisStreamSourceSuite.scala @@ 
-2,7 +2,6 @@ package org.apache.spark.sql.redis.stream import java.io.File import java.util.UUID - import com.redislabs.provider.redis.RedisConfig import com.redislabs.provider.redis.env.Env import com.redislabs.provider.redis.util.ConnectionUtils.{JedisExt, XINFO, withConnection} @@ -14,8 +13,9 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.{FunSuite, Matchers} import redis.clients.jedis.StreamEntryID -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.concurrent.duration.DurationLong +import scala.language.postfixOps /** * @author The Viet Nguyen
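
Closing note: a change that recurs throughout this diff is the move from the removed implicit `scala.collection.JavaConversions` to the explicit `scala.jdk.CollectionConverters` (`.asScala`/`.asJava`), which is required on Scala 2.13. A small self-contained sketch of that pattern, with hypothetical values, for readers porting their own code against this branch:

```scala
// Illustrative only: shows the explicit conversion style used across this diff.
import java.util.{HashMap => JHashMap}
import scala.jdk.CollectionConverters._

object ConvertersExample {
  def main(args: Array[String]): Unit = {
    // Java -> Scala: e.g. the java.util.Map returned by Jedis hgetAll
    val javaMap = new JHashMap[String, String]()
    javaMap.put("field", "value")
    val scalaMap: Map[String, String] = javaMap.asScala.toMap

    // Scala -> Java: e.g. the Map passed to pipeline.hmset(key, hashFields.asJava)
    val backToJava: java.util.Map[String, String] = scalaMap.asJava

    println(scalaMap)
    println(backToJava)
  }
}
```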