added workspace monitoring functions #297

Open · wants to merge 9 commits into main

Changes from all commits

3 changes: 2 additions & 1 deletion src/sempy_labs/_authentication.py
@@ -1,6 +1,5 @@
from typing import Literal
from sempy.fabric._token_provider import TokenProvider
import notebookutils
from azure.identity import ClientSecretCredential


@@ -72,6 +71,8 @@ def with_azure_key_vault(
sempy.fabric.TokenProvider
Token provider to be used with FabricRestClient or PowerBIRestClient.
"""
import notebookutils

tenant_id = notebookutils.credentials.getSecret(
key_vault_uri, key_vault_tenant_id
)
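The net effect of this hunk is that `notebookutils` is now imported lazily, inside `with_azure_key_vault`, rather than at module import time, so `import sempy_labs` keeps working in environments (local Python, CI) where `notebookutils` is not installed. A minimal sketch of the same pattern, using a hypothetical helper name:

```python
def read_key_vault_secret(key_vault_uri: str, secret_name: str) -> str:
    # Deferred import: notebookutils is only available inside a Fabric notebook
    # session, so importing it here keeps the module importable elsewhere.
    import notebookutils

    return notebookutils.credentials.getSecret(key_vault_uri, secret_name)
```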
101 changes: 69 additions & 32 deletions src/sempy_labs/_helper_functions.py
@@ -149,6 +149,51 @@ def resolve_report_name(report_id: UUID, workspace: Optional[str] = None) -> str
return obj


def resolve_item_name_and_id(
item: str | UUID, type: Optional[str] = None, workspace: Optional[str | UUID] = None
) -> Tuple[str, UUID]:

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)

if _is_valid_uuid(item):
item_id = item
item_name = fabric.resolve_item_name(
item_id=item_id, type=type, workspace=workspace_id
)
else:
if type is None:
raise ValueError(
f"{icons.warning} Must specify a 'type' if specifying a name as the 'item'."
)
item_name = item
item_id = fabric.resolve_item_id(
item_name=item, type=type, workspace=workspace_id
)

return item_name, item_id
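A quick usage sketch of the new `resolve_item_name_and_id` helper; the item and workspace names below are made up, and the import path assumes the helper stays in `_helper_functions`:

```python
from sempy_labs._helper_functions import resolve_item_name_and_id

# By name: 'type' is required so the lookup is unambiguous.
name, item_id = resolve_item_name_and_id(
    item="Sales Model", type="SemanticModel", workspace="My Workspace"
)

# By UUID: 'type' may be omitted since the ID already identifies the item.
name, item_id = resolve_item_name_and_id(item=item_id, workspace="My Workspace")
```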


def resolve_lakehouse_name_and_id(lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None) -> Tuple[str, UUID]:

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)

if lakehouse is None:
lakehouse_id = fabric.get_lakehouse_id()
lakehouse_name = fabric.resolve_item_name(item_id=lakehouse_id, type="Lakehouse", workspace=workspace_id)
elif _is_valid_uuid(lakehouse):
lakehouse_id = lakehouse
lakehouse_name = fabric.resolve_item_name(
item_id=lakehouse_id, type="Lakehouse", workspace=workspace_id
)
else:
lakehouse_name = lakehouse
lakehouse_id = fabric.resolve_item_id(
item_name=lakehouse, type="Lakehouse", workspace=workspace_id
)

return lakehouse_name, lakehouse_id
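Likewise for the lakehouse variant, whose extra branch falls back to the lakehouse attached to the running notebook when no lakehouse is passed (names illustrative):

```python
from sempy_labs._helper_functions import resolve_lakehouse_name_and_id

# No arguments: resolves the lakehouse attached to the notebook.
name, lakehouse_id = resolve_lakehouse_name_and_id()

# A lakehouse name or UUID works, and the workspace may also be a name or UUID.
name, lakehouse_id = resolve_lakehouse_name_and_id(
    lakehouse="MyLakehouse", workspace="My Workspace"
)
```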


def resolve_dataset_id(dataset: str, workspace: Optional[str] = None) -> UUID:
"""
Obtains the ID of the semantic model.
@@ -395,8 +440,8 @@ def save_as_delta_table(
write_mode: str,
merge_schema: bool = False,
schema: Optional[dict] = None,
lakehouse: Optional[str] = None,
workspace: Optional[str] = None,
lakehouse: Optional[str | UUID] = None,
workspace: Optional[str | UUID] = None,
):
"""
Saves a pandas dataframe as a delta table in a Fabric lakehouse.
@@ -413,11 +458,11 @@ def save_as_delta_table(
Merges the schemas of the dataframe to the delta table.
schema : dict, default=None
A dictionary showing the schema of the columns and their data types.
lakehouse : str, default=None
The Fabric lakehouse used by the Direct Lake semantic model.
lakehouse : str | uuid.UUID, default=None
The Fabric lakehouse name or ID.
Defaults to None which resolves to the lakehouse attached to the notebook.
workspace : str, default=None
The Fabric workspace name.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Defaults to None which resolves to the workspace of the attached lakehouse
or if no lakehouse attached, resolves to the workspace of the notebook.
"""
@@ -436,26 +481,15 @@ def save_as_delta_table(
TimestampType,
)

if workspace is None:
workspace_id = fabric.get_workspace_id()
workspace = fabric.resolve_workspace_name(workspace_id)
else:
workspace_id = fabric.resolve_workspace_id(workspace)

if lakehouse is None:
lakehouse_id = fabric.get_lakehouse_id()
lakehouse = resolve_lakehouse_name(
lakehouse_id=lakehouse_id, workspace=workspace
)
else:
lakehouse_id = resolve_lakehouse_id(lakehouse, workspace)
(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
(lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(lakehouse=lakehouse, workspace=workspace_id)

writeModes = ["append", "overwrite"]
write_modes = ["append", "overwrite"]
write_mode = write_mode.lower()

if write_mode not in writeModes:
if write_mode not in write_modes:
raise ValueError(
f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {writeModes}."
f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {write_modes}."
)

if " " in delta_table_name:
@@ -480,16 +514,19 @@ def save_as_delta_table(
"timestamp": TimestampType(),
}

if schema is None:
spark_df = spark.createDataFrame(dataframe)
if isinstance(dataframe, pd.DataFrame):
if schema is None:
spark_df = spark.createDataFrame(dataframe)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
spark_df = dataframe

filePath = create_abfss_path(
lakehouse_id=lakehouse_id,
@@ -504,7 +541,7 @@ def save_as_delta_table(
else:
spark_df.write.mode(write_mode).format("delta").save(filePath)
print(
f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace}' workspace."
f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace."
)
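Taken together, the `save_as_delta_table` changes accept a name or UUID for `lakehouse`/`workspace` and a Spark DataFrame in addition to a pandas one (a Spark DataFrame is written as-is, skipping the pandas conversion and the `schema` mapping). A hedged usage sketch: the table, lakehouse, and workspace names are placeholders, and the schema keys assume the type mapping includes "string" and "long":

```python
import pandas as pd
from sempy_labs import save_as_delta_table

df = pd.DataFrame({"Country": ["US", "DE"], "Sales": [100, 250]})

# pandas DataFrame with an explicit schema dict mapped to Spark types.
save_as_delta_table(
    dataframe=df,
    delta_table_name="sales_by_country",
    write_mode="overwrite",
    schema={"Country": "string", "Sales": "long"},
    lakehouse="MyLakehouse",    # name or UUID now accepted
    workspace="My Workspace",   # name or UUID now accepted
)

# A Spark DataFrame (e.g. spark.read.table("sales")) would be passed through unchanged:
# save_as_delta_table(dataframe=spark_df, delta_table_name="sales_copy", write_mode="append")
```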


13 changes: 13 additions & 0 deletions src/sempy_labs/_kql_databases.py
@@ -8,6 +8,7 @@
pagination,
)
from sempy.fabric.exceptions import FabricHTTPException
from uuid import UUID


def list_kql_databases(workspace: Optional[str] = None) -> pd.DataFrame:
@@ -138,3 +139,15 @@ def delete_kql_database(name: str, workspace: Optional[str] = None):
print(
f"{icons.green_dot} The '{name}' KQL database within the '{workspace}' workspace has been deleted."
)


def _resolve_cluster_uri(workspace: Optional[str | UUID] = None) -> str:

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
dfK = list_kql_databases(workspace=workspace_id)
dfK_filt = dfK[dfK["KQL Database Name"] == "Monitoring KQL database"]
if len(dfK_filt) == 0:
raise ValueError(
f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace."
)
return dfK_filt["Query Service URI"].iloc[0]
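`_resolve_cluster_uri` looks up the "Monitoring KQL database" that Fabric provisions when workspace monitoring is enabled and returns its Query Service URI, presumably so the new monitoring functions in this PR can run KQL queries against it. A hypothetical sketch of that downstream use (the query and client wiring are illustrative, not part of this diff):

```python
from sempy_labs._kql_databases import _resolve_cluster_uri

cluster_uri = _resolve_cluster_uri(workspace="My Workspace")

# The URI would then be handed to whatever KQL/Kusto client the monitoring
# functions use, together with a query against the monitoring tables, e.g.:
query = "SemanticModelLogs | take 10"
# run_kql_query(cluster_uri, query)  # hypothetical helper, not in this PR
```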