Skip to content

Commit c7d276a

Browse files
committed
Merge branch 'm-kovalsky/deltaanalyzer'
2 parents cd0b05f + 6cee0b9 commit c7d276a

File tree

5 files changed

+526
-9
lines changed

5 files changed

+526
-9
lines changed

src/sempy_labs/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
list_item_job_instances,
77
list_item_schedules,
88
)
9+
from sempy_labs._delta_analyzer import delta_analyzer
910
from sempy_labs._gateways import (
1011
list_gateway_members,
1112
list_gateway_role_assigments,
@@ -203,6 +204,8 @@
203204
evaluate_dax_impersonation,
204205
get_dax_query_dependencies,
205206
get_dax_query_memory_size,
207+
# dax_perf_test,
208+
# dax_perf_test_bulk,
206209
)
207210
from sempy_labs._generate_semantic_model import (
208211
create_blank_semantic_model,
@@ -505,4 +508,7 @@
505508
"list_synonyms",
506509
"list_graphql_apis",
507510
"create_graphql_api",
511+
"delta_analyzer",
512+
# "dax_perf_test",
513+
# "dax_perf_test_bulk",
508514
]

src/sempy_labs/_dax.py

Lines changed: 220 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
format_dax_object_name,
66
resolve_dataset_name_and_id,
77
_base_api,
8+
generate_guid,
89
)
910
from sempy_labs._model_dependencies import get_model_calc_dependencies
10-
from typing import Optional, List
11+
from typing import Optional, List, Tuple
1112
from sempy._utils._log import log
1213
from uuid import UUID
1314
from sempy_labs.directlake._warm_cache import _put_columns_into_memory
15+
import sempy_labs._icons as icons
16+
import time
1417

1518

1619
@log
@@ -258,3 +261,219 @@ def get_dax_query_memory_size(
258261
)
259262

260263
return df["Total Size"].sum()
264+
265+
266+
@log
def _dax_perf_test(
    dataset: str,
    dax_queries: dict,
    clear_cache_before_run: bool = False,
    refresh_type: Optional[str] = None,
    rest_time: int = 2,
    workspace: Optional[str] = None,
) -> Tuple[pd.DataFrame, dict]:
    """
    Runs a performance test on a set of DAX queries.

    Parameters
    ----------
    dataset : str
        Name of the semantic model.
    dax_queries : dict
        The DAX queries to run, keyed by a friendly query name. Example:

        {
            "Sales Amount Test": 'EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount])',
            "Order Quantity with Product": 'EVALUATE SUMMARIZECOLUMNS(Product[Color], "Order Qty", [Order Qty])',
        }
    clear_cache_before_run : bool, default=False
        If True, clears the semantic model cache before running each query.
    refresh_type : str, default=None
        If set, refreshes the semantic model (using this refresh type) before running each query.
    rest_time : int, default=2
        Rest time (in seconds) between the execution of each DAX query.
    workspace : str, default=None
        The Fabric workspace name.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    Tuple[pandas.DataFrame, dict]
        A pandas dataframe showing the SQL profiler trace results of the DAX queries,
        and a dictionary mapping each query name to its result dataframe.
    """
    from sempy_labs._refresh_semantic_model import refresh_semantic_model
    from sempy_labs._clear_cache import clear_cache

    # Trace events (and their columns) captured by the SQL profiler trace.
    event_schema = {
        "QueryBegin": [
            "EventClass",
            "EventSubclass",
            "CurrentTime",
            "NTUserName",
            "TextData",
            "StartTime",
            "ApplicationName",
        ],
        "QueryEnd": [
            "EventClass",
            "EventSubclass",
            "CurrentTime",
            "NTUserName",
            "TextData",
            "StartTime",
            "EndTime",
            "Duration",
            "CpuTime",
            "Success",
            "ApplicationName",
        ],
        "VertiPaqSEQueryBegin": [
            "EventClass",
            "EventSubclass",
            "CurrentTime",
            "NTUserName",
            "TextData",
            "StartTime",
        ],
        "VertiPaqSEQueryEnd": [
            "EventClass",
            "EventSubclass",
            "CurrentTime",
            "NTUserName",
            "TextData",
            "StartTime",
            "EndTime",
            "Duration",
            "CpuTime",
            "Success",
        ],
        "VertiPaqSEQueryCacheMatch": [
            "EventClass",
            "EventSubclass",
            "CurrentTime",
            "NTUserName",
            "TextData",
        ],
    }

    # Add Execution Metrics
    event_schema["ExecutionMetrics"] = ["EventClass", "ApplicationName", "TextData"]
    # Add DAX Query Plan
    # event_schema["DAXQueryPlan"] = ["EventClass", "EventSubclass", "CurrentTime", "StartTime", "EndTime", "Duration", "CpuTime", "ApplicationName", "TextData"]

    query_results = {}

    # Establish trace connection
    with fabric.create_trace_connection(
        dataset=dataset, workspace=workspace
    ) as trace_connection:
        with trace_connection.create_trace(event_schema) as trace:
            trace.start()
            print(f"{icons.in_progress} Starting performance testing...")
            # Loop through DAX queries
            for name, dax in dax_queries.items():

                if clear_cache_before_run:
                    clear_cache(dataset=dataset, workspace=workspace)
                if refresh_type is not None:
                    refresh_semantic_model(
                        dataset=dataset, workspace=workspace, refresh_type=refresh_type
                    )

                # EVALUATE {1} is used to initiate a warm cache
                fabric.evaluate_dax(
                    dataset=dataset, workspace=workspace, dax_string="""EVALUATE {1}"""
                )
                # Run DAX Query
                result = fabric.evaluate_dax(
                    dataset=dataset, workspace=workspace, dax_string=dax
                )

                # Add results to output
                query_results[name] = result

                time.sleep(rest_time)
                print(f"{icons.green_dot} The '{name}' query has completed.")

            df = trace.stop()
            # Allow time to collect trace results
            time.sleep(5)

            # Step 1: Filter out unnecessary operations. na=False so rows with
            # missing Text Data are excluded instead of producing NaN in the mask.
            query_names = list(dax_queries.keys())
            df = df[
                ~df["Application Name"].isin(["PowerBI", "PowerBIEIM"])
                & (~df["Text Data"].str.startswith("EVALUATE {1}", na=False))
            ]
            query_begin = df["Event Class"] == "QueryBegin"
            temp_column_name = "QueryName_INT"
            df = df.copy()
            # Number each QueryBegin event, then forward-fill so every trace row
            # is tagged with the 1-based index of the query it belongs to.
            df[temp_column_name] = query_begin.cumsum()
            df[temp_column_name] = (
                df[temp_column_name]
                .where(query_begin, None)  # Assign None to non-query begin rows
                .ffill()  # Forward fill None values
                .astype("Int64")  # Use pandas nullable integer type for numeric indices
            )

            # Map the 1-based query index back to the caller-supplied query name.
            df.loc[df[temp_column_name].notna(), "Query Name"] = (
                df[temp_column_name]
                .dropna()
                .astype(int)
                .map(lambda x: query_names[x - 1])
            )
            # Drop rows emitted before the first QueryBegin. The original
            # `df[temp_column_name] != None` is broken pandas (elementwise
            # comparison against None); .notna() is the correct idiom.
            df = df[df[temp_column_name].notna()]
            df = df.drop(columns=[temp_column_name])

            # Assign a stable GUID per query so related trace rows can be grouped.
            query_to_guid = {
                name: generate_guid() for name in df["Query Name"].unique()
            }
            df["Query ID"] = df["Query Name"].map(query_to_guid)

            df = df.reset_index(drop=True)

    return df, query_results
435+
436+
437+
def _dax_perf_test_bulk(
    mapping: dict,
    clear_cache_before_run: bool = False,
    refresh_type: Optional[str] = None,
    rest_time: int = 2,
) -> dict:
    """
    Runs :func:`_dax_perf_test` for every semantic model in every workspace of
    the mapping, and collects the results (the original discarded them).

    Parameters
    ----------
    mapping : dict
        Nested mapping of workspace -> dataset -> {query name: DAX query}. Example:

        mapping = {
            "Workspace1": {
                "Dataset1": {
                    "Query1": "EVALUATE ...",
                    "Query2": "EVALUATE ...",
                },
                "Dataset2": {
                    "Query3": "EVALUATE ...",
                    "Query4": "EVALUATE ...",
                }
            },
            "Workspace2": {
                "Dataset3": {
                    "Query5": "EVALUATE ...",
                    "Query6": "EVALUATE ...",
                },
                "Dataset4": {
                    "Query7": "EVALUATE ...",
                    "Query8": "EVALUATE ...",
                }
            }
        }
    clear_cache_before_run : bool, default=False
        Passed through to :func:`_dax_perf_test`.
    refresh_type : str, default=None
        Passed through to :func:`_dax_perf_test`.
    rest_time : int, default=2
        Rest time (in seconds) between the execution of each DAX query.

    Returns
    -------
    dict
        Nested mapping of workspace -> dataset -> (trace dataframe, query results dict),
        i.e. the return value of :func:`_dax_perf_test` per dataset. Callers that
        previously ignored the (None) return value are unaffected.
    """
    all_results = {}
    for workspace, datasets in mapping.items():
        workspace_results = {}
        for dataset, queries in datasets.items():
            workspace_results[dataset] = _dax_perf_test(
                dataset=dataset,
                dax_queries=queries,
                clear_cache_before_run=clear_cache_before_run,
                refresh_type=refresh_type,
                rest_time=rest_time,
                workspace=workspace,
            )
        all_results[workspace] = workspace_results
    return all_results

0 commit comments

Comments
 (0)