Commit 9928d70

fix(ingest/databricks): support hive metastore schemas with special char (#10049)
1 parent f0bdc24 commit 9928d70

File tree

7 files changed: +380 -136 lines

metadata-ingestion/docs/sources/databricks/README.md

Lines changed: 5 additions & 4 deletions

@@ -1,12 +1,13 @@
 DataHub supports integration with the Databricks ecosystem using a multitude of connectors, depending on your exact setup.

-## Databricks Hive
+## Databricks Unity Catalog (new)

-The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
+The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have a Unity Catalog-enabled workspace, you can use the `unity-catalog` source (aka the `databricks` source; see below for details) to integrate your metadata into DataHub as an alternative to the Hive pathway. This source also ingests the hive metastore catalog in Databricks and is the recommended approach for ingesting the Databricks ecosystem into DataHub.

-## Databricks Unity Catalog (new)
+## Databricks Hive (old)
+
+The alternative way to integrate is via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.

-The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the `unity-catalog` source (see below) to integrate your metadata into DataHub as an alternate to the Hive pathway.

 ## Databricks Spark

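The added README text above points users at the `unity-catalog` source. As a minimal sketch of running that source programmatically, assuming DataHub's Python `Pipeline` API; the workspace URL and token are placeholders, and `include_hive_metastore` is assumed to be the flag that enables the hive metastore ingestion described above:

    # Minimal sketch, not the official recipe; all config values are
    # placeholders to adapt to your workspace.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "source": {
                "type": "unity-catalog",
                "config": {
                    "workspace_url": "https://my-workspace.cloud.databricks.com",  # placeholder
                    "token": "<personal-access-token>",  # placeholder
                    # assumed flag for the hive_metastore ingestion mentioned above
                    "include_hive_metastore": True,
                },
            },
            "sink": {"type": "console"},
        }
    )
    pipeline.run()
    pipeline.raise_from_status()

The same configuration keys can equally be used from a YAML recipe; the Python form is shown only to stay in one language with the code below.
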

metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py

Lines changed: 163 additions & 94 deletions

@@ -21,6 +21,7 @@
     TableProfile,
     TableReference,
 )
+from datahub.ingestion.source.unity.report import UnityCatalogReport

 logger = logging.getLogger(__name__)
 HIVE_METASTORE = "hive_metastore"
@@ -66,9 +67,12 @@ class HiveMetastoreProxy(Closeable):
     as unity catalog apis do not return details about this legacy metastore.
     """

-    def __init__(self, sqlalchemy_url: str, options: dict) -> None:
+    def __init__(
+        self, sqlalchemy_url: str, options: dict, report: UnityCatalogReport
+    ) -> None:
         try:
             self.inspector = HiveMetastoreProxy.get_inspector(sqlalchemy_url, options)
+            self.report = report
         except Exception:
             # This means that there is no `hive_metastore` catalog in databricks workspace
             # Not tested but seems like the logical conclusion.
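The constructor change threads the source's `UnityCatalogReport` into the proxy so that the failure handlers added in later hunks can surface warnings in the ingestion report rather than only in logs. A hypothetical wiring sketch (the connection URL is a placeholder; in the real source the unity-catalog connector constructs it):

    # Hypothetical usage; "databricks://..." stands in for a real SQLAlchemy URL.
    from datahub.ingestion.source.unity.report import UnityCatalogReport

    report = UnityCatalogReport()
    proxy = HiveMetastoreProxy(
        sqlalchemy_url="databricks://...",  # placeholder
        options={},
        report=report,
    )
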
@@ -100,22 +104,53 @@ def hive_metastore_schemas(self, catalog: Catalog) -> Iterable[Schema]:
         )

     def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]:
-        views = self.inspector.get_view_names(schema.name)
+        # NOTE: Ideally, we would use `inspector.get_view_names` and `inspector.get_table_names`
+        # here instead of issuing SHOW queries in this class; however, the Databricks dialect for
+        # databricks-sql-connector<3.0.0 does not back-quote schemas with special characters such
+        # as hyphens. Currently, databricks-sql-connector is pinned to <3.0.0 because later
+        # versions require SQLAlchemy > 2.0.21.
+        views = self.get_view_names(schema.name)
         for table_name in views:
             yield self._get_table(schema, table_name, True)

-        for table_name in self.inspector.get_table_names(schema.name):
+        for table_name in self.get_table_names(schema.name):
             if table_name in views:
                 continue
             yield self._get_table(schema, table_name, False)

+    def get_table_names(self, schema_name: str) -> List[str]:
+        try:
+            rows = self._execute_sql(f"SHOW TABLES FROM `{schema_name}`")
+            # 3 columns - database, tableName, isTemporary
+            return [row.tableName for row in rows]
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get tables for schema", f"{HIVE_METASTORE}.{schema_name}"
+            )
+            logger.warning(
+                f"Failed to get tables for {schema_name} due to {e}", exc_info=True
+            )
+        return []
+
+    def get_view_names(self, schema_name: str) -> List[str]:
+        try:
+            rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`")
+            # 3 columns - database, tableName, isTemporary
+            return [row.tableName for row in rows]
+        except Exception as e:
+            self.report.report_warning("Failed to get views for schema", schema_name)
+            logger.warning(
+                f"Failed to get views for {schema_name} due to {e}", exc_info=True
+            )
+        return []
+
     def _get_table(
         self,
         schema: Schema,
         table_name: str,
         is_view: bool = False,
     ) -> Table:
-        columns = self._get_columns(schema, table_name)
+        columns = self._get_columns(schema.name, table_name)
         detailed_info = self._get_table_info(schema.name, table_name)

         comment = detailed_info.pop("Comment", None)
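The heart of the fix, restated: the SHOW queries back-quote the schema name because Spark SQL otherwise parses a hyphen as a minus operator. A small sketch with a made-up schema name:

    schema_name = "my-schema"  # hypothetical schema with a special character

    # Pre-fix behaviour via the dialect: the identifier is not back-quoted,
    # so `SHOW TABLES FROM my-schema` fails to parse.
    broken = f"SHOW TABLES FROM {schema_name}"

    # This commit's approach: back-quotes make the whole name one identifier.
    fixed = f"SHOW TABLES FROM `{schema_name}`"
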
@@ -134,9 +169,9 @@ def _get_table(
             columns=columns,
             storage_location=storage_location,
             data_source_format=datasource_format,
-            view_definition=self._get_view_definition(schema.name, table_name)
-            if is_view
-            else None,
+            view_definition=(
+                self._get_view_definition(schema.name, table_name) if is_view else None
+            ),
             properties=detailed_info,
             owner=None,
             generation=None,
@@ -150,61 +185,69 @@

     def get_table_profile(
         self, ref: TableReference, include_column_stats: bool = False
-    ) -> TableProfile:
+    ) -> Optional[TableProfile]:
         columns = self._get_columns(
-            Schema(
-                id=ref.schema,
-                name=ref.schema,
-                # This is okay, as none of this is used in profiling
-                catalog=self.hive_metastore_catalog(None),
-                comment=None,
-                owner=None,
-            ),
+            ref.schema,
             ref.table,
         )
         detailed_info = self._get_table_info(ref.schema, ref.table)

+        if not columns and not detailed_info:
+            return None
+
         table_stats = (
             self._get_cached_table_statistics(detailed_info["Statistics"])
             if detailed_info.get("Statistics")
             else {}
         )

+        column_profiles: List[ColumnProfile] = []
+        if include_column_stats:
+            for column in columns:
+                column_profile = self._get_column_profile(column.name, ref)
+                if column_profile:
+                    column_profiles.append(column_profile)
+
         return TableProfile(
-            num_rows=int(table_stats[ROWS])
-            if table_stats.get(ROWS) is not None
-            else None,
-            total_size=int(table_stats[BYTES])
-            if table_stats.get(BYTES) is not None
-            else None,
+            num_rows=(
+                int(table_stats[ROWS]) if table_stats.get(ROWS) is not None else None
+            ),
+            total_size=(
+                int(table_stats[BYTES]) if table_stats.get(BYTES) is not None else None
+            ),
             num_columns=len(columns),
-            column_profiles=[
-                self._get_column_profile(column.name, ref) for column in columns
-            ]
-            if include_column_stats
-            else [],
+            column_profiles=column_profiles,
         )

-    def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile:
-
-        props = self._column_describe_extended(ref.schema, ref.table, column)
-        col_stats = {}
-        for prop in props:
-            col_stats[prop[0]] = prop[1]
-        return ColumnProfile(
-            name=column,
-            null_count=int(col_stats[NUM_NULLS])
-            if col_stats.get(NUM_NULLS) is not None
-            else None,
-            distinct_count=int(col_stats[DISTINCT_COUNT])
-            if col_stats.get(DISTINCT_COUNT) is not None
-            else None,
-            min=col_stats.get(MIN),
-            max=col_stats.get(MAX),
-            avg_len=col_stats.get(AVG_COL_LEN),
-            max_len=col_stats.get(MAX_COL_LEN),
-            version=col_stats.get(VERSION),
-        )
+    def _get_column_profile(
+        self, column: str, ref: TableReference
+    ) -> Optional[ColumnProfile]:
+        try:
+            props = self._column_describe_extended(ref.schema, ref.table, column)
+            col_stats = {}
+            for prop in props:
+                col_stats[prop[0]] = prop[1]
+            return ColumnProfile(
+                name=column,
+                null_count=(
+                    int(col_stats[NUM_NULLS])
+                    if col_stats.get(NUM_NULLS) is not None
+                    else None
+                ),
+                distinct_count=(
+                    int(col_stats[DISTINCT_COUNT])
+                    if col_stats.get(DISTINCT_COUNT) is not None
+                    else None
+                ),
+                min=col_stats.get(MIN),
+                max=col_stats.get(MAX),
+                avg_len=col_stats.get(AVG_COL_LEN),
+                max_len=col_stats.get(MAX_COL_LEN),
+                version=col_stats.get(VERSION),
+            )
+        except Exception as e:
+            logger.debug(f"Failed to get column profile for {ref}.{column} due to {e}")
+        return None

     def _get_cached_table_statistics(self, statistics: str) -> dict:
         # statistics is in format "xx bytes" OR "1382 bytes, 2 rows"
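To make that comment concrete, a stand-in parser for the statistics string (illustrative only; `parse_statistics` is not the method from this commit, and it assumes exactly the two formats the comment names). The "bytes" and "rows" keys it produces line up with the `BYTES`/`ROWS` lookups in `get_table_profile` above, assuming those constants hold "bytes" and "rows":

    def parse_statistics(statistics: str) -> dict:
        # "1382 bytes, 2 rows" -> {"bytes": "1382", "rows": "2"}
        stats = {}
        for part in statistics.split(","):
            value, unit = part.strip().split(" ")
            stats[unit] = value
        return stats

    assert parse_statistics("1382 bytes, 2 rows") == {"bytes": "1382", "rows": "2"}
    assert parse_statistics("1382 bytes") == {"bytes": "1382"}
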
@@ -242,9 +285,14 @@ def _get_view_definition(self, schema_name: str, table_name: str) -> Optional[str]:
             )
             for row in rows:
                 return row[0]
-        except Exception:
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get view definition for table",
+                f"{HIVE_METASTORE}.{schema_name}.{table_name}",
+            )
             logger.debug(
-                f"Failed to get view definition for {schema_name}.{table_name}"
+                f"Failed to get view definition for {schema_name}.{table_name} due to {e}",
+                exc_info=True,
             )
         return None
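For orientation (an assumption, since the statement itself sits outside this hunk): the `rows` being iterated most plausibly come from a `SHOW CREATE TABLE`-style query. Spark's `SHOW CREATE TABLE` returns a single-column result (`createtab_stmt`), so the first field of the first row carries the whole DDL, hence `return row[0]`. A self-contained sketch with made-up DDL:

    # Hypothetical shape of one result row; the view name and DDL are made up.
    row = ("CREATE VIEW `hive_metastore`.`my-schema`.`v` AS SELECT 1",)
    view_definition = row[0]
    assert view_definition.startswith("CREATE VIEW")
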

@@ -258,60 +306,81 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType:
         else:
             return HiveTableType.UNKNOWN

+    @lru_cache(maxsize=1)
     def _get_table_info(self, schema_name: str, table_name: str) -> dict:
-        rows = self._describe_extended(schema_name, table_name)
-
-        index = rows.index(("# Detailed Table Information", "", ""))
-        rows = rows[index + 1 :]
-        # Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375
         # Generate properties dictionary.
         properties = {}
-        active_heading = None
-        for col_name, data_type, value in rows:
-            col_name = col_name.rstrip()
-            if col_name.startswith("# "):
-                continue
-            elif col_name == "" and data_type is None:
-                active_heading = None
-                continue
-            elif col_name != "" and data_type is None:
-                active_heading = col_name
-            elif col_name != "" and data_type is not None:
-                properties[col_name] = data_type.strip()
-            else:
-                # col_name == "", data_type is not None
-                prop_name = "{} {}".format(active_heading, data_type.rstrip())
-                properties[prop_name] = value.rstrip()

+        try:
+            rows = self._describe_extended(schema_name, table_name)
+
+            index = rows.index(("# Detailed Table Information", "", ""))
+            rows = rows[index + 1 :]
+            # Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375
+
+            active_heading = None
+            for col_name, data_type, value in rows:
+                col_name = col_name.rstrip()
+                if col_name.startswith("# "):
+                    continue
+                elif col_name == "" and data_type is None:
+                    active_heading = None
+                    continue
+                elif col_name != "" and data_type is None:
+                    active_heading = col_name
+                elif col_name != "" and data_type is not None:
+                    properties[col_name] = data_type.strip()
+                else:
+                    # col_name == "", data_type is not None
+                    prop_name = "{} {}".format(active_heading, data_type.rstrip())
+                    properties[prop_name] = value.rstrip()
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get detailed info for table",
+                f"{HIVE_METASTORE}.{schema_name}.{table_name}",
+            )
+            logger.debug(
+                f"Failed to get detailed info for table {schema_name}.{table_name} due to {e}",
+                exc_info=True,
+            )
         return properties

-    def _get_columns(self, schema: Schema, table_name: str) -> List[Column]:
-        rows = self._describe_extended(schema.name, table_name)
-
+    @lru_cache(maxsize=1)
+    def _get_columns(self, schema_name: str, table_name: str) -> List[Column]:
         columns: List[Column] = []
-        for i, row in enumerate(rows):
-            if i == 0 and row[0].strip() == "col_name":
-                continue  # first row
-            if row[0].strip() in (
-                "",
-                "# Partition Information",
-                "# Detailed Table Information",
-            ):
-                break
-            columns.append(
-                Column(
-                    name=row[0].strip(),
-                    id=f"{schema.id}.{table_name}.{row[0].strip()}",
-                    type_text=row[1].strip(),
-                    type_name=type_map.get(row[1].strip().lower()),
-                    type_scale=None,
-                    type_precision=None,
-                    position=None,
-                    nullable=None,
-                    comment=row[2],
+        try:
+            rows = self._describe_extended(schema_name, table_name)
+            for i, row in enumerate(rows):
+                if i == 0 and row[0].strip() == "col_name":
+                    continue  # first row
+                if row[0].strip() in (
+                    "",
+                    "# Partition Information",
+                    "# Detailed Table Information",
+                ):
+                    break
+                columns.append(
+                    Column(
+                        name=row[0].strip(),
+                        id=f"{HIVE_METASTORE}.{schema_name}.{table_name}.{row[0].strip()}",
+                        type_text=row[1].strip(),
+                        type_name=type_map.get(row[1].strip().lower()),
+                        type_scale=None,
+                        type_precision=None,
+                        position=None,
+                        nullable=None,
+                        comment=row[2],
+                    )
                 )
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get columns for table",
+                f"{HIVE_METASTORE}.{schema_name}.{table_name}",
+            )
+            logger.debug(
+                f"Failed to get columns for table {schema_name}.{table_name} due to {e}",
+                exc_info=True,
             )
-
         return columns

     @lru_cache(maxsize=1)
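A note on the new `@lru_cache(maxsize=1)` decorators on `_get_table_info` and `_get_columns`: with `maxsize=1`, only the most recent argument tuple is memoized, which is just enough to collapse the back-to-back `DESCRIBE` lookups that `_get_table` and `get_table_profile` make for the same table, without holding the whole metastore in memory. A self-contained sketch of that behaviour (`DescribeOnce` is an illustrative stand-in, not code from the commit):

    from functools import lru_cache

    class DescribeOnce:
        calls = 0

        @lru_cache(maxsize=1)
        def describe(self, schema_name: str, table_name: str) -> str:
            DescribeOnce.calls += 1  # count how often the "query" really runs
            return f"{schema_name}.{table_name}"

    d = DescribeOnce()
    d.describe("s", "t1")
    d.describe("s", "t1")  # served from the cache; no second call
    d.describe("s", "t2")  # a new argument tuple evicts ("s", "t1")
    assert DescribeOnce.calls == 2

One trade-off worth knowing: caching bound methods this way keys on `self`, so the cache keeps a reference to the proxy instance alive for the lifetime of the entry.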
