Skip to content

Commit 12d9f2d

Browse files
authored
Merge pull request #639 from capYTS/master
添加hive output
2 parents ab7f201 + 45951b3 commit 12d9f2d

File tree

1 file changed

+69
-0
lines changed
  • waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch

1 file changed

+69
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.github.interestinglab.waterdrop.output.batch
2+
3+
import io.github.interestinglab.waterdrop.apis.BaseOutput
4+
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
5+
import org.apache.spark.sql.{DataFrameWriter, Dataset, Row, SparkSession}
6+
7+
import java.util
8+
import scala.collection.JavaConversions._
9+
10+
/**
 * Batch output plugin that writes data into Hive.
 *
 * Two modes, selected by configuration:
 *  - "sql": the given SQL statement (e.g. an INSERT ... SELECT) is executed as-is;
 *  - otherwise: rows are selected from "source_table_name" (optionally restricted
 *    to "sink_columns") and saved as Hive table "result_table_name", honoring the
 *    optional "save_mode" and "partition_by" settings.
 */
class Hive extends BaseOutput {

  var config: Config = ConfigFactory.empty()

  /**
   * Set Config.
   * */
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config.
   * */
  override def getConfig(): Config = {
    this.config
  }

  /**
   * Configuration is valid when either a raw "sql" statement is provided,
   * or both "source_table_name" and "result_table_name" are set.
   */
  override def checkConfig(): (Boolean, String) = {
    if (config.hasPath("sql") || (config.hasPath("source_table_name") && config.hasPath("result_table_name"))) {
      (true, "")
    } else {
      (false, "请使用 sql 或者 (source_table_name 和 result_table_name) ")
    }
  }

  // No preparation needed; Hive support is assumed to be enabled on the session.
  override def prepare(spark: SparkSession): Unit = {}

  /**
   * Write to Hive using the SparkSession attached to the incoming dataset.
   *
   * Note: the dataset itself is only used to obtain the session; the rows that
   * are written come from the configured SQL / source table.
   */
  override def process(df: Dataset[Row]): Unit = {
    // Prefer the explicit conversion API over the deprecated implicit
    // scala.collection.JavaConversions._ imported at file level.
    import scala.collection.JavaConverters._

    val sparkSession = df.sparkSession
    if (config.hasPath("sql")) {
      // Raw SQL mode: execute the statement for its side effects.
      sparkSession.sql(config.getString("sql"))
    } else {
      val sourceTableName = config.getString("source_table_name")
      val resultTableName = config.getString("result_table_name")
      // Default to all columns when "sink_columns" is not configured.
      val sinkColumns =
        if (config.hasPath("sink_columns")) config.getString("sink_columns") else "*"
      val sinkFrame = sparkSession.sql(s"select $sinkColumns from $sourceTableName")
      // Apply the optional save mode (e.g. "overwrite", "append") to the writer.
      val frameWriter: DataFrameWriter[Row] =
        if (config.hasPath("save_mode")) sinkFrame.write.mode(config.getString("save_mode"))
        else sinkFrame.write
      if (config.hasPath("partition_by")) {
        val partitionList: util.List[String] = config.getStringList("partition_by")
        // Explicit Java->Scala conversion instead of relying on implicit JavaConversions.
        frameWriter.partitionBy(partitionList.asScala: _*).saveAsTable(resultTableName)
      } else {
        frameWriter.saveAsTable(resultTableName)
      }
    }
  }
}

0 commit comments

Comments
 (0)