Skip to content

Commit 6dd0fd7

Browse files
Bug fix: use FULL OUTER JOIN for dimension-only queries (#863)
1 parent 45a970d commit 6dd0fd7

File tree

37 files changed

+1878
-67
lines changed

37 files changed

+1878
-67
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Use FULL OUTER JOIN for dimension-only queries.
3+
time: 2023-11-10T16:20:09.530487-08:00
4+
custom:
5+
Author: courtneyholcomb
6+
Issue: "863"

metricflow/dataflow/builder/dataflow_plan_builder.py

Lines changed: 9 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -80,49 +80,7 @@ class DataflowRecipe:
8080
@property
8181
def join_targets(self) -> List[JoinDescription]:
8282
"""Joins to be made to source node."""
83-
join_targets = []
84-
for join_recipe in self.join_linkable_instances_recipes:
85-
# Figure out what elements to filter from the joined node.
86-
87-
# Sanity check - all linkable specs should have a link, or else why would we be joining them.
88-
assert all([len(x.entity_links) > 0 for x in join_recipe.satisfiable_linkable_specs])
89-
90-
# If we're joining something in, then we need the associated entity, partitions, and time dimension
91-
# specs defining the validity window (if necessary)
92-
include_specs: List[LinkableInstanceSpec] = [
93-
LinklessEntitySpec.from_reference(x.entity_links[0]) for x in join_recipe.satisfiable_linkable_specs
94-
]
95-
include_specs.extend([x.node_to_join_dimension_spec for x in join_recipe.join_on_partition_dimensions])
96-
include_specs.extend(
97-
[x.node_to_join_time_dimension_spec for x in join_recipe.join_on_partition_time_dimensions]
98-
)
99-
if join_recipe.validity_window:
100-
include_specs.extend(
101-
[
102-
join_recipe.validity_window.window_start_dimension,
103-
join_recipe.validity_window.window_end_dimension,
104-
]
105-
)
106-
107-
# satisfiable_linkable_specs describes what can be satisfied after the join, so remove the entity
108-
# link when filtering before the join.
109-
# e.g. if the node is used to satisfy "user_id__country", then the node must have the entity
110-
# "user_id" and the "country" dimension so that it can be joined to the measure node.
111-
include_specs.extend([x.without_first_entity_link for x in join_recipe.satisfiable_linkable_specs])
112-
filtered_node_to_join = FilterElementsNode(
113-
parent_node=join_recipe.node_to_join,
114-
include_specs=InstanceSpecSet.create_from_linkable_specs(include_specs),
115-
)
116-
join_targets.append(
117-
JoinDescription(
118-
join_node=filtered_node_to_join,
119-
join_on_entity=join_recipe.join_on_entity,
120-
join_on_partition_dimensions=join_recipe.join_on_partition_dimensions,
121-
join_on_partition_time_dimensions=join_recipe.join_on_partition_time_dimensions,
122-
validity_window=join_recipe.validity_window,
123-
)
124-
)
125-
return join_targets
83+
return [join_recipe.join_description for join_recipe in self.join_linkable_instances_recipes]
12684

12785

12886
@dataclass(frozen=True)
@@ -485,13 +443,15 @@ def _find_dataflow_recipe(
485443
potential_source_nodes: Sequence[BaseOutput] = self._select_source_nodes_with_measures(
486444
measure_specs=set(measure_spec_properties.measure_specs), source_nodes=source_nodes
487445
)
446+
default_join_type = SqlJoinType.LEFT_OUTER
488447
else:
489448
# Only read nodes can be source nodes for queries without measures
490449
source_nodes = self._read_nodes
491450
source_nodes_to_linkable_specs = self._select_read_nodes_with_linkable_specs(
492451
linkable_specs=linkable_spec_set, read_nodes=source_nodes
493452
)
494453
potential_source_nodes = list(source_nodes_to_linkable_specs.keys())
454+
default_join_type = SqlJoinType.FULL_OUTER
495455

496456
logger.info(f"There are {len(potential_source_nodes)} potential source nodes")
497457

@@ -518,7 +478,9 @@ def _find_dataflow_recipe(
518478
f"After removing unnecessary nodes, there are {len(nodes_available_for_joins)} nodes available for joins"
519479
)
520480
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs):
521-
nodes_available_for_joins = node_processor.add_multi_hop_joins(linkable_specs, source_nodes)
481+
nodes_available_for_joins = node_processor.add_multi_hop_joins(
482+
desired_linkable_specs=linkable_specs, nodes=source_nodes, join_type=default_join_type
483+
)
522484
logger.info(
523485
f"After adding multi-hop nodes, there are {len(nodes_available_for_joins)} nodes available for joins:\n"
524486
f"{pformat_big_objects(nodes_available_for_joins)}"
@@ -552,7 +514,9 @@ def _find_dataflow_recipe(
552514
logger.debug(f"Evaluating source node:\n{pformat_big_objects(source_node=dataflow_dag_as_text(node))}")
553515

554516
start_time = time.time()
555-
evaluation = node_evaluator.evaluate_node(start_node=node, required_linkable_specs=list(linkable_specs))
517+
evaluation = node_evaluator.evaluate_node(
518+
start_node=node, required_linkable_specs=list(linkable_specs), default_join_type=default_join_type
519+
)
556520
logger.info(f"Evaluation of {node} took {time.time() - start_time:.2f}s")
557521

558522
logger.debug(

metricflow/dataflow/builder/node_evaluator.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from metricflow.dataflow.builder.partitions import PartitionJoinResolver
2828
from metricflow.dataflow.dataflow_plan import (
2929
BaseOutput,
30+
FilterElementsNode,
3031
JoinDescription,
3132
PartitionDimensionJoinDescription,
3233
PartitionTimeDimensionJoinDescription,
@@ -37,10 +38,8 @@
3738
from metricflow.model.semantics.semantic_model_join_evaluator import SemanticModelJoinEvaluator
3839
from metricflow.plan_conversion.instance_converters import CreateValidityWindowJoinDescription
3940
from metricflow.protocols.semantics import SemanticModelAccessor
40-
from metricflow.specs.specs import (
41-
LinkableInstanceSpec,
42-
LinklessEntitySpec,
43-
)
41+
from metricflow.specs.specs import InstanceSpecSet, LinkableInstanceSpec, LinklessEntitySpec
42+
from metricflow.sql.sql_plan import SqlJoinType
4443

4544
logger = logging.getLogger(__name__)
4645

@@ -61,6 +60,8 @@ class JoinLinkableInstancesRecipe:
6160
# the linkable specs in the node that can help to satisfy the query. e.g. "user_id__country" might be one of the
6261
# "satisfiable_linkable_specs", but "country" is the linkable spec in the node.
6362
satisfiable_linkable_specs: List[LinkableInstanceSpec]
63+
# Join type to use when joining nodes
64+
join_type: SqlJoinType
6465

6566
# The partitions to join on, if there are matching partitions between the start_node and node_to_join.
6667
join_on_partition_dimensions: Tuple[PartitionDimensionJoinDescription, ...]
@@ -71,12 +72,36 @@ class JoinLinkableInstancesRecipe:
7172
@property
7273
def join_description(self) -> JoinDescription:
7374
"""The recipe as a join description to use in the dataflow plan node."""
75+
# Figure out what elements to filter from the joined node.
76+
include_specs: List[LinkableInstanceSpec] = []
77+
assert all([len(spec.entity_links) > 0 for spec in self.satisfiable_linkable_specs])
78+
include_specs.extend(
79+
[LinklessEntitySpec.from_reference(spec.entity_links[0]) for spec in self.satisfiable_linkable_specs]
80+
)
81+
82+
include_specs.extend([join.node_to_join_dimension_spec for join in self.join_on_partition_dimensions])
83+
include_specs.extend([join.node_to_join_time_dimension_spec for join in self.join_on_partition_time_dimensions])
84+
if self.validity_window:
85+
include_specs.extend(
86+
[self.validity_window.window_start_dimension, self.validity_window.window_end_dimension]
87+
)
88+
89+
# `satisfiable_linkable_specs` describes what can be satisfied after the join, so remove the entity
90+
# link when filtering before the join.
91+
# e.g. if the node is used to satisfy "user_id__country", then the node must have the entity
92+
# "user_id" and the "country" dimension so that it can be joined to the source node.
93+
include_specs.extend([spec.without_first_entity_link for spec in self.satisfiable_linkable_specs])
94+
filtered_node_to_join = FilterElementsNode(
95+
parent_node=self.node_to_join, include_specs=InstanceSpecSet.create_from_linkable_specs(include_specs)
96+
)
97+
7498
return JoinDescription(
75-
join_node=self.node_to_join,
99+
join_node=filtered_node_to_join,
76100
join_on_entity=self.join_on_entity,
77101
join_on_partition_dimensions=self.join_on_partition_dimensions,
78102
join_on_partition_time_dimensions=self.join_on_partition_time_dimensions,
79103
validity_window=self.validity_window,
104+
join_type=self.join_type,
80105
)
81106

82107

@@ -133,6 +158,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
133158
self,
134159
start_node_instance_set: InstanceSet,
135160
needed_linkable_specs: List[LinkableInstanceSpec],
161+
join_type: SqlJoinType,
136162
) -> List[JoinLinkableInstancesRecipe]:
137163
"""Get nodes that can be joined to get 1 or more of the "needed_linkable_specs".
138164
@@ -257,6 +283,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
257283
join_on_partition_dimensions=join_on_partition_dimensions,
258284
join_on_partition_time_dimensions=join_on_partition_time_dimensions,
259285
validity_window=validity_window_join_description,
286+
join_type=join_type,
260287
)
261288
)
262289

@@ -271,6 +298,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
271298
def _update_candidates_that_can_satisfy_linkable_specs(
272299
candidates_for_join: List[JoinLinkableInstancesRecipe],
273300
already_satisfisfied_linkable_specs: List[LinkableInstanceSpec],
301+
join_type: SqlJoinType,
274302
) -> List[JoinLinkableInstancesRecipe]:
275303
"""Update / filter candidates_for_join based on linkable instance specs that we have already satisfied.
276304
@@ -294,6 +322,7 @@ def _update_candidates_that_can_satisfy_linkable_specs(
294322
join_on_partition_dimensions=candidate_for_join.join_on_partition_dimensions,
295323
join_on_partition_time_dimensions=candidate_for_join.join_on_partition_time_dimensions,
296324
validity_window=candidate_for_join.validity_window,
325+
join_type=join_type,
297326
)
298327
)
299328
return sorted(
@@ -306,6 +335,7 @@ def evaluate_node(
306335
self,
307336
start_node: BaseOutput,
308337
required_linkable_specs: Sequence[LinkableInstanceSpec],
338+
default_join_type: SqlJoinType,
309339
) -> LinkableInstanceSatisfiabilityEvaluation:
310340
"""Evaluates if the "required_linkable_specs" can be realized by joining the "start_node" with other nodes.
311341
@@ -345,7 +375,9 @@ def evaluate_node(
345375
possibly_joinable_linkable_specs.append(required_linkable_spec)
346376

347377
candidates_for_join = self._find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
348-
start_node_instance_set=candidate_instance_set, needed_linkable_specs=possibly_joinable_linkable_specs
378+
start_node_instance_set=candidate_instance_set,
379+
needed_linkable_specs=possibly_joinable_linkable_specs,
380+
join_type=default_join_type,
349381
)
350382
join_candidates: List[JoinLinkableInstancesRecipe] = []
351383

@@ -378,6 +410,7 @@ def evaluate_node(
378410
candidates_for_join = self._update_candidates_that_can_satisfy_linkable_specs(
379411
candidates_for_join=candidates_for_join,
380412
already_satisfisfied_linkable_specs=next_candidate.satisfiable_linkable_specs,
413+
join_type=default_join_type,
381414
)
382415

383416
# The once possibly joinable specs are definitely joinable and no longer need to be searched for.

metricflow/dataflow/dataflow_plan.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ class JoinDescription:
250250

251251
join_node: BaseOutput
252252
join_on_entity: LinklessEntitySpec
253+
join_type: SqlJoinType
253254

254255
join_on_partition_dimensions: Tuple[PartitionDimensionJoinDescription, ...]
255256
join_on_partition_time_dimensions: Tuple[PartitionTimeDimensionJoinDescription, ...]
@@ -339,6 +340,7 @@ def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinToBase
339340
join_on_partition_dimensions=old_join_target.join_on_partition_dimensions,
340341
join_on_partition_time_dimensions=old_join_target.join_on_partition_time_dimensions,
341342
validity_window=old_join_target.validity_window,
343+
join_type=old_join_target.join_type,
342344
)
343345
for i, old_join_target in enumerate(self._join_targets)
344346
],

metricflow/plan_conversion/dataflow_to_sql.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,12 +402,9 @@ def visit_join_to_base_output_node(self, node: JoinToBaseOutputNode) -> SqlDataS
402402
# e.g. a data set has the dimension "listing__country_latest" and "listing" is a primary entity in the
403403
# data set. The next step would create an instance like "listing__listing__country_latest" without this
404404
# filter.
405-
406-
# logger.error(f"before filter is:\n{pformat_big_objects(right_data_set.instance_set.spec_set)}")
407405
right_data_set_instance_set_filtered = FilterLinkableInstancesWithLeadingLink(
408406
entity_link=join_on_entity,
409407
).transform(right_data_set.instance_set)
410-
# logger.error(f"after filter is:\n{pformat_big_objects(right_data_set_instance_set_filtered.spec_set)}")
411408

412409
# After the right data set is joined to the "from" data set, we need to change the links for some of the
413410
# instances that represent the right data set. For example, if the "from" data set contains the "bookings"

metricflow/plan_conversion/node_processor.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from metricflow.protocols.semantics import SemanticModelAccessor
2222
from metricflow.specs.spec_set_transforms import ToElementNameSet
2323
from metricflow.specs.specs import InstanceSpecSet, LinkableInstanceSpec, LinklessEntitySpec
24+
from metricflow.sql.sql_plan import SqlJoinType
2425

2526
logger = logging.getLogger(__name__)
2627

@@ -149,9 +150,7 @@ def _node_contains_entity(
149150
return False
150151

151152
def _get_candidates_nodes_for_multi_hop(
152-
self,
153-
desired_linkable_spec: LinkableInstanceSpec,
154-
nodes: Sequence[BaseOutput],
153+
self, desired_linkable_spec: LinkableInstanceSpec, nodes: Sequence[BaseOutput], join_type: SqlJoinType
155154
) -> Sequence[MultiHopJoinCandidate]:
156155
"""Assemble nodes representing all possible one-hop joins."""
157156
if len(desired_linkable_spec.entity_links) > MAX_JOIN_HOPS:
@@ -249,6 +248,7 @@ def _get_candidates_nodes_for_multi_hop(
249248
),
250249
join_on_partition_dimensions=join_on_partition_dimensions,
251250
join_on_partition_time_dimensions=join_on_partition_time_dimensions,
251+
join_type=join_type,
252252
)
253253
],
254254
),
@@ -276,16 +276,18 @@ def _get_candidates_nodes_for_multi_hop(
276276
return multi_hop_join_candidates
277277

278278
def add_multi_hop_joins(
279-
self, desired_linkable_specs: Sequence[LinkableInstanceSpec], nodes: Sequence[BaseOutput]
279+
self,
280+
desired_linkable_specs: Sequence[LinkableInstanceSpec],
281+
nodes: Sequence[BaseOutput],
282+
join_type: SqlJoinType,
280283
) -> Sequence[BaseOutput]:
281284
"""Assemble nodes representing all possible one-hop joins."""
282285
all_multi_hop_join_candidates: List[MultiHopJoinCandidate] = []
283286
lineage_for_all_multi_hop_join_candidates: Set[MultiHopJoinCandidateLineage] = set()
284287

285288
for desired_linkable_spec in desired_linkable_specs:
286289
for multi_hop_join_candidate in self._get_candidates_nodes_for_multi_hop(
287-
desired_linkable_spec=desired_linkable_spec,
288-
nodes=nodes,
290+
desired_linkable_spec=desired_linkable_spec, nodes=nodes, join_type=join_type
289291
):
290292
# Dedupe candidates that are the same join.
291293
if multi_hop_join_candidate.lineage not in lineage_for_all_multi_hop_join_candidates:

metricflow/plan_conversion/sql_join_builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def make_base_output_join_description(
207207
left_source_alias=left_data_set.alias,
208208
right_source_alias=right_data_set.alias,
209209
column_equality_descriptions=column_equality_descriptions,
210-
join_type=SqlJoinType.LEFT_OUTER,
210+
join_type=join_description.join_type,
211211
additional_on_conditions=validity_conditions,
212212
)
213213

metricflow/test/dataflow/builder/test_node_data_set.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
MeasureSpec,
2424
)
2525
from metricflow.sql.sql_exprs import SqlColumnReference, SqlColumnReferenceExpression
26-
from metricflow.sql.sql_plan import SqlSelectColumn, SqlSelectStatementNode, SqlTableFromClauseNode
26+
from metricflow.sql.sql_plan import SqlJoinType, SqlSelectColumn, SqlSelectStatementNode, SqlTableFromClauseNode
2727
from metricflow.test.fixtures.model_fixtures import ConsistentIdObjectRepository
2828
from metricflow.test.fixtures.setup_fixtures import MetricFlowTestSessionState
2929
from metricflow.test.snapshot_utils import assert_spec_set_snapshot_equal
@@ -111,6 +111,7 @@ def test_joined_node_data_set( # noqa: D
111111
join_on_entity=LinklessEntitySpec.from_element_name("user"),
112112
join_on_partition_dimensions=(),
113113
join_on_partition_time_dimensions=(),
114+
join_type=SqlJoinType.LEFT_OUTER,
114115
)
115116
],
116117
)

0 commit comments

Comments
 (0)