Skip to content

Commit 2688047

Browse files
committed
add test for SQL
1 parent 8099a9d commit 2688047

File tree

2 files changed

+139
-0
lines changed

2 files changed

+139
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.redislabs.provider.redis.rdd
2+
3+
import org.apache.spark.{SparkContext, SparkConf}
4+
import org.scalatest.{BeforeAndAfterAll, ShouldMatchers, FunSuite}
5+
import org.apache.spark.sql.SQLContext
6+
import com.redislabs.provider.redis._
7+
8+
/**
 * Integration suite for the Redis Spark SQL data source against a Redis
 * cluster (expected on 127.0.0.1:7379). Registers a temporary table backed
 * by `com.redislabs.provider.redis.sql`, seeds 64 rows, and verifies that
 * SQL filters are evaluated correctly.
 *
 * NOTE(review): requires a live Redis cluster and a local SparkContext;
 * `sc` and `redisConfig` are inherited from the ENV trait — confirm their
 * declarations there.
 */
class RedisSparkSQLClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with ShouldMatchers {

  // Initialized in beforeAll; `_` (default value) instead of an explicit null.
  var sqlContext: SQLContext = _

  override def beforeAll() {
    super.beforeAll()

    sc = new SparkContext(new SparkConf()
      .setMaster("local").setAppName(getClass.getName)
      .set("redis.host", "127.0.0.1")
      .set("redis.port", "7379")
    )
    redisConfig = new RedisConfig(new RedisEndpoint("127.0.0.1", 7379))

    // Flush every node so each run starts from an empty keyspace.
    redisConfig.hosts.foreach { node =>
      val conn = node.connect
      // Side-effecting zero-arity calls keep their parentheses.
      conn.flushAll()
      conn.close()
    }

    sqlContext = new SQLContext(sc)
    sqlContext.sql(
      s"""
         |CREATE TEMPORARY TABLE rl
         |(name STRING, score INT)
         |USING com.redislabs.provider.redis.sql
         |OPTIONS (table 'rl')
      """.stripMargin)

    // Seed 64 rows: ("rl1", 1) .. ("rl64", 64).
    (1 to 64).foreach { index =>
      sqlContext.sql(s"insert overwrite table rl select t.* from (select 'rl${index}', ${index}) t")
    }
  }

  test("RedisKVRDD - default(cluster)") {
    val df = sqlContext.sql(
      s"""
         |SELECT *
         |FROM rl
      """.stripMargin)
    // Scores 11..64 -> 54 rows; scores 11..19 -> 9 rows.
    df.filter(df("score") > 10).count should be (54)
    df.filter(df("score") > 10 and df("score") < 20).count should be (9)
  }

  test("RedisKVRDD - cluster") {
    // Same query, but with the cluster RedisConfig supplied implicitly.
    implicit val c: RedisConfig = redisConfig
    val df = sqlContext.sql(
      s"""
         |SELECT *
         |FROM rl
      """.stripMargin)
    df.filter(df("score") > 10).count should be (54)
    df.filter(df("score") > 10 and df("score") < 20).count should be (9)
  }

  override def afterAll(): Unit = {
    sc.stop()
    // Allow subsequent suites to bind a fresh driver port.
    System.clearProperty("spark.driver.port")
  }
}
69+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.redislabs.provider.redis.rdd
2+
3+
import com.redislabs.provider.redis._
4+
import org.apache.spark.sql.SQLContext
5+
import org.apache.spark.{SparkConf, SparkContext}
6+
import org.scalatest.{BeforeAndAfterAll, FunSuite, ShouldMatchers}
7+
8+
/**
 * Integration suite for the Redis Spark SQL data source against a standalone
 * Redis instance (expected on 127.0.0.1:6379, password "passwd"). Registers
 * a temporary table backed by `com.redislabs.provider.redis.sql`, seeds 64
 * rows, and verifies that SQL filters are evaluated correctly.
 *
 * NOTE(review): requires a live Redis server and a local SparkContext;
 * `sc` and `redisConfig` are inherited from the ENV trait — confirm their
 * declarations there.
 */
class RedisSparkSQLStandaloneSuite extends FunSuite with ENV with BeforeAndAfterAll with ShouldMatchers {

  // Initialized in beforeAll; `_` (default value) instead of an explicit null.
  var sqlContext: SQLContext = _

  override def beforeAll() {
    super.beforeAll()

    sc = new SparkContext(new SparkConf()
      .setMaster("local").setAppName(getClass.getName)
      .set("redis.host", "127.0.0.1")
      .set("redis.port", "6379")
      .set("redis.auth", "passwd")
    )
    redisConfig = new RedisConfig(new RedisEndpoint("127.0.0.1", 6379, "passwd"))

    // Flush every host so each run starts from an empty keyspace.
    redisConfig.hosts.foreach { node =>
      val conn = node.connect
      // Side-effecting zero-arity calls keep their parentheses.
      conn.flushAll()
      conn.close()
    }

    sqlContext = new SQLContext(sc)
    sqlContext.sql(
      s"""
         |CREATE TEMPORARY TABLE rl
         |(name STRING, score INT)
         |USING com.redislabs.provider.redis.sql
         |OPTIONS (table 'rl')
      """.stripMargin)

    // Seed 64 rows: ("rl1", 1) .. ("rl64", 64).
    (1 to 64).foreach { index =>
      sqlContext.sql(s"insert overwrite table rl select t.* from (select 'rl${index}', ${index}) t")
    }
  }

  // Renamed from the copy-pasted "...(cluster)" labels: this suite runs
  // against a standalone instance, and the old names were misleading in
  // test reports.
  test("RedisKVRDD - default(standalone)") {
    val df = sqlContext.sql(
      s"""
         |SELECT *
         |FROM rl
      """.stripMargin)
    // Scores 11..64 -> 54 rows; scores 11..19 -> 9 rows.
    df.filter(df("score") > 10).count should be (54)
    df.filter(df("score") > 10 and df("score") < 20).count should be (9)
  }

  test("RedisKVRDD - standalone") {
    // Same query, but with the standalone RedisConfig supplied implicitly.
    implicit val c: RedisConfig = redisConfig
    val df = sqlContext.sql(
      s"""
         |SELECT *
         |FROM rl
      """.stripMargin)
    df.filter(df("score") > 10).count should be (54)
    df.filter(df("score") > 10 and df("score") < 20).count should be (9)
  }

  override def afterAll(): Unit = {
    sc.stop()
    // Allow subsequent suites to bind a fresh driver port.
    System.clearProperty("spark.driver.port")
  }
}
70+

0 commit comments

Comments
 (0)