Skip to content

Commit

Permalink
[SPARK-46285][SQL] Add foreachWithSubqueries
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

We can have a `foreachWithSubqueries` which also traverse the subqueries in the query plan.

### Why are the changes needed?

Add a new way to access subqueries in the query plan.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

UT

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#44206 from amaliujia/foreachsubqueries.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
amaliujia authored and dongjoon-hyun committed Dec 10, 2023
1 parent d02fbba commit a777130
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,17 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
transformDownWithPruning(cond, ruleId)(g)
}

/**
* A variant of [[foreach]] which considers plan nodes inside subqueries as well.
*/
def foreachWithSubqueries(f: PlanType => Unit): Unit = {
def actualFunc(plan: PlanType): Unit = {
f(plan)
plan.subqueries.foreach(_.foreachWithSubqueries(f))
}
foreach(actualFunc)
}

/**
* A variant of `collect`. This method not only apply the given function to all elements in this
* plan, also considering all the plans in its (nested) subqueries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.plans

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -145,4 +146,16 @@ class LogicalPlanSuite extends SparkFunSuite {
assert(query.where(Literal.FalseLiteral).maxRows.contains(0))
assert(query.where(Literal.FalseLiteral).maxRowsPerPartition.contains(0))
}

test("SPARK-46285: foreachWithSubqueries") {
val input = UnresolvedRelation(Seq("subquery_table"))
val input2 = UnresolvedRelation(Seq("t"))
val plan = Filter(Exists(input), input2)
val tableNames = scala.collection.mutable.Set[String]()
plan.foreachWithSubqueries {
case e: UnresolvedRelation => tableNames.add(e.name)
case _ =>
}
assert(tableNames.contains("subquery_table"))
}
}

0 comments on commit a777130

Please sign in to comment.