This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 39d8fe2

Author: Zeqi Cui
Merge pull request #1 from Azure/initial-spark-sql-connector

Azure SQL DB / SQL Server Connector

2 parents 480f931 + 4e9b601; commit 39d8fe2

36 files changed: +2563, −10 lines

.gitignore

Lines changed: 4 additions & 0 deletions

```diff
@@ -1,2 +1,6 @@
 *.class
 *.log
+*.iml
+
+#IDE
+.idea/*
```

README.md

Lines changed: 142 additions & 10 deletions

Removed (the old README consisted only of the standard Microsoft contributing boilerplate; the Code of Conduct paragraph reappears in the "Contributing & Feedback" section below):

> # Contributing
>
> This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
>
> When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
>
> This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.

Added:

# Spark connector for Azure SQL Databases and SQL Server

The Spark connector for [Azure SQL Database](https://azure.microsoft.com/en-us/services/sql-database/) and [SQL Server](https://www.microsoft.com/en-us/sql-server/default.aspx) enables SQL databases, including Azure SQL Databases and SQL Server, to act as an input data source or output data sink for Spark jobs. It allows you to use real-time transactional data in big data analytics and to persist results for ad hoc queries or reporting.

Compared to the built-in Spark connector, this connector provides the ability to bulk insert data into SQL databases; it can outperform row-by-row insertion by 10x to 20x. The Spark connector for Azure SQL Databases and SQL Server also supports AAD authentication, allowing you to connect securely to your Azure SQL databases from Azure Databricks with your AAD account. Because it provides interfaces similar to the built-in JDBC connector's, it is easy to migrate your existing Spark jobs to this new connector.

## How to connect to Spark using this library
This connector uses the Microsoft SQL Server JDBC driver to read data from and write data to Azure SQL Database or SQL Server. Read results are returned as a `DataFrame`.

All connection properties in [Microsoft JDBC Driver for SQL Server](https://docs.microsoft.com/en-us/sql/connect/jdbc/setting-the-connection-properties) are supported in this connector. Add connection properties as fields in the `com.microsoft.azure.sqldb.spark.config.Config` object.
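Since any supported JDBC driver property can be passed as a `Config` field, the AAD authentication mentioned above can be configured the same way (the `adal4j` dependency in this commit's pom supports that flow). The following is a minimal, illustrative sketch, assuming the `authentication` and `encrypt` property names from the Microsoft JDBC Driver documentation; the server, database, and account values are placeholders.

```scala
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Sketch: AAD password authentication expressed purely as JDBC driver
// connection properties. Property names follow the Microsoft JDBC
// Driver docs; all values are placeholders.
val aadConfig = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username@contoso.onmicrosoft.com",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val clients = sqlContext.read.sqlDB(aadConfig)
clients.show()
```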
### Reading from Azure SQL Database or SQL Server
```scala
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", // seconds
  "queryTimeout"   -> "5"  // seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()
```
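These snippets assume a Spark 2.x shell or notebook, where `sqlContext` is predefined (it is also reachable as `spark.sqlContext` from a `SparkSession`).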

### Writing to Azure SQL Database or SQL Server
```scala
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import org.apache.spark.sql.SaveMode

// Acquire a DataFrame collection (val collection) in any way, e.g. from a read.

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

collection.write.mode(SaveMode.Append).sqlDB(config)
```
### Pushdown query to Azure SQL Database or SQL Server
For SELECT queries with expected return results, please use
[Reading from Azure SQL Database or SQL Server](#reading-from-azure-sql-database-or-sql-server) above.
```scala
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._

val query =
  """
    |UPDATE Customers
    |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
    |WHERE CustomerID = 1;
  """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.azurePushdownQuery(config)
```
### Bulk Copy to Azure SQL Database or SQL Server
```scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/*
  Add column metadata. If not specified, metadata is automatically
  read from the destination table, which may hurt performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
// df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
```
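A note on the bulk copy settings above: `bulkCopyBatchSize` controls how many rows are sent per batch, while `bulkCopyTableLock` requests a table-level lock for the duration of the copy, which SQL Server can exploit for faster, minimally logged inserts at the cost of blocking concurrent writers. `bulkCopyTimeout` is in seconds.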

## Requirements
Officially supported versions:

| Component | Versions Supported |
| --------- | ------------------ |
| Apache Spark | 2.0.2 or later |
| Scala | 2.10 or later |
| Microsoft JDBC Driver for SQL Server | 6.2 or later |
| Microsoft SQL Server | SQL Server 2008 or later |
| Azure SQL Databases | Supported |
## Download
### Download from Maven
*TBD*

### Build this project
Currently, the connector project uses Maven. To build the connector from source, run:
```sh
mvn clean package
```
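Given the `maven-assembly-plugin` configuration in this commit's pom.xml (shown below), `mvn clean package` should also produce an uber jar, `target/azure-sqldb-spark-1.0.0-jar-with-dependencies.jar`, alongside the plain `target/azure-sqldb-spark-1.0.0.jar` (names assume the artifactId and version in the pom). Either jar can then be attached to a Spark session, for example with `spark-shell --jars target/azure-sqldb-spark-1.0.0-jar-with-dependencies.jar`.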

## Contributing & Feedback

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.

To give feedback and/or report an issue, open a [GitHub Issue](https://help.github.com/articles/creating-an-issue/).

*Apache®, Apache Spark, and Spark® are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.*

docs/images/spark_sqldb_dataflow.png

434 KB

lib/mssql-jdbc-6.2.2.jre8.jar

806 KB
Binary file not shown.

pom.xml

Lines changed: 138 additions & 0 deletions

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.microsoft.azure.sqldb.spark</groupId>
    <artifactId>azure-sqldb-spark</artifactId>
    <version>1.0.0</version>

    <licenses>
        <license>
            <name>MIT License</name>
            <url>http://www.opensource.org/licenses/mit-license.php</url>
        </license>
    </licenses>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.scalactic</groupId>
            <artifactId>scalactic_2.11</artifactId>
            <version>3.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.11</artifactId>
            <version>3.0.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>adal4j</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
    <developers>
        <developer>
            <name>Azure SQL DB Devs</name>
            <organization>Microsoft</organization>
            <organizationUrl>http://www.microsoft.com/</organizationUrl>
        </developer>
    </developers>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scalastyle</groupId>
                <artifactId>scalastyle-maven-plugin</artifactId>
                <version>1.0.0</version>
                <configuration>
                    <verbose>false</verbose>
                    <failOnViolation>true</failOnViolation>
                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
                    <failOnWarning>false</failOnWarning>
                    <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
                    <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
                    <configLocation>${project.basedir}/lib/scalastyle_config.xml</configLocation>
                    <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
                    <outputEncoding>UTF-8</outputEncoding>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>check</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <filereports>WDF TestSuite.txt</filereports>
                </configuration>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

samples/notebooks/Spark Connector for Azure SQL Databases and SQL Server.html

Lines changed: 42 additions & 0 deletions
Large diffs are not rendered by default.

samples/scripts/BulkCopySample.scala

Lines changed: 43 additions & 0 deletions

```scala
// Import libraries
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val url = "[Enter your url here]"
val databaseName = "[Enter your database name here]"
val dbTable = "[Enter your database table here]"

val user = "[Enter your username here]"
val password = "[Enter your password here]"

// Acquire data to be written.
// df could be acquired in any way.
val localTable = "[Enter your local persisted table here]"
val df = spark.sql(s"SELECT * FROM $localTable")

val writeConfig = Config(Map(
  "url"               -> url,
  "databaseName"      -> databaseName,
  "dbTable"           -> dbTable,
  "user"              -> user,
  "password"          -> password,
  "connectTimeout"    -> "5",
  "bulkCopyBatchSize" -> "100000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(writeConfig)

/*
  For better performance, specify the column metadata of the table:

  var bulkCopyMetadata = new BulkCopyMetadata
  bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
  bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 128, 0)
  bulkCopyMetadata.addColumnMetadata(3, "MiddleName", java.sql.Types.NVARCHAR, 128, 0)
  bulkCopyMetadata.addColumnMetadata(4, "LastName", java.sql.Types.NVARCHAR, 128, 0)
  ..........

  df.bulkCopyToSqlDB(writeConfig, bulkCopyMetadata)
*/
```

samples/scripts/ReadSample.scala

Lines changed: 40 additions & 0 deletions

```scala
// Import libraries
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val url = "[Enter your url here]"
val databaseName = "[Enter your database name here]"
val dbTable = "[Enter your database table here]"

val user = "[Enter your username here]"
val password = "[Enter your password here]"

// READ FROM CONFIG
val readConfig = Config(Map(
  "url"            -> url,
  "databaseName"   -> databaseName,
  "user"           -> user,
  "password"       -> password,
  "connectTimeout" -> "5",
  "queryTimeout"   -> "5",
  "dbTable"        -> dbTable
))

val df = sqlContext.read.sqlDB(readConfig)
println("Total rows: " + df.count)
df.show()

// TRADITIONAL SYNTAX
import java.util.Properties

val properties = new Properties()
properties.put("databaseName", databaseName)
properties.put("user", user)
properties.put("password", password)
properties.put("connectTimeout", "5")
properties.put("queryTimeout", "5")

// Named df2 so the script also compiles outside a REPL,
// where `val df` cannot be redefined.
val df2 = sqlContext.read.sqlDB(url, dbTable, properties)
println("Total rows: " + df2.count)
df2.show()
```

0 commit comments