Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Prepare for Spark 4 #394

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ target/
*.pyc
sbt/*.jar

# test artifacts
checkpoint-test

# sbt specific
.cache/
.history/
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The master branch contains the recent development for the next release.
| Spark-Redis | Spark | Redis | Supported Scala Versions |
|---------------------------------------------------------------------------|-------| ---------------- | ------------------------ |
| [master](https://github.com/RedisLabs/spark-redis/) | 3.2.x | >=2.9.0 | 2.12 |
| [4.0](https://github.com/RedisLabs/spark-redis/tree/branch-4.0) | 4.0.x | >=2.9.0 | 2.13 |
| [3.0](https://github.com/RedisLabs/spark-redis/tree/branch-3.0) | 3.0.x | >=2.9.0 | 2.12 |
| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
Expand Down
23 changes: 9 additions & 14 deletions doc/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,17 @@
<dependencies>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.11</artifactId>
<version>2.4.2</version>
<artifactId>spark-redis_2.13</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
```

Or

```xml
<dependencies>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.12</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
```

### SBT

```scala
libraryDependencies += "com.redislabs" %% "spark-redis" % "2.4.2"
libraryDependencies += "com.redislabs" %% "spark-redis" % "4.0.0"
```

### Build from source
Expand Down Expand Up @@ -115,3 +104,9 @@ val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStream(Array("foo", "bar"),
storageLevel = StorageLevel.MEMORY_AND_DISK_2)
```

or

```scala
// TODO: add a Structured Streaming example here
```
49 changes: 37 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.12</artifactId>
<version>3.1.0-SNAPSHOT</version>
<artifactId>spark-redis_2.13</artifactId>
<version>4.0.0-SNAPSHOT</version>
<name>Spark-Redis</name>
<description>A Spark library for Redis</description>
<url>http://github.com/RedisLabs/spark-redis</url>
Expand Down Expand Up @@ -46,14 +46,39 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<scala.major.version>2.12</scala.major.version>
<scala.complete.version>${scala.major.version}.0</scala.complete.version>
<jedis.version>3.9.0</jedis.version>
<spark.version>3.2.1</spark.version>
<java.version>17</java.version>
<scala.major.version>2.13</scala.major.version>
<scala.complete.version>${scala.major.version}.15</scala.complete.version>
<jedis.version>5.2.0</jedis.version>
<spark.version>4.0.0-preview2</spark.version>
<plugins.scalatest.version>1.0</plugins.scalatest.version>
<extraJavaTestArgs>
-XX:+IgnoreUnrecognizedVMOptions
-Djavax.net.ssl.trustStorePassword=password
-Djavax.net.ssl.trustStore=./src/test/resources/tls/clientkeystore
-Djavax.net.ssl.trustStoreType=jceks
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true
--add-modules=jdk.incubator.vector
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
</extraJavaTestArgs>
</properties>



<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
Expand Down Expand Up @@ -89,7 +114,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<version>4.9.1</version>
<configuration>
<scalaVersion>${scala.complete.version}</scalaVersion>
<javacArgs>
Expand Down Expand Up @@ -125,7 +150,7 @@
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.7.0</version>
<version>1.0.0</version>
<configuration>
<verbose>false</verbose>
<failOnWarning>false</failOnWarning>
Expand Down Expand Up @@ -157,7 +182,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.4</version>
<version>3.7.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
Expand Down Expand Up @@ -245,7 +270,7 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
<argLine>-XX:MaxPermSize=256m -Xmx2g -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.trustStore=./src/test/resources/tls/clientkeystore -Djavax.net.ssl.trustStoreType=jceks</argLine>
<argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:+IgnoreUnrecognizedVMOptions -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.trustStore=./src/test/resources/tls/clientkeystore -Djavax.net.ssl.trustStoreType=jceks -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED</argLine>
<tagsToExclude>com.redislabs.provider.redis.util.BenchmarkTest</tagsToExclude>
</configuration>
<executions>
Expand Down Expand Up @@ -373,7 +398,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>3.0.1</version>
<version>3.2.7</version>
<configuration>
<gpgArguments>
<arg>--pinentry-mode</arg>
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,24 @@ import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import scala.jdk.CollectionConverters._


object ConnectionPool {
@transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
new ConcurrentHashMap[RedisEndpoint, JedisPool]()

def connect(re: RedisEndpoint): Jedis = {
val pool = pools.getOrElseUpdate(re,

val pool = pools.asScala.getOrElseUpdate(re,
{
val poolConfig: JedisPoolConfig = new JedisPoolConfig();
val poolConfig: JedisPoolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(250)
poolConfig.setMaxIdle(32)
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
poolConfig.setSoftMinEvictableIdleTime(Duration.ofMinutes(1))
poolConfig.setSoftMinEvictableIdleDuration(Duration.ofMinutes(1))
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30))
poolConfig.setNumTestsPerEvictionRun(-1)

Expand All @@ -36,10 +37,9 @@ object ConnectionPool {
}
catch {
case e: JedisConnectionException if e.getCause.toString.
contains("ERR max number of clients reached") => {
contains("ERR max number of clients reached") =>
if (sleepTime < 500) sleepTime *= 2
Thread.sleep(sleepTime)
}
case e: Exception => throw e
}
}
Expand Down
37 changes: 23 additions & 14 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.redislabs.provider.redis

import java.net.URI

import org.apache.spark.SparkConf
import redis.clients.jedis.resps.{ClusterShardInfo, ClusterShardNodeInfo}
import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder}
import redis.clients.jedis.{Jedis, Protocol}

import scala.collection.JavaConversions._
import java.nio.charset.Charset
import scala.jdk.CollectionConverters._


/**
Expand Down Expand Up @@ -71,7 +72,8 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
* @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
*/
def this(uri: URI) {
this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri),
JedisURIHelper.getDBIndex(uri),
Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme)
}

Expand Down Expand Up @@ -130,7 +132,8 @@ object ReadWriteConfig {
val RddWriteIteratorGroupingSizeKey = "spark.redis.rdd.write.iterator.grouping.size"
val RddWriteIteratorGroupingSizeDefault = 1000

val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault, RddWriteIteratorGroupingSizeDefault)
val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault,
RddWriteIteratorGroupingSizeDefault)

def fromSparkConf(conf: SparkConf): ReadWriteConfig = {
ReadWriteConfig(
Expand Down Expand Up @@ -325,11 +328,14 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
*/
private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
val conn = initialHost.connect()
val res = conn.clusterSlots().flatMap {
slotInfoObj => {
val slotInfo = slotInfoObj.asInstanceOf[java.util.List[java.lang.Object]]
val sPos = slotInfo.get(0).toString.toInt
val ePos = slotInfo.get(1).toString.toInt

val res = conn.clusterShards().asScala.flatMap {
shardInfoObj: ClusterShardInfo => {
val slotInfo = shardInfoObj.getSlots

// TODO: can a ClusterShard contain more than one node? Only the first node is used here.
val nodeInfo = shardInfoObj.getNodes.get(0)

/*
* We will get all the nodes covering the slot range [slotStart, slotEnd],
* and create a RedisNode for each node; the total field of all
Expand All @@ -339,10 +345,11 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
* And the idx of a master is always 0, we rely on this fact to
* filter master.
*/
(0 until (slotInfo.size - 2)).map(i => {
val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
val port = node.get(1).toString.toInt
(0 until (slotInfo.size)).map(i => {
val host = SafeEncoder.encode(nodeInfo.getIp.getBytes(Charset.forName("UTF8")))
val port = nodeInfo.getPort.toInt
val slotStart = slotInfo.get(i).get(0).toInt
val slotEnd = slotInfo.get(i).get(1).toInt
val endpoint = RedisEndpoint(
host = host,
port = port,
Expand All @@ -351,7 +358,9 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
dbNum = initialHost.dbNum,
timeout = initialHost.timeout,
ssl = initialHost.ssl)
RedisNode(endpoint, sPos, ePos, i, slotInfo.size - 2)
val role = nodeInfo.getRole
val idx = if (role == "master") 0 else i
RedisNode(endpoint, slotStart, slotEnd, idx, slotInfo.size)
})
}
}.toArray
Expand Down
Loading