diff --git a/src/sempy_labs/_ai.py b/src/sempy_labs/_ai.py
index b26e4ad2..b569c1c3 100644
--- a/src/sempy_labs/_ai.py
+++ b/src/sempy_labs/_ai.py
@@ -1,10 +1,13 @@
 import sempy
 import sempy.fabric as fabric
 import pandas as pd
-from pyspark.sql import SparkSession
 from typing import List, Optional, Union
 from IPython.display import display
 import sempy_labs._icons as icons
+from sempy_labs._helper_functions import (
+    _read_delta_table,
+    _run_spark_sql_query,
+)
 
 
 def optimize_semantic_model(dataset: str, workspace: Optional[str] = None):
@@ -186,13 +189,13 @@ def generate_aggs(
 
     query = query[:-1]
 
-    spark = SparkSession.builder.getOrCreate()
     fromTablePath = create_abfss_path(
         lakehouse_id=lakehouse_id,
         lakehouse_workspace_id=lakehouse_workspace_id,
         delta_table_name=lakeTName,
     )
-    df = spark.read.format("delta").load(fromTablePath)
+
+    df = _read_delta_table(fromTablePath)
     tempTableName = f"delta_table_{lakeTName}"
     df.createOrReplaceTempView(tempTableName)
     sqlQuery = f"{query} \n FROM {tempTableName} {groupBy}"
@@ -201,7 +204,7 @@ def generate_aggs(
     print(sqlQuery)
 
     # Save query to spark dataframe
-    spark_df = spark.sql(sqlQuery)
+    spark_df = _run_spark_sql_query(sqlQuery)
     f"\nCreating/updating the '{aggLakeTName}' table in the lakehouse..."
     # Write spark dataframe to delta table
     aggFilePath = create_abfss_path(
@@ -419,7 +422,7 @@ def generate_aggs(
 
 #     dfP = fabric.list_partitions(dataset = dataset, workspace = workspace)
 #     isDirectLake = any(r['Mode'] == 'DirectLake' for i, r in dfP.iterrows())
-#     spark = SparkSession.builder.getOrCreate()
+#     spark = _create_spark_session()
 #     views = spark.sql(f"SHOW VIEWS IN {lakehouse}").collect()
 #     for view in views:
 #         viewName = view['viewName']
diff --git a/src/sempy_labs/_delta_analyzer.py b/src/sempy_labs/_delta_analyzer.py
index f8bf49e9..10fea1bf 100644
--- a/src/sempy_labs/_delta_analyzer.py
+++ b/src/sempy_labs/_delta_analyzer.py
@@ -1,9 +1,8 @@
 import pandas as pd
 import datetime
-from typing import Dict
+from typing import Dict, Optional
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
-from pyspark.sql import SparkSession
 from sempy_labs._helper_functions import (
     create_abfss_path,
     save_as_delta_table,
@@ -12,19 +11,24 @@
     _update_dataframe_datatypes,
     resolve_workspace_name_and_id,
     resolve_lakehouse_name_and_id,
+    _read_delta_table,
+    _delta_table_row_count,
 )
 from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables
 from sempy_labs.lakehouse._lakehouse import lakehouse_attached
 import sempy_labs._icons as icons
+from uuid import UUID
 
 
 def delta_analyzer(
     table_name: str,
     approx_distinct_count: bool = True,
     export: bool = False,
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
 ) -> Dict[str, pd.DataFrame]:
     """
-    Analyzes a delta table and shows the results in dictionary containing a set of 5 dataframes. The table being analyzed must be in the lakehouse attached to the notebook.
+    Analyzes a delta table and shows the results in a dictionary containing a set of 5 dataframes. If 'export' is set to True, the results will be saved to delta tables in the lakehouse attached to the notebook.
 
     The 5 dataframes returned by this function are:
 
@@ -44,26 +48,52 @@ def delta_analyzer(
         If True, uses approx_count_distinct to calculate the cardinality of each column. If False, uses COUNT(DISTINCT) instead.
     export : bool, default=False
        If True, exports the resulting dataframes to delta tables in the lakehouse attached to the notebook.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID used by the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
 
     Returns
     -------
     Dict[str, pandas.DataFrame]
         A dictionary of pandas dataframes showing semantic model objects which violated the best practice analyzer rules.
     """
+    import notebookutils
-    if not lakehouse_attached():
-        raise ValueError(
-            f"{icons.red_dot} No lakehouse is attached to this notebook. Please attach a lakehouse to the notebook before running the Delta Analyzer."
-        )
+    display_toggle = notebookutils.common.configs.pandas_display
+
+    # Turn off notebookutils display
+    if display_toggle is True:
+        notebookutils.common.configs.pandas_display = False
 
     prefix = "SLL_DeltaAnalyzer_"
     now = datetime.datetime.now()
 
-    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace=None)
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace=workspace)
     (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
-        lakehouse=None, workspace=None
+        lakehouse=lakehouse, workspace=workspace
     )
     path = create_abfss_path(lakehouse_id, workspace_id, table_name)
-    table_path = f"/lakehouse/default/Tables/{table_name}"
+    lake_path = create_abfss_path(lakehouse_id, workspace_id)
+    mounts = notebookutils.fs.mounts()
+    mount_point = f"/{workspace_name.replace(' ', '')}{lakehouse_name.replace(' ', '')}"
+    if not any(i.get("source") == lake_path for i in mounts):
+        # Mount lakehouse if not mounted
+        notebookutils.fs.mount(lake_path, mount_point)
+        print(
+            f"{icons.green_dot} Mounted the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace to the notebook."
+        )
+
+    mounts = notebookutils.fs.mounts()
+    local_path = next(
+        i.get("localPath") for i in mounts if i.get("source") == lake_path
+    )
+    table_path = f"{local_path}/Tables/{table_name}"
+
+    # Set back to original value
+    notebookutils.common.configs.pandas_display = display_toggle
 
     parquet_file_df_columns = {
         "ParquetFile": "string",
@@ -95,39 +125,21 @@ def delta_analyzer(
     row_group_df = _create_dataframe(columns=row_group_df_columns)
     column_chunk_df = _create_dataframe(columns=column_chunk_df_columns)
 
-    spark = SparkSession.builder.getOrCreate()
 
     # delta_table = DeltaTable.forPath(spark, path)
     # detail_df = spark.sql(f"DESCRIBE DETAIL `{table_name}`").collect()[0]
     # num_files = detail_df.numFiles
     # size_in_bytes = detail_df.sizeInBytes
 
-    latest_files = spark.read.format("delta").load(path).inputFiles()
+    latest_files = _read_delta_table(path).inputFiles()
     file_paths = [f.split("/")[-1] for f in latest_files]
-    row_count = spark.table(table_name).count()
+    row_count = _delta_table_row_count(table_name)
     row_groups = 0
     max_rows_per_row_group = 0
     min_rows_per_row_group = float("inf")
 
-    # dt = DeltaTable.forPath(spark, path)
-    # schema = dt.toDF().schema
-    # is_vorder = False
-    # if (
-    #     dt.detail()
-    #     .collect()[0]
-    #     .asDict()
-    #     .get("properties")
-    #     .get("delta.parquet.vorder.enabled")
-    #     == "true"
-    # ):
-    #     is_vorder = True
     schema = ds.dataset(table_path).schema.metadata
     is_vorder = any(b"vorder" in key for key in schema.keys())
-    # v_order_level = (
-    #     int(schema.get(b"com.microsoft.parquet.vorder.level").decode("utf-8"))
-    #     if is_vorder
-    #     else None
-    # )
 
     for file_name in file_paths:
         parquet_file = pq.ParquetFile(f"{table_path}/{file_name}")
@@ -236,6 +248,7 @@ def delta_analyzer(
                 column_name=col_name,
                 function="approx",
                 lakehouse=lakehouse_name,
+                workspace=workspace,
             )
         else:
             dc = _get_column_aggregate(
@@ -243,6 +256,7 @@ def delta_analyzer(
                 column_name=col_name,
                 function="distinctcount",
                 lakehouse=lakehouse_name,
+                workspace=workspace,
             )
 
         if "Cardinality" not in column_df.columns:
@@ -264,6 +278,10 @@ def delta_analyzer(
     save_table = f"{prefix}Summary"
 
     if export:
+        if not lakehouse_attached():
+            raise ValueError(
+                f"{icons.red_dot} No lakehouse is attached to this notebook. Please attach a lakehouse to the notebook before running the Delta Analyzer."
+            )
         dfL = get_lakehouse_tables()
         dfL_filt = dfL[dfL["Table Name"] == save_table]
         if dfL_filt.empty:
diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py
index f825370c..389d9378 100644
--- a/src/sempy_labs/_helper_functions.py
+++ b/src/sempy_labs/_helper_functions.py
@@ -31,7 +31,9 @@ def _build_url(url: str, params: dict) -> str:
 
 
 def create_abfss_path(
-    lakehouse_id: UUID, lakehouse_workspace_id: UUID, delta_table_name: str
+    lakehouse_id: UUID,
+    lakehouse_workspace_id: UUID,
+    delta_table_name: Optional[str] = None,
 ) -> str:
     """
     Creates an abfss path for a delta table in a Fabric lakehouse.
@@ -42,18 +44,22 @@ def create_abfss_path(
         ID of the Fabric lakehouse.
     lakehouse_workspace_id : uuid.UUID
         ID of the Fabric workspace.
-    delta_table_name : str
+    delta_table_name : str, default=None
         Name of the delta table name.
 
     Returns
     -------
     str
-        An abfss path which can be used to save/reference a delta table in a Fabric lakehouse.
+        An abfss path which can be used to save/reference a delta table in a Fabric lakehouse, or the lakehouse itself when no delta table name is provided.
     """
 
     fp = _get_default_file_path()
+    path = f"abfss://{lakehouse_workspace_id}@{fp}/{lakehouse_id}"
+
+    if delta_table_name is not None:
+        path += f"/Tables/{delta_table_name}"
 
-    return f"abfss://{lakehouse_workspace_id}@{fp}/{lakehouse_id}/Tables/{delta_table_name}"
+    return path
 
 
 def _get_default_file_path() -> str:
@@ -539,7 +545,7 @@ def save_as_delta_table(
         )
 
     dataframe.columns = dataframe.columns.str.replace(" ", "_")
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
 
     type_mapping = {
         "string": StringType(),
@@ -1248,7 +1254,6 @@ def _get_column_aggregate(
     default_value: int = 0,
 ) -> int:
 
-    from pyspark.sql import SparkSession
     from pyspark.sql.functions import approx_count_distinct
     from pyspark.sql import functions as F
 
@@ -1257,7 +1262,7 @@ def _get_column_aggregate(
     lakehouse_id = resolve_lakehouse_id(lakehouse, workspace)
     path = create_abfss_path(lakehouse_id, workspace_id, table_name)
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     df = spark.read.format("delta").load(path)
 
     if function in {"COUNTDISTINCT", "DISTINCTCOUNT"}:
@@ -1591,3 +1596,43 @@ def _print_success(item_name, item_type, workspace_name, action="created"):
         )
     else:
         raise NotImplementedError
+
+
+def _pure_python_notebook() -> bool:
+
+    from sempy.fabric._environment import _on_jupyter
+
+    return _on_jupyter()
+
+
+def _create_spark_session():
+
+    if _pure_python_notebook():
+        raise ValueError(
+            f"{icons.red_dot} This function is only available in a PySpark notebook."
+        )
+
+    from pyspark.sql import SparkSession
+
+    return SparkSession.builder.getOrCreate()
+
+
+def _read_delta_table(path: str) -> pd.DataFrame:
+
+    spark = _create_spark_session()
+
+    return spark.read.format("delta").load(path)
+
+
+def _delta_table_row_count(table_name: str) -> int:
+
+    spark = _create_spark_session()
+
+    return spark.table(table_name).count()
+
+
+def _run_spark_sql_query(query):
+
+    spark = _create_spark_session()
+
+    return spark.sql(query)
diff --git a/src/sempy_labs/_list_functions.py b/src/sempy_labs/_list_functions.py
index 5d7201bb..acbd4777 100644
--- a/src/sempy_labs/_list_functions.py
+++ b/src/sempy_labs/_list_functions.py
@@ -9,6 +9,7 @@
     _update_dataframe_datatypes,
     _base_api,
     _create_dataframe,
+    _run_spark_sql_query,
 )
 from sempy._utils._log import log
 import pandas as pd
@@ -584,14 +585,12 @@ def list_columns(
             query = f"{query} FROM {lakehouse}.{lakeTName}"
             sql_statements.append((table_name, query))
 
-    spark = SparkSession.builder.getOrCreate()
-
     for o in sql_statements:
         tName = o[0]
         query = o[1]
 
         # Run the query
-        df = spark.sql(query)
+        df = _run_spark_sql_query(query)
 
         for column in df.columns:
             x = df.collect()[0][column]
diff --git a/src/sempy_labs/_model_bpa.py b/src/sempy_labs/_model_bpa.py
index 7d5b8a04..ac94f719 100644
--- a/src/sempy_labs/_model_bpa.py
+++ b/src/sempy_labs/_model_bpa.py
@@ -14,6 +14,7 @@
     get_language_codes,
     _get_column_aggregate,
     resolve_workspace_name_and_id,
+    _create_spark_session,
 )
 from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
 from sempy_labs.tom import connect_semantic_model
@@ -181,7 +182,6 @@ def translate_using_po(rule_file):
 
     def translate_using_spark(rule_file):
         from synapse.ml.services import Translate
-        from pyspark.sql import SparkSession
 
         rules_temp = rule_file.copy()
         rules_temp = rules_temp.drop(["Expression", "URL", "Severity"], axis=1)
@@ -195,7 +195,7 @@ def translate_using_spark(rule_file):
             ]
         )
 
-        spark = SparkSession.builder.getOrCreate()
+        spark = _create_spark_session()
        dfRules = spark.createDataFrame(rules_temp, schema)
 
        columns = ["Category", "Rule Name", "Description"]
diff --git a/src/sempy_labs/_translations.py b/src/sempy_labs/_translations.py
index b15dd591..e079bd97 100644
--- a/src/sempy_labs/_translations.py
+++ b/src/sempy_labs/_translations.py
@@ -5,6 +5,7 @@
 import sempy_labs._icons as icons
 from sempy_labs._helper_functions import (
     get_language_codes,
+    _create_spark_session,
 )
 from uuid import UUID
 
@@ -40,7 +41,6 @@ def translate_semantic_model(
 
     from synapse.ml.services import Translate
     from pyspark.sql.functions import col, flatten
-    from pyspark.sql import SparkSession
     from sempy_labs.tom import connect_semantic_model
 
     icons.sll_tags.append("TranslateSemanticModel")
@@ -145,7 +145,7 @@ def _clean_text(text, exclude_chars):
             [df_prep, pd.DataFrame(new_data, index=[0])], ignore_index=True
         )
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     df = spark.createDataFrame(df_prep)
 
     columns = ["Name", "Description", "Display Folder"]
diff --git a/src/sempy_labs/_vertipaq.py b/src/sempy_labs/_vertipaq.py
index bcee3102..1f143040 100644
--- a/src/sempy_labs/_vertipaq.py
+++ b/src/sempy_labs/_vertipaq.py
@@ -6,7 +6,6 @@
 import shutil
 import datetime
 import warnings
-from pyspark.sql import SparkSession
 from sempy_labs._helper_functions import (
     format_dax_object_name,
     resolve_lakehouse_name,
@@ -15,6 +14,7 @@
     _get_column_aggregate,
     resolve_workspace_name_and_id,
     resolve_dataset_name_and_id,
+    _create_spark_session,
 )
 from sempy_labs._list_functions import list_relationships, list_tables
 from sempy_labs.lakehouse import lakehouse_attached, get_lakehouse_tables
@@ -197,7 +197,7 @@ def vertipaq_analyzer(
     )
 
     sql_statements = []
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     # Loop through tables
     for lakeTName in dfC_flt["Query"].unique():
         query = "SELECT "
@@ -275,7 +275,7 @@ def vertipaq_analyzer(
     dfR.rename(columns={"Source": "To Lake Column"}, inplace=True)
     dfR.drop(columns=["Column Object"], inplace=True)
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     for i, r in dfR.iterrows():
         fromTable = r["From Lake Table"]
         fromColumn = r["From Lake Column"]
diff --git a/src/sempy_labs/_warehouses.py b/src/sempy_labs/_warehouses.py
index d804a874..0e1f57d9 100644
--- a/src/sempy_labs/_warehouses.py
+++ b/src/sempy_labs/_warehouses.py
@@ -93,7 +93,7 @@ def list_warehouses(workspace: Optional[str | UUID] = None) -> pd.DataFrame:
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
 
     responses = _base_api(
-        reqeust=f"/v1/workspaces/{workspace_id}/warehouses", uses_pagination=True
+        request=f"/v1/workspaces/{workspace_id}/warehouses", uses_pagination=True
     )
 
     for r in responses:
diff --git a/src/sempy_labs/lakehouse/_get_lakehouse_columns.py b/src/sempy_labs/lakehouse/_get_lakehouse_columns.py
index 4f551186..56f3bdb4 100644
--- a/src/sempy_labs/lakehouse/_get_lakehouse_columns.py
+++ b/src/sempy_labs/lakehouse/_get_lakehouse_columns.py
@@ -1,10 +1,10 @@
 import pandas as pd
-from pyspark.sql import SparkSession
 from sempy_labs._helper_functions import (
     format_dax_object_name,
     resolve_workspace_name_and_id,
     resolve_lakehouse_name_and_id,
     _create_dataframe,
+    _create_spark_session,
 )
 from typing import Optional
 from sempy._utils._log import log
@@ -51,7 +51,7 @@ def get_lakehouse_columns(
         lakehouse=lakehouse, workspace=workspace_id
     )
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
 
     tables = get_lakehouse_tables(
         lakehouse=lakehouse_id, workspace=workspace_id, extended=False, count_rows=False
diff --git a/src/sempy_labs/lakehouse/_get_lakehouse_tables.py b/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
index 04df473d..73ac8d9b 100644
--- a/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
+++ b/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
@@ -1,6 +1,5 @@
 import sempy.fabric as fabric
 import pandas as pd
-from pyspark.sql import SparkSession
 import pyarrow.parquet as pq
 import datetime
 from sempy_labs._helper_functions import (
@@ -10,6 +9,7 @@
     save_as_delta_table,
     _base_api,
     _create_dataframe,
+    _create_spark_session,
 )
 from sempy_labs.directlake._guardrails import (
     get_sku_size,
@@ -112,7 +112,7 @@ def get_lakehouse_tables(
     if extended:
         sku_value = get_sku_size(workspace_id)
         guardrail = get_directlake_guardrails_for_sku(sku_value)
-        spark = SparkSession.builder.getOrCreate()
+        spark = _create_spark_session()
         df["Files"] = None
         df["Row Groups"] = None
         df["Table Size"] = None
diff --git a/src/sempy_labs/lakehouse/_lakehouse.py b/src/sempy_labs/lakehouse/_lakehouse.py
index 4c241869..9b385843 100644
--- a/src/sempy_labs/lakehouse/_lakehouse.py
+++ b/src/sempy_labs/lakehouse/_lakehouse.py
@@ -6,6 +6,7 @@
     _base_api,
     resolve_lakehouse_name_and_id,
     resolve_workspace_name_and_id,
+    _create_spark_session,
 )
 import sempy_labs._icons as icons
 import re
@@ -54,7 +55,6 @@ def optimize_lakehouse_tables(
         or if no lakehouse attached, resolves to the workspace of the notebook.
     """
 
-    from pyspark.sql import SparkSession
     from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables
     from delta import DeltaTable
 
@@ -69,7 +69,7 @@ def optimize_lakehouse_tables(
     else:
         tables_filt = lakeTablesDelta.copy()
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
 
     for _, r in (bar := tqdm(tables_filt.iterrows())):
         tableName = r["Table Name"]
@@ -122,7 +122,7 @@ def vacuum_lakehouse_tables(
     else:
         tables_filt = lakeTablesDelta.copy()
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
 
     for _, r in (bar := tqdm(tables_filt.iterrows())):
diff --git a/src/sempy_labs/lakehouse/_shortcuts.py b/src/sempy_labs/lakehouse/_shortcuts.py
index 2afd3f65..6df63b82 100644
--- a/src/sempy_labs/lakehouse/_shortcuts.py
+++ b/src/sempy_labs/lakehouse/_shortcuts.py
@@ -219,7 +219,7 @@ def delete_shortcut(
     )
 
 
-def reset_shortcut_cache(workspace: Optional[str | UUID]):
+def reset_shortcut_cache(workspace: Optional[str | UUID] = None):
     """
     Deletes any cached files that were stored while reading from shortcuts.
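
Usage sketch (illustrative only, not part of the patch) of how the new private helpers added in _helper_functions.py are meant to be combined. The "sales" table name and the temp-view name are placeholders; everything else comes from the functions defined in the hunks above.

from sempy_labs._helper_functions import (
    create_abfss_path,
    resolve_lakehouse_name_and_id,
    resolve_workspace_name_and_id,
    _read_delta_table,
    _run_spark_sql_query,
)

# Resolve the attached lakehouse/workspace (pass explicit names or IDs instead if needed).
(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace=None)
(lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
    lakehouse=None, workspace=None
)

# With the now-optional third argument omitted, the abfss path points at the lakehouse root;
# passing a table name still yields the .../Tables/<name> path.
lake_root = create_abfss_path(lakehouse_id, workspace_id)
table_path = create_abfss_path(lakehouse_id, workspace_id, "sales")  # "sales" is a placeholder

# The wrappers call _create_spark_session() internally, so they raise a clear error in a
# pure-Python notebook instead of importing pyspark at module load time.
df = _read_delta_table(table_path)  # Spark DataFrame for the delta table
df.createOrReplaceTempView("sales_tmp")
count_row = _run_spark_sql_query("SELECT COUNT(*) AS c FROM sales_tmp").collect()[0]
print(count_row["c"])
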
diff --git a/src/sempy_labs/migration/_migrate_calctables_to_lakehouse.py b/src/sempy_labs/migration/_migrate_calctables_to_lakehouse.py
index 49ea7990..9abc9547 100644
--- a/src/sempy_labs/migration/_migrate_calctables_to_lakehouse.py
+++ b/src/sempy_labs/migration/_migrate_calctables_to_lakehouse.py
@@ -9,9 +9,9 @@
     create_abfss_path,
     retry,
     generate_guid,
+    _create_spark_session,
 )
 from sempy_labs.tom import connect_semantic_model
-from pyspark.sql import SparkSession
 from typing import Optional
 from sempy._utils._log import log
 import sempy_labs._icons as icons
@@ -98,7 +98,7 @@ def migrate_calc_tables_to_lakehouse(
         if killFunction:
             return
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
 
     if len(dfP_filt) == 0:
         print(
diff --git a/src/sempy_labs/migration/_refresh_calc_tables.py b/src/sempy_labs/migration/_refresh_calc_tables.py
index b921a3ff..cdb3a151 100644
--- a/src/sempy_labs/migration/_refresh_calc_tables.py
+++ b/src/sempy_labs/migration/_refresh_calc_tables.py
@@ -2,7 +2,6 @@
 import pandas as pd
 import re
 from sempy_labs._helper_functions import retry
-from pyspark.sql import SparkSession
 from sempy_labs.tom import connect_semantic_model
 from typing import Optional
 from sempy._utils._log import log
@@ -11,6 +10,7 @@
 from sempy_labs._helper_functions import (
     resolve_workspace_name_and_id,
     resolve_dataset_name_and_id,
+    _create_spark_session,
 )
 
 
@@ -29,7 +29,7 @@ def refresh_calc_tables(dataset: str | UUID, workspace: Optional[str | UUID] = N
         or if no lakehouse attached, resolves to the workspace of the notebook.
     """
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
     (dataset_name, dataset_id) = resolve_dataset_name_and_id(dataset, workspace_id)
     icons.sll_tags.append("DirectLakeMigration")
diff --git a/src/sempy_labs/report/_report_functions.py b/src/sempy_labs/report/_report_functions.py
index 1c16b9e8..7118de0a 100644
--- a/src/sempy_labs/report/_report_functions.py
+++ b/src/sempy_labs/report/_report_functions.py
@@ -19,6 +19,7 @@
     resolve_dataset_id,
     _update_dataframe_datatypes,
     _base_api,
+    _create_spark_session,
 )
 from typing import List, Optional, Union
 from sempy._utils._log import log
@@ -726,7 +727,6 @@ def translate_report_titles(
         or if no lakehouse attached, resolves to the workspace of the notebook.
     """
     from synapse.ml.services import Translate
-    from pyspark.sql import SparkSession
 
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
 
@@ -738,7 +738,7 @@ def translate_report_titles(
     reportJson = get_report_json(report=report, workspace=workspace_id)
     dfV = list_report_visuals(report=report, workspace=workspace_id)
 
-    spark = SparkSession.builder.getOrCreate()
+    spark = _create_spark_session()
     df = spark.createDataFrame(dfV)
 
     columnToTranslate = "Title"
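
Usage sketch (illustrative only, not part of the patch) of calling delta_analyzer with the new optional lakehouse/workspace parameters. "sales", "MyLakehouse" and "MyWorkspace" are placeholder names; the keys of the returned dictionary are not shown in this diff, so the loop avoids assuming them.

from sempy_labs._delta_analyzer import delta_analyzer

results = delta_analyzer(
    table_name="sales",            # placeholder table name
    approx_distinct_count=True,    # use approx_count_distinct for column cardinality
    export=False,                  # export=True still requires an attached lakehouse
    lakehouse="MyLakehouse",       # new: analyze a lakehouse other than the attached one
    workspace="MyWorkspace",       # new: workspace that hosts that lakehouse
)

for name, df in results.items():   # dictionary of the five result dataframes
    print(name, df.shape)
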
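A rough illustration (an assumption, not taken from the diff) of the fail-fast behaviour the _create_spark_session guard gives every wrapper above when run outside a Spark notebook:

from sempy_labs._helper_functions import _create_spark_session, _pure_python_notebook

if _pure_python_notebook():
    # In a pure-Python (non-Spark) notebook the helpers raise rather than import pyspark.
    try:
        _create_spark_session()
    except ValueError as e:
        print(e)  # "... This function is only available in a PySpark notebook."
else:
    spark = _create_spark_session()  # regular PySpark notebook: returns the shared session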