Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SourceScanOptimizer generating duplicated columns #1494

Merged
merged 9 commits into from
Nov 5, 2024
18 changes: 16 additions & 2 deletions metricflow/dataflow/nodes/compute_metrics.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Sequence, Set, Tuple
from dataclasses import dataclass, field
from typing import Dict, Sequence, Set, Tuple

from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
Expand All @@ -28,9 +28,11 @@ class ComputeMetricsNode(DataflowPlanNode):
metric_specs: Tuple[MetricSpec, ...]
for_group_by_source_node: bool
_aggregated_to_elements: Tuple[LinkableInstanceSpec, ...]
_alias_to_metric_spec: Dict[str, MetricSpec] = field(hash=False)

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()

assert len(self.parent_nodes) == 1

@staticmethod
Expand All @@ -44,6 +46,7 @@ def create( # noqa: D102
parent_nodes=(parent_node,),
metric_specs=tuple(metric_specs),
_aggregated_to_elements=tuple(aggregated_to_elements),
_alias_to_metric_spec={spec.alias: spec for spec in metric_specs if spec.alias is not None},
for_group_by_source_node=for_group_by_source_node,
)

Expand Down Expand Up @@ -97,6 +100,17 @@ def can_combine(self, other_node: ComputeMetricsNode) -> Tuple[bool, str]:
if other_node.for_group_by_source_node != self.for_group_by_source_node:
return False, "one node is a group by metric source node"

for spec in other_node.metric_specs:
if (
spec.alias is not None
and spec.alias in self._alias_to_metric_spec
and self._alias_to_metric_spec[spec.alias] != spec
):
return (
False,
f"'{spec.alias}' is defined in both nodes but it refers to different things in each of them",
)

return True, ""

def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> ComputeMetricsNode: # noqa: D102
Expand Down