Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added _pure_python_notebook #443

Merged
merged 7 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/sempy_labs/_ai.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import sempy
import sempy.fabric as fabric
import pandas as pd
from pyspark.sql import SparkSession
from typing import List, Optional, Union
from IPython.display import display
import sempy_labs._icons as icons
from sempy_labs._helper_functions import (
_read_delta_table,
_run_spark_sql_query,
)


def optimize_semantic_model(dataset: str, workspace: Optional[str] = None):
Expand Down Expand Up @@ -186,13 +189,13 @@ def generate_aggs(

query = query[:-1]

spark = SparkSession.builder.getOrCreate()
fromTablePath = create_abfss_path(
lakehouse_id=lakehouse_id,
lakehouse_workspace_id=lakehouse_workspace_id,
delta_table_name=lakeTName,
)
df = spark.read.format("delta").load(fromTablePath)

df = _read_delta_table(fromTablePath)
tempTableName = f"delta_table_{lakeTName}"
df.createOrReplaceTempView(tempTableName)
sqlQuery = f"{query} \n FROM {tempTableName} {groupBy}"
Expand All @@ -201,7 +204,7 @@ def generate_aggs(
print(sqlQuery)

# Save query to spark dataframe
spark_df = spark.sql(sqlQuery)
spark_df = _run_spark_sql_query(sqlQuery)
f"\nCreating/updating the '{aggLakeTName}' table in the lakehouse..."
# Write spark dataframe to delta table
aggFilePath = create_abfss_path(
Expand Down Expand Up @@ -419,7 +422,7 @@ def generate_aggs(
# dfP = fabric.list_partitions(dataset = dataset, workspace = workspace)
# isDirectLake = any(r['Mode'] == 'DirectLake' for i, r in dfP.iterrows())

# spark = SparkSession.builder.getOrCreate()
# spark = _create_spark_session()
# views = spark.sql(f"SHOW VIEWS IN {lakehouse}").collect()
# for view in views:
# viewName = view['viewName']
Expand Down
78 changes: 48 additions & 30 deletions src/sempy_labs/_delta_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import pandas as pd
import datetime
from typing import Dict
from typing import Dict, Optional
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from sempy_labs._helper_functions import (
create_abfss_path,
save_as_delta_table,
Expand All @@ -12,19 +11,24 @@
_update_dataframe_datatypes,
resolve_workspace_name_and_id,
resolve_lakehouse_name_and_id,
_read_delta_table,
_delta_table_row_count,
)
from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables
from sempy_labs.lakehouse._lakehouse import lakehouse_attached
import sempy_labs._icons as icons
from uuid import UUID


def delta_analyzer(
table_name: str,
approx_distinct_count: bool = True,
export: bool = False,
lakehouse: Optional[str | UUID] = None,
workspace: Optional[str | UUID] = None,
) -> Dict[str, pd.DataFrame]:
"""
Analyzes a delta table and shows the results in dictionary containing a set of 5 dataframes. The table being analyzed must be in the lakehouse attached to the notebook.
Analyzes a delta table and shows the results in dictionary containing a set of 5 dataframes. If 'export' is set to True, the results will be saved to delta tables in the lakehouse attached to the notebook.

The 5 dataframes returned by this function are:

Expand All @@ -44,26 +48,52 @@ def delta_analyzer(
If True, uses approx_count_distinct to calculate the cardinality of each column. If False, uses COUNT(DISTINCT) instead.
export : bool, default=False
If True, exports the resulting dataframes to delta tables in the lakehouse attached to the notebook.
lakehouse : str | uuid.UUID, default=None
The Fabric lakehouse name or ID.
Defaults to None which resolves to the lakehouse attached to the notebook.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID used by the lakehouse.
Defaults to None which resolves to the workspace of the attached lakehouse
or if no lakehouse attached, resolves to the workspace of the notebook.

Returns
-------
Dict[str, pandas.DataFrame]
A dictionary of pandas dataframes containing the results of the delta table analysis.
"""
import notebookutils

if not lakehouse_attached():
raise ValueError(
f"{icons.red_dot} No lakehouse is attached to this notebook. Please attach a lakehouse to the notebook before running the Delta Analyzer."
)
display_toggle = notebookutils.common.configs.pandas_display

# Turn off notebookutils display
if display_toggle is True:
notebookutils.common.configs.pandas_display = False

prefix = "SLL_DeltaAnalyzer_"
now = datetime.datetime.now()
(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace=None)
(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace=workspace)
(lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
lakehouse=None, workspace=None
lakehouse=lakehouse, workspace=workspace
)
path = create_abfss_path(lakehouse_id, workspace_id, table_name)
table_path = f"/lakehouse/default/Tables/{table_name}"
lake_path = create_abfss_path(lakehouse_id, workspace_id)
mounts = notebookutils.fs.mounts()
mount_point = f"/{workspace_name.replace(' ', '')}{lakehouse_name.replace(' ', '')}"
if not any(i.get("source") == lake_path for i in mounts):
# Mount lakehouse if not mounted
notebookutils.fs.mount(lake_path, mount_point)
print(
f"{icons.green_dot} Mounted the '{lakehouse_name}' lakehouse within the '{workspace_name}' to the notebook."
)

mounts = notebookutils.fs.mounts()
local_path = next(
i.get("localPath") for i in mounts if i.get("source") == lake_path
)
table_path = f"{local_path}/Tables/{table_name}"

# Set back to original value
notebookutils.common.configs.pandas_display = display_toggle

parquet_file_df_columns = {
"ParquetFile": "string",
Expand Down Expand Up @@ -95,39 +125,21 @@ def delta_analyzer(
row_group_df = _create_dataframe(columns=row_group_df_columns)
column_chunk_df = _create_dataframe(columns=column_chunk_df_columns)

spark = SparkSession.builder.getOrCreate()
# delta_table = DeltaTable.forPath(spark, path)
# detail_df = spark.sql(f"DESCRIBE DETAIL `{table_name}`").collect()[0]

# num_files = detail_df.numFiles
# size_in_bytes = detail_df.sizeInBytes

latest_files = spark.read.format("delta").load(path).inputFiles()
latest_files = _read_delta_table(path).inputFiles()
file_paths = [f.split("/")[-1] for f in latest_files]
row_count = spark.table(table_name).count()
row_count = _delta_table_row_count(table_name)
row_groups = 0
max_rows_per_row_group = 0
min_rows_per_row_group = float("inf")
# dt = DeltaTable.forPath(spark, path)
# schema = dt.toDF().schema
# is_vorder = False
# if (
# dt.detail()
# .collect()[0]
# .asDict()
# .get("properties")
# .get("delta.parquet.vorder.enabled")
# == "true"
# ):
# is_vorder = True

schema = ds.dataset(table_path).schema.metadata
is_vorder = any(b"vorder" in key for key in schema.keys())
# v_order_level = (
# int(schema.get(b"com.microsoft.parquet.vorder.level").decode("utf-8"))
# if is_vorder
# else None
# )

for file_name in file_paths:
parquet_file = pq.ParquetFile(f"{table_path}/{file_name}")
Expand Down Expand Up @@ -236,13 +248,15 @@ def delta_analyzer(
column_name=col_name,
function="approx",
lakehouse=lakehouse_name,
workspace=workspace,
)
else:
dc = _get_column_aggregate(
table_name=table_name,
column_name=col_name,
function="distinctcount",
lakehouse=lakehouse_name,
workspace=workspace,
)

if "Cardinality" not in column_df.columns:
Expand All @@ -264,6 +278,10 @@ def delta_analyzer(
save_table = f"{prefix}Summary"

if export:
if not lakehouse_attached():
raise ValueError(
f"{icons.red_dot} No lakehouse is attached to this notebook. Please attach a lakehouse to the notebook before running the Delta Analyzer."
)
dfL = get_lakehouse_tables()
dfL_filt = dfL[dfL["Table Name"] == save_table]
if dfL_filt.empty:
Expand Down
59 changes: 52 additions & 7 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def _build_url(url: str, params: dict) -> str:


def create_abfss_path(
lakehouse_id: UUID, lakehouse_workspace_id: UUID, delta_table_name: str
lakehouse_id: UUID,
lakehouse_workspace_id: UUID,
delta_table_name: Optional[str] = None,
) -> str:
"""
Creates an abfss path for a delta table in a Fabric lakehouse.
Expand All @@ -42,18 +44,22 @@ def create_abfss_path(
ID of the Fabric lakehouse.
lakehouse_workspace_id : uuid.UUID
ID of the Fabric workspace.
delta_table_name : str
delta_table_name : str, default=None
Name of the delta table. If None, the returned path points to the lakehouse itself.

Returns
-------
str
An abfss path which can be used to save/reference a delta table in a Fabric lakehouse.
An abfss path which can be used to save/reference a Fabric lakehouse or a delta table within it.
"""

fp = _get_default_file_path()
path = f"abfss://{lakehouse_workspace_id}@{fp}/{lakehouse_id}"

if delta_table_name is not None:
path += f"/Tables/{delta_table_name}"

return f"abfss://{lakehouse_workspace_id}@{fp}/{lakehouse_id}/Tables/{delta_table_name}"
return path


def _get_default_file_path() -> str:
Expand Down Expand Up @@ -539,7 +545,7 @@ def save_as_delta_table(
)

dataframe.columns = dataframe.columns.str.replace(" ", "_")
spark = SparkSession.builder.getOrCreate()
spark = _create_spark_session()

type_mapping = {
"string": StringType(),
Expand Down Expand Up @@ -1248,7 +1254,6 @@ def _get_column_aggregate(
default_value: int = 0,
) -> int:

from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct
from pyspark.sql import functions as F

Expand All @@ -1257,7 +1262,7 @@ def _get_column_aggregate(
lakehouse_id = resolve_lakehouse_id(lakehouse, workspace)
path = create_abfss_path(lakehouse_id, workspace_id, table_name)

spark = SparkSession.builder.getOrCreate()
spark = _create_spark_session()
df = spark.read.format("delta").load(path)

if function in {"COUNTDISTINCT", "DISTINCTCOUNT"}:
Expand Down Expand Up @@ -1591,3 +1596,43 @@ def _print_success(item_name, item_type, workspace_name, action="created"):
)
else:
raise NotImplementedError


def _pure_python_notebook() -> bool:
    """
    Reports whether the code is running in a pure Python notebook.

    Returns
    -------
    bool
        The value returned by sempy's ``_on_jupyter`` environment check.
        ``_create_spark_session`` treats True as "not a PySpark notebook".
    """
    # Function-scope import: resolved at call time rather than at module import.
    from sempy.fabric._environment import _on_jupyter

    is_jupyter = _on_jupyter()
    return is_jupyter


def _create_spark_session():
    """
    Returns a Spark session via ``SparkSession.builder.getOrCreate()``.

    Raises
    ------
    ValueError
        If ``_pure_python_notebook()`` reports True.
    """
    if not _pure_python_notebook():
        # pyspark is imported only on this path, never in a pure Python notebook.
        from pyspark.sql import SparkSession

        return SparkSession.builder.getOrCreate()

    raise ValueError(
        f"{icons.red_dot} This function is only available in a PySpark notebook."
    )


def _read_delta_table(path: str) -> "pyspark.sql.DataFrame":
    """
    Loads the delta table stored at the given path.

    Parameters
    ----------
    path : str
        Path of the delta table, passed unchanged to the Spark delta reader.

    Returns
    -------
    pyspark.sql.DataFrame
        The Spark dataframe produced by the delta reader. This is not a pandas
        dataframe; callers use Spark-only methods such as ``inputFiles()`` and
        ``createOrReplaceTempView()`` on the result.

    Raises
    ------
    ValueError
        Propagated from ``_create_spark_session`` when no Spark session can be
        created in the current notebook type.
    """
    # The return annotation is a string so that pyspark does not have to be
    # importable when this module is loaded.
    spark = _create_spark_session()

    return spark.read.format("delta").load(path)


def _delta_table_row_count(table_name: str) -> int:
    """
    Counts the rows of a table known to the Spark session.

    Parameters
    ----------
    table_name : str
        Name of the table, passed unchanged to ``spark.table``.

    Returns
    -------
    int
        The number of rows in the table.
    """
    table_df = _create_spark_session().table(table_name)
    return table_df.count()


def _run_spark_sql_query(query):
    """
    Runs a SQL query through the Spark session.

    Parameters
    ----------
    query
        The SQL text, passed unchanged to ``spark.sql``.

    Returns
    -------
    The object returned by ``spark.sql(query)``.
    """
    session = _create_spark_session()
    result = session.sql(query)
    return result
5 changes: 2 additions & 3 deletions src/sempy_labs/_list_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
_update_dataframe_datatypes,
_base_api,
_create_dataframe,
_run_spark_sql_query,
)
from sempy._utils._log import log
import pandas as pd
Expand Down Expand Up @@ -584,14 +585,12 @@ def list_columns(
query = f"{query} FROM {lakehouse}.{lakeTName}"
sql_statements.append((table_name, query))

spark = SparkSession.builder.getOrCreate()

for o in sql_statements:
tName = o[0]
query = o[1]

# Run the query
df = spark.sql(query)
df = _run_spark_sql_query(query)

for column in df.columns:
x = df.collect()[0][column]
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_model_bpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
get_language_codes,
_get_column_aggregate,
resolve_workspace_name_and_id,
_create_spark_session,
)
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
from sempy_labs.tom import connect_semantic_model
Expand Down Expand Up @@ -181,7 +182,6 @@ def translate_using_po(rule_file):
def translate_using_spark(rule_file):

from synapse.ml.services import Translate
from pyspark.sql import SparkSession

rules_temp = rule_file.copy()
rules_temp = rules_temp.drop(["Expression", "URL", "Severity"], axis=1)
Expand All @@ -195,7 +195,7 @@ def translate_using_spark(rule_file):
]
)

spark = SparkSession.builder.getOrCreate()
spark = _create_spark_session()
dfRules = spark.createDataFrame(rules_temp, schema)

columns = ["Category", "Rule Name", "Description"]
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_translations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sempy_labs._icons as icons
from sempy_labs._helper_functions import (
get_language_codes,
_create_spark_session,
)
from uuid import UUID

Expand Down Expand Up @@ -40,7 +41,6 @@ def translate_semantic_model(

from synapse.ml.services import Translate
from pyspark.sql.functions import col, flatten
from pyspark.sql import SparkSession
from sempy_labs.tom import connect_semantic_model

icons.sll_tags.append("TranslateSemanticModel")
Expand Down Expand Up @@ -145,7 +145,7 @@ def _clean_text(text, exclude_chars):
[df_prep, pd.DataFrame(new_data, index=[0])], ignore_index=True
)

spark = SparkSession.builder.getOrCreate()
spark = _create_spark_session()
df = spark.createDataFrame(df_prep)

columns = ["Name", "Description", "Display Folder"]
Expand Down
Loading