Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion soda/spark/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
"odbc": [
"pyodbc",
],
"databricks": ["databricks-sql-connector"],
"databricks": [
"databricks-sql-connector",
"databricks-sdk",
],
}
# TODO Fix the params
setup(
Expand Down
50 changes: 50 additions & 0 deletions soda/spark/soda/data_sources/spark_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,66 @@ def odbc_connection_function(


def databricks_connection_function(host: str, http_path: str, token: str, database: str, schema: str, **kwargs):
"""
Connection to databricks with databricks sql connector.

Supplying a token will enforce connection via personal access token.

host, client_id and client_secret keys can be supplied to the configuration parameter for m2m oauth.

Setting oauth_method to "databricks-oauth" will enforce a u2m oauth connection.

Read the python-sql-connector documentation for more information.

Parameters
----------
host : str
The databricks server host name.
http_path: str
The http_path to your databricks sql warehouse or cluster
token: str
Databricks personal access token
database: str
The databricks catalog
schema : str
The databricks schema

Returns
-------
out : databricks.sql.Connection
The databricks connection object
"""
from databricks import sql

user_agent_entry = f"soda-core-spark/{SODA_CORE_VERSION} (Databricks)"
logging.getLogger("databricks.sql").setLevel(logging.INFO)

auth_method = kwargs.get("auth_method")

if not token and not auth_method:
from databricks.sdk.core import Config, oauth_service_principal

config = Config(**kwargs.get("configuration", {}))

if not host:
host = config.hostname

def credential_provider():

return oauth_service_principal(config)

credentials_provider = credential_provider
else:
credentials_provider = None

connection = sql.connect(
server_hostname=host,
catalog=database,
schema=schema,
http_path=http_path,
access_token=token,
credentials_provider=credentials_provider,
auth_type=kwargs.get("auth_method"),
_user_agent_entry=user_agent_entry,
)
return connection
Expand Down