diff --git a/docs/ppl-lang/ppl-lookup-command.md b/docs/ppl-lang/ppl-lookup-command.md index 87cf34bac..c88bef591 100644 --- a/docs/ppl-lang/ppl-lookup-command.md +++ b/docs/ppl-lang/ppl-lookup-command.md @@ -29,17 +29,17 @@ LOOKUP ( [AS ])... **inputField** - Optional - Default: All fields of \ where matched values are applied to result output if no field is specified. -- Description: A field in \ where matched values are applied to result output. You can specify multiple \ with comma-delimited. If you don't specify any \, all fields of \ where matched values are applied to result output. +- Description: A field in \ whose matched values are applied to the result output. You can specify multiple \, delimited by commas. If you don't specify any \, all fields except \ from \ are applied to the result output. **outputField** - Optional - Default: \ -- Description: A field of output. You can specify multiple \. If you specify \ with an existing field name in source query, its values will be replaced or appended by matched values from \. If the field specified in \ is a new field, an extended new field will be applied to the results. +- Description: A field of output. You can specify zero or multiple \. If you specify \ with an existing field name in the source query, its values will be replaced or appended by matched values from \. If the field specified in \ is a new field, the REPLACE strategy extends the results with the new field, while the APPEND strategy fails. **REPLACE | APPEND** - Optional - Default: REPLACE -- Description: If you specify REPLACE, matched values in \ field overwrite the values in result. If you specify APPEND, matched values in \ field only append to the missing values in result. +- Description: The output strategy. If you specify REPLACE, matched values in the \ field overwrite the values in the result. If you specify APPEND, matched values in the \ field only fill in the missing values in the result.
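To make the two strategies concrete, here is a minimal Spark sketch of the DataFrame-level equivalent that the planner changes below produce: a left outer join, where REPLACE takes the lookup-side column as-is (so unmatched rows become null) and APPEND wraps it in a coalesce so existing source values win. The table and column names are illustrative only, not from this change.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

object LookupStrategySketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("lookup-sketch").getOrCreate()
  import spark.implicits._

  // Source side: department is partially populated; lookup side is keyed by uid.
  val source = Seq((1000, "SALES"), (1001, null)).toDF("id", "department")
  val lookup = Seq((1000, "IT")).toDF("uid", "department")

  val joined = source.join(lookup, source("id") === lookup("uid"), "left_outer")

  // REPLACE: the lookup value wins outright; null where there is no match (id 1001).
  joined.select(source("id"), lookup("department").as("department")).show()

  // APPEND compiles to coalesce(sourceCol, lookupCol): the source value wins
  // and the lookup value only fills the nulls.
  joined.select(
    source("id"),
    coalesce(source("department"), lookup("department")).as("department")).show()

  spark.stop()
}
```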
### Usage - `LOOKUP id AS cid REPLACE mail AS email` diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLLookupITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLLookupITSuite.scala index d790c98bc..77c9a36c9 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLLookupITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLLookupITSuite.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.ppl import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Coalesce, EqualTo} import org.apache.spark.sql.catalyst.plans.LeftOuter @@ -51,9 +51,6 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable uid AS id REPLACE department") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id REPLACE department") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "Engineer", "England", 100000, "IT"), Row(1001, "Hello", "Artist", "USA", 70000, null), @@ -61,19 +58,18 @@ class FlintSparkPPLLookupITSuite Row(1003, "David", "Doctor", null, 120000, "HR"), Row(1004, "David", null, "Canada", 0, null), Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA")) + assertSameRows(expectedResults, frame) - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) val lookupProject = Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias) val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id")) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) - val coalesceForSafeExpr = - Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "department")()), + Alias( + UnresolvedAttribute("__auto_generated_subquery_name_l.department"), + "department")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -88,9 +84,6 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable uid AS id APPEND department") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id APPEND department") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "Engineer", "England", 100000, "IT"), Row(1001, "Hello", "Artist", "USA", 70000, null), @@ -98,21 +91,21 @@ class FlintSparkPPLLookupITSuite Row(1003, "David", "Doctor", null, 120000, "HR"), Row(1004, "David", null, "Canada", 0, null), Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA")) - - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assertSameRows(expectedResults, frame) val lookupProject = Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias) val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id")) val joinPlan = 
Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) val coalesceExpr = - Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department"))) - val coalesceForSafeExpr = Coalesce(Seq(coalesceExpr, UnresolvedAttribute("department"))) + Coalesce( + Seq( + UnresolvedAttribute("department"), + UnresolvedAttribute("__auto_generated_subquery_name_l.department"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "department")()), + Alias(coalesceExpr, "department")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -128,30 +121,23 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable uid AS id REPLACE department AS country") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id REPLACE department AS country") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "Engineer", 100000, "IT"), - Row(1001, "Hello", "Artist", 70000, "USA"), + Row(1001, "Hello", "Artist", 70000, null), Row(1002, "John", "Doctor", 120000, "DATA"), Row(1003, "David", "Doctor", 120000, "HR"), - Row(1004, "David", null, 0, "Canada"), + Row(1004, "David", null, 0, null), Row(1005, "Jane", "Scientist", 90000, "DATA")) - - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assertSameRows(expectedResults, frame) val lookupProject = Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias) val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id")) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) - val coalesceForSafeExpr = - Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("country"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "country")()), + Alias(UnresolvedAttribute("__auto_generated_subquery_name_l.department"), "country")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -167,9 +153,6 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable uid AS id APPEND department AS country") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id APPEND department AS country") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "Engineer", 100000, "England"), Row(1001, "Hello", "Artist", 70000, "USA"), @@ -177,18 +160,21 @@ class FlintSparkPPLLookupITSuite Row(1003, "David", "Doctor", 120000, "HR"), Row(1004, "David", null, 0, "Canada"), Row(1005, "Jane", "Scientist", 90000, "Canada")) + assertSameRows(expectedResults, frame) val lookupProject = Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias) val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id")) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) val coalesceExpr = - Coalesce(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("department"))) - val coalesceForSafeExpr = Coalesce(Seq(coalesceExpr, UnresolvedAttribute("country"))) + Coalesce( + Seq( + UnresolvedAttribute("__auto_generated_subquery_name_s.country"), + UnresolvedAttribute("__auto_generated_subquery_name_l.department"))) val 
projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "country")()), + Alias(coalesceExpr, "country")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -204,9 +190,6 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable uid AS id, name REPLACE department") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uID AS id, name REPLACE department") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "Engineer", "England", 100000, "IT"), Row(1001, "Hello", "Artist", "USA", 70000, null), @@ -214,9 +197,8 @@ class FlintSparkPPLLookupITSuite Row(1003, "David", "Doctor", null, 120000, "HR"), Row(1004, "David", null, "Canada", 0, null), Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA")) + assertSameRows(expectedResults, frame) - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) val lookupProject = Project( Seq( @@ -231,12 +213,12 @@ class FlintSparkPPLLookupITSuite UnresolvedAttribute("__auto_generated_subquery_name_l.name"), UnresolvedAttribute("__auto_generated_subquery_name_s.name"))) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) - val coalesceForSafeExpr = - Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "department")()), + Alias( + UnresolvedAttribute("__auto_generated_subquery_name_l.department"), + "department")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -253,9 +235,6 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable uid AS id, name APPEND department") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS ID, name APPEND department") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "Engineer", "England", 100000, "IT"), Row(1001, "Hello", "Artist", "USA", 70000, null), @@ -263,9 +242,7 @@ class FlintSparkPPLLookupITSuite Row(1003, "David", "Doctor", null, 120000, "HR"), Row(1004, "David", null, "Canada", 0, null), Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA")) - - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assertSameRows(expectedResults, frame) val lookupProject = Project( @@ -282,12 +259,14 @@ class FlintSparkPPLLookupITSuite UnresolvedAttribute("__auto_generated_subquery_name_s.name"))) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) val coalesceExpr = - Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department"))) - val coalesceForSafeExpr = Coalesce(Seq(coalesceExpr, UnresolvedAttribute("department"))) + Coalesce( + Seq( + UnresolvedAttribute("department"), + UnresolvedAttribute("__auto_generated_subquery_name_l.department"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "department")()), + Alias(coalesceExpr, "department")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -303,38 +282,29 @@ class FlintSparkPPLLookupITSuite test("test LOOKUP lookupTable 
uid AS id, name") { val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uID AS id, name") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( - Row(1000, "Jake", "Engineer", "England", 100000, 1000, "Jake", "IT", "Engineer"), - Row(1001, "Hello", "Artist", "USA", 70000, null, null, null, null), - Row(1002, "John", "Doctor", "Canada", 120000, 1002, "John", "DATA", "Scientist"), - Row(1003, "David", "Doctor", null, 120000, 1003, "David", "HR", "Doctor"), - Row(1004, "David", null, "Canada", 0, null, null, null, null), - Row(1005, "Jane", "Scientist", "Canada", 90000, 1005, "Jane", "DATA", "Engineer")) - - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + Row(1000, "Jake", "England", 100000, "IT", "Engineer"), + Row(1001, "Hello", "USA", 70000, null, null), + Row(1002, "John", "Canada", 120000, "DATA", "Scientist"), + Row(1003, "David", null, 120000, "HR", "Doctor"), + Row(1004, "David", "Canada", 0, null, null), + Row(1005, "Jane", "Canada", 90000, "DATA", "Engineer")) + + assertSameRows(expectedResults, frame) } test("test LOOKUP lookupTable name REPLACE occupation") { val frame = sql( s"source = $sourceTable | eval major = occupation | fields id, name, major, country, salary | LOOKUP $lookupTable name REPLACE occupation AS major") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "England", 100000, "Engineer"), - Row(1001, "Hello", "USA", 70000, "Artist"), + Row(1001, "Hello", "USA", 70000, null), Row(1002, "John", "Canada", 120000, "Scientist"), Row(1003, "David", null, 120000, "Doctor"), Row(1004, "David", "Canada", 0, "Doctor"), Row(1005, "Jane", "Canada", 90000, "Engineer")) - - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assertSameRows(expectedResults, frame) val sourceTbl = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) val eval = Project( @@ -355,12 +325,10 @@ class FlintSparkPPLLookupITSuite UnresolvedAttribute("__auto_generated_subquery_name_s.name"), UnresolvedAttribute("__auto_generated_subquery_name_l.name")) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) - val coalesceForSafeExpr = - Coalesce(Seq(UnresolvedAttribute("occupation"), UnresolvedAttribute("major"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "major")()), + Alias(UnresolvedAttribute("__auto_generated_subquery_name_l.occupation"), "major")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -377,9 +345,6 @@ class FlintSparkPPLLookupITSuite val frame = sql( s"source = $sourceTable | eval major = occupation | fields id, name, major, country, salary | LOOKUP $lookupTable name APPEND occupation AS major") - // frame.show() - // frame.explain(true) - val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(1000, "Jake", "England", 100000, "Engineer"), Row(1001, "Hello", "USA", 70000, "Artist"), @@ -387,9 +352,7 @@ class FlintSparkPPLLookupITSuite Row(1003, "David", null, 120000, "Doctor"), Row(1004, "David", "Canada", 0, "Doctor"), Row(1005, "Jane", "Canada", 90000, "Scientist")) - - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, 
Integer](_.getAs[Integer](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assertSameRows(expectedResults, frame) val sourceTbl = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) val eval = Project( @@ -411,13 +374,14 @@ UnresolvedAttribute("__auto_generated_subquery_name_l.name")) val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE) val coalesceExpr = - Coalesce(Seq(UnresolvedAttribute("major"), UnresolvedAttribute("occupation"))) - val coalesceForSafeExpr = - Coalesce(Seq(coalesceExpr, UnresolvedAttribute("major"))) + Coalesce( + Seq( + UnresolvedAttribute("major"), + UnresolvedAttribute("__auto_generated_subquery_name_l.occupation"))) val projectAfterJoin = Project( Seq( UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))), - Alias(coalesceForSafeExpr, "major")()), + Alias(coalesceExpr, "major")()), joinPlan) val dropColumns = DataFrameDropColumns( Seq( @@ -429,4 +393,130 @@ class FlintSparkPPLLookupITSuite comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) } + + test("test LOOKUP lookupTable name") { + val frame = + sql(s"source = $sourceTable | LOOKUP $lookupTable name") + val expectedResults: Array[Row] = Array( + Row(1000, "Jake", "England", 100000, 1000, "IT", "Engineer"), + Row(1001, "Hello", "USA", 70000, null, null, null), + Row(1002, "John", "Canada", 120000, 1002, "DATA", "Scientist"), + Row(1003, "David", null, 120000, 1003, "HR", "Doctor"), + Row(1004, "David", "Canada", 0, 1003, "HR", "Doctor"), + Row(1005, "Jane", "Canada", 90000, 1005, "DATA", "Engineer")) + assertSameRows(expectedResults, frame) + } + + // Rename country to department to verify the case where the search side is not a table + // and its output differs from the original fields of the source table + test("test LOOKUP lookupTable id with rename") { + val frame = + sql( + s"source = $sourceTable | rename id as uid | rename country as department | LOOKUP $lookupTable uid") + val expectedResults: Array[Row] = Array( + Row(100000, 1000, "Jake", "IT", "Engineer"), + Row(70000, 1001, null, null, null), + Row(120000, 1002, "John", "DATA", "Scientist"), + Row(120000, 1003, "David", "HR", "Doctor"), + Row(0, 1004, null, null, null), + Row(90000, 1005, "Jane", "DATA", "Engineer")) + assertSameRows(expectedResults, frame) + } + + test("correctness combination test") { + sql(s""" + | CREATE TABLE s + | ( + | id INT, + | col1 STRING, + | col2 STRING + | ) + | USING $tableType $tableOptions + |""".stripMargin) + sql(s""" + | INSERT INTO s + | VALUES (1, 'a', 'b'), + | (2, 'aa', 'bb'), + | (3, null, 'ccc') + | """.stripMargin) + + sql(s""" + | CREATE TABLE l + | ( + | id INT, + | col1 STRING, + | col3 STRING + | ) + | USING $tableType $tableOptions + |""".stripMargin) + sql(s""" + | INSERT INTO l + | VALUES (1, 'x', 'y'), + | (3, 'xx', 'yy') + | """.stripMargin) + var frame = sql(s"source = s | LOOKUP l id | fields id, col1, col2, col3") + var expectedResults = + Array(Row(1, "x", "b", "y"), Row(2, null, "bb", null), Row(3, "xx", "ccc", "yy")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id REPLACE id, col1, col3 | fields id, col1, col2, col3") + expectedResults = + Array(Row(1, "x", "b", "y"), Row(null, null, "bb", null), Row(3, "xx", "ccc", "yy")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id APPEND id, col1, col3 | fields id, col1, col2, col3") + expectedResults = + Array(Row(1, "a", "b", "y"), Row(2, "aa", "bb", null),
Row(3, "xx", "ccc", "yy")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id REPLACE col1 | fields id, col1, col2") + expectedResults = Array(Row(1, "x", "b"), Row(2, null, "bb"), Row(3, "xx", "ccc")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id APPEND col1 | fields id, col1, col2") + expectedResults = Array(Row(1, "a", "b"), Row(2, "aa", "bb"), Row(3, "xx", "ccc")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id REPLACE col1 as col2 | fields id, col1, col2") + expectedResults = Array(Row(1, "a", "x"), Row(2, "aa", null), Row(3, null, "xx")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id APPEND col1 as col2 | fields id, col1, col2") + expectedResults = Array(Row(1, "a", "b"), Row(2, "aa", "bb"), Row(3, null, "ccc")) + assertSameRows(expectedResults, frame) + frame = sql(s"source = s | LOOKUP l id REPLACE col1 as colA | fields id, col1, col2, colA") + expectedResults = + Array(Row(1, "a", "b", "x"), Row(2, "aa", "bb", null), Row(3, null, "ccc", "xx")) + assertSameRows(expectedResults, frame) + // source = s | LOOKUP l id APPEND col1 as colA | fields id, col1, col2, colA throw exception + } + + test("test LOOKUP lookupTable name REPLACE occupation - 2") { + val frame = + sql(s"source = $sourceTable | LOOKUP $lookupTable name REPLACE occupation") + val expectedResults: Array[Row] = Array( + Row(1000, "Jake", "England", 100000, "Engineer"), + Row(1001, "Hello", "USA", 70000, null), + Row(1002, "John", "Canada", 120000, "Scientist"), + Row(1003, "David", null, 120000, "Doctor"), + Row(1004, "David", "Canada", 0, "Doctor"), + Row(1005, "Jane", "Canada", 90000, "Engineer")) + assertSameRows(expectedResults, frame) + } + + test("test LOOKUP lookupTable name REPLACE occupation as new_col") { + val frame = + sql(s"source = $sourceTable | LOOKUP $lookupTable name REPLACE occupation as new_col") + val expectedResults: Array[Row] = Array( + Row(1000, "Jake", "Engineer", "England", 100000, "Engineer"), + Row(1001, "Hello", "Artist", "USA", 70000, null), + Row(1002, "John", "Doctor", "Canada", 120000, "Scientist"), + Row(1003, "David", "Doctor", null, 120000, "Doctor"), + Row(1004, "David", null, "Canada", 0, "Doctor"), + Row(1005, "Jane", "Scientist", "Canada", 90000, "Engineer")) + assertSameRows(expectedResults, frame) + } + + test("test LOOKUP lookupTable name APPEND occupation as new_col throw exception") { + val ex = intercept[AnalysisException](sql(s""" + | source = $sourceTable | LOOKUP $lookupTable name APPEND occupation as new_col + | """.stripMargin)) + assert( + ex.getMessage.contains( + "A column or function parameter with name `new_col` cannot be resolved")) + } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java index d9ace48ba..04662522b 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java @@ -368,6 +368,7 @@ public Expression visitWindowFunction(WindowFunction node, CatalystPlanContext c @Override public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerContext) { CatalystPlanContext innerContext = new CatalystPlanContext(); + innerContext.withSparkSession(outerContext.getSparkSession()); visitExpressionList(node.getChild(), innerContext); Seq 
values = innerContext.retainAllNamedParseExpressions(p -> p); UnresolvedPlan outerPlan = node.getQuery(); @@ -387,6 +388,7 @@ public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerCont @Override public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext context) { CatalystPlanContext innerContext = new CatalystPlanContext(); + innerContext.withSparkSession(context.getSparkSession()); UnresolvedPlan outerPlan = node.getQuery(); LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext); Expression scalarSubQuery = ScalarSubquery$.MODULE$.apply( @@ -402,6 +404,7 @@ public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext c @Override public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext context) { CatalystPlanContext innerContext = new CatalystPlanContext(); + innerContext.withSparkSession(context.getSparkSession()); UnresolvedPlan outerPlan = node.getQuery(); LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext); Expression existsSubQuery = Exists$.MODULE$.apply( diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java index 1621e65d5..1a3e0dd44 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java @@ -6,6 +6,7 @@ package org.opensearch.sql.ppl; import lombok.Getter; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.Expression; @@ -38,6 +39,8 @@ * The context used for Catalyst logical plan. 
*/ public class CatalystPlanContext { + + @Getter private SparkSession sparkSession; /** * Catalyst relations list **/ @@ -283,4 +286,8 @@ public Expression resolveJoinCondition( isResolvingJoinCondition = false; return result; } + + public void withSparkSession(SparkSession sparkSession) { + this.sparkSession = sparkSession; + } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index a1bd2b6c4..926637375 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -79,6 +79,7 @@ import org.opensearch.sql.ppl.utils.FieldSummaryTransformer; import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator; import org.opensearch.sql.ppl.utils.ParseTransformer; +import org.opensearch.sql.ppl.utils.RelationUtils; import org.opensearch.sql.ppl.utils.SortUtils; import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils; import org.opensearch.sql.ppl.utils.WindowSpecTransformer; @@ -88,9 +89,11 @@ import scala.collection.Seq; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -193,41 +196,56 @@ public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) { public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) { visitFirstChild(node, context); return context.apply( searchSide -> { + context.retainAllNamedParseExpressions(p -> p); + context.retainAllPlans(p -> p); + LogicalPlan target; LogicalPlan lookupTable = node.getLookupRelation().accept(this, context); Expression lookupCondition = buildLookupMappingCondition(node, expressionAnalyzer, context); - // If no output field is specified, all fields from lookup table are applied to the output. if (node.allFieldsShouldAppliedToOutputList()) { - context.retainAllNamedParseExpressions(p -> p); - context.retainAllPlans(p -> p); - return join(searchSide, lookupTable, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint()); - } - - // If the output fields are specified, build a project list for lookup table. - // The mapping fields of lookup table should be added in this project list, otherwise join will fail. - // So the mapping fields of lookup table should be dropped after join. - List lookupTableProjectList = buildLookupRelationProjectList(node, expressionAnalyzer, context); - LogicalPlan lookupTableWithProject = Project$.MODULE$.apply(seq(lookupTableProjectList), lookupTable); - - LogicalPlan join = join(searchSide, lookupTableWithProject, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint()); + // When no output field is specified, all fields of the lookup table except the mapping fields are applied to the output. + // If some output fields from the source side duplicate fields of the lookup table, these fields will + // be replaced by the fields from the lookup table in the output. + // For example, the lookup table contains fields [id, col1, col3] and source side fields are [id, col1, col2]. + // For query "index = sourceTable | fields id, col1, col2 | LOOKUP lookupTable id", + // col1 is a duplicated field and id is the mapping key (also duplicated). + // The query outputs 4 fields: [id, col1, col2, col3]. Among them, `col2` is the original field from the source, + // and the matched values of `col1` from the lookup table replace the values of `col1` from the source. + Set duplicatedFieldsMaybeDrop = + new HashSet<>(RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), lookupTable)); + Set mappingFieldsOfLookup = node.getLookupMappingMap().keySet(); + // Lookup mapping keys are not dropped here; they are checked later. + duplicatedFieldsMaybeDrop.removeAll(mappingFieldsOfLookup); + List duplicated = + buildProjectListFromFields(new ArrayList<>(duplicatedFieldsMaybeDrop), expressionAnalyzer, context) + .stream().map(e -> (Expression) e).collect(Collectors.toList()); + LogicalPlan searchSideWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(duplicated), searchSide); + target = join(searchSideWithDropped, lookupTable, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint()); + } else { + // When output fields are specified, build a project list for lookup table. + // The mapping fields of lookup table should be added in this project list, otherwise join will fail. + // So the mapping fields of lookup table should be dropped after join. + List lookupTableProjectList = buildLookupRelationProjectList(node, expressionAnalyzer, context); + LogicalPlan lookupTableWithProject = Project$.MODULE$.apply(seq(lookupTableProjectList), lookupTable); - // Add all outputFields by __auto_generated_subquery_name_s.* - List outputFieldsWithNewAdded = new ArrayList<>(); - outputFieldsWithNewAdded.add(UnresolvedStar$.MODULE$.apply(Option.apply(seq(node.getSourceSubqueryAliasName())))); + LogicalPlan join = join(searchSide, lookupTableWithProject, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint()); - // Add new columns based on different strategies: - // Append: coalesce($outputField, $"inputField").as(outputFieldName) - // Replace: $outputField.as(outputFieldName) - outputFieldsWithNewAdded.addAll(buildOutputProjectList(node, node.getOutputStrategy(), expressionAnalyzer, context)); + // Add all outputFields by __auto_generated_subquery_name_s.* + List outputFieldsWithNewAdded = new ArrayList<>(); + outputFieldsWithNewAdded.add(UnresolvedStar$.MODULE$.apply(Option.apply(seq(node.getSourceSubqueryAliasName())))); - org.apache.spark.sql.catalyst.plans.logical.Project outputWithNewAdded = Project$.MODULE$.apply(seq(outputFieldsWithNewAdded), join); + // Add new columns based on different strategies: + // Append: coalesce($outputField, $"inputField").as(outputFieldName) + // Replace: $outputField.as(outputFieldName) + outputFieldsWithNewAdded.addAll(buildOutputProjectList(node, node.getOutputStrategy(), expressionAnalyzer, context, searchSide)); + target = Project$.MODULE$.apply(seq(outputFieldsWithNewAdded), join); + } // Drop the mapping fields of lookup table in result: // For example, in command "LOOKUP lookTbl Field1 AS Field2, Field3", // the Field1 and Field3 are projection fields and join keys which will be dropped in result. List mappingFieldsOfLookup = node.getLookupMappingMap().entrySet().stream() .map(kv -> kv.getKey().getField() == kv.getValue().getField() ?
buildFieldWithLookupSubqueryAlias(node, kv.getKey()) : kv.getKey()) .collect(Collectors.toList()); -// List mappingFieldsOfLookup = new ArrayList<>(node.getLookupMappingMap().keySet()); List dropListOfLookupMappingFields = buildProjectListFromFields(mappingFieldsOfLookup, expressionAnalyzer, context).stream() .map(Expression.class::cast).collect(Collectors.toList()); @@ -237,7 +255,7 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) { List toDrop = new ArrayList<>(dropListOfLookupMappingFields); toDrop.addAll(dropListOfSourceFields); - LogicalPlan outputWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(toDrop), outputWithNewAdded); + LogicalPlan outputWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(toDrop), target); context.retainAllNamedParseExpressions(p -> p); context.retainAllPlans(p -> p); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/LookupTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/LookupTransformer.java index 3673d96d6..de2f74386 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/LookupTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/LookupTransformer.java @@ -11,13 +11,13 @@ import org.apache.spark.sql.catalyst.expressions.EqualTo$; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.NamedExpression; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.QualifiedName; import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ppl.CatalystExpressionVisitor; import org.opensearch.sql.ppl.CatalystPlanContext; -import org.opensearch.sql.ppl.CatalystQueryPlanVisitor; import scala.Option; import java.util.ArrayList; @@ -83,31 +83,36 @@ static List buildOutputProjectList( Lookup node, Lookup.OutputStrategy strategy, CatalystExpressionVisitor expressionAnalyzer, - CatalystPlanContext context) { + CatalystPlanContext context, + LogicalPlan searchSide) { List outputProjectList = new ArrayList<>(); for (Map.Entry entry : node.getOutputCandidateMap().entrySet()) { Alias inputFieldWithAlias = entry.getKey(); Field inputField = (Field) inputFieldWithAlias.getDelegated(); Field outputField = entry.getValue(); - Expression inputCol = expressionAnalyzer.visitField(inputField, context); - Expression outputCol = expressionAnalyzer.visitField(outputField, context); - + // Always resolve the inputCol expression with the alias __auto_generated_subquery_name_l. + // If the outputField exists in the source table, resolve the outputCol expression with the alias __auto_generated_subquery_name_s. + // If not, resolve the outputCol expression without an alias, to avoid an unresolved-attribute failure.
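+ // For example, from the tests in this change: in "LOOKUP lookupTable uid AS id APPEND department AS country", + // `country` exists in the source table, so outputCol resolves to __auto_generated_subquery_name_s.country; + // in "LOOKUP lookupTable name REPLACE occupation AS new_col", `new_col` is a new field, + // so outputCol resolves to the bare attribute new_col (which is why APPEND with a new field cannot resolve it and fails).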
+ Expression inputCol = expressionAnalyzer.visitField(buildFieldWithLookupSubqueryAlias(node, inputField), context); + Expression outputCol; + if (RelationUtils.columnExistsInCatalogTable(context.getSparkSession(), searchSide, outputField)) { + outputCol = expressionAnalyzer.visitField(buildFieldWithSourceSubqueryAlias(node, outputField), context); + } else { + outputCol = expressionAnalyzer.visitField(outputField, context); + } Expression child; if (strategy == Lookup.OutputStrategy.APPEND) { child = Coalesce$.MODULE$.apply(seq(outputCol, inputCol)); } else { child = inputCol; } - // The result output project list we build here is used to replace the source output, - // for the unmatched rows of left outer join, the outputs are null, so fall back to source output. - Expression nullSafeOutput = Coalesce$.MODULE$.apply(seq(child, outputCol)); - NamedExpression nullSafeOutputCol = Alias$.MODULE$.apply(nullSafeOutput, + NamedExpression output = Alias$.MODULE$.apply(child, inputFieldWithAlias.getName(), NamedExpression.newExprId(), seq(new java.util.ArrayList()), Option.empty(), seq(new java.util.ArrayList())); - outputProjectList.add(nullSafeOutputCol); + outputProjectList.add(output); } context.retainAllNamedParseExpressions(p -> p); return outputProjectList; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java index f959fe199..fa9faf8af 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java @@ -5,16 +5,29 @@ package org.opensearch.sql.ppl.utils; +import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.opensearch.flint.spark.ppl.PPLSparkUtils; +import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.QualifiedName; import scala.Option$; +import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; public interface RelationUtils { + Logger LOG = LogManager.getLogger(RelationUtils.class); + /** * attempt resolving if the field is relating to the given relation * if name doesnt contain table prefix - add the current relation prefix to the fields name - returns true @@ -65,4 +78,27 @@ static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) { } return identifier; } + + static boolean columnExistsInCatalogTable(SparkSession spark, LogicalPlan plan, Field field) { + return getFieldsFromCatalogTable(spark, plan).stream().anyMatch(f -> f.getField().equals(field.getField())); + } + + static List getFieldsFromCatalogTable(SparkSession spark, LogicalPlan plan) { + UnresolvedRelation relation = PPLSparkUtils.findLogicalRelations(plan).head(); + QualifiedName tableQualifiedName = QualifiedName.of(Arrays.asList(relation.tableName().split("\\."))); + TableIdentifier tableIdentifier = getTableIdentifier(tableQualifiedName); + boolean tableExists = 
spark.sessionState().catalog().tableExists(tableIdentifier); + if (tableExists) { + try { + CatalogTable table = spark.sessionState().catalog().getTableMetadata(tableIdentifier); + return Arrays.stream(table.dataSchema().fields()).map(f -> new Field(QualifiedName.of(f.name()))).collect(Collectors.toList()); + } catch (NoSuchDatabaseException | NoSuchTableException e) { + LOG.info("Table or database {} not found", tableIdentifier); + return ImmutableList.of(); + } + } else { + LOG.info("Table {} not found", tableIdentifier); + return ImmutableList.of(); + } + } } diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala index 26ad4b69b..c2485ab3a 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala @@ -16,7 +16,7 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { extensions.injectParser { (spark, parser) => - new FlintSparkPPLParser(parser) + new FlintSparkPPLParser(parser, spark) } } } diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala index 296e8ccc4..7b56c5fed 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala @@ -31,6 +31,7 @@ import org.opensearch.flint.spark.ppl.PlaneUtils.plan import org.opensearch.sql.common.antlr.SyntaxCheckException import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -44,7 +45,8 @@ import org.apache.spark.sql.types.{DataType, StructType} * @param sparkParser * Spark SQL parser */ -class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface { +class FlintSparkPPLParser(sparkParser: ParserInterface, val spark: SparkSession) + extends ParserInterface { /** OpenSearch (PPL) AST builder.
*/ private val planTransformer = new CatalystQueryPlanVisitor() @@ -55,6 +57,7 @@ class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface try { // if successful build ppl logical plan and translate to catalyst logical plan val context = new CatalystPlanContext + context.withSparkSession(spark) planTransformer.visit(plan(pplParser, sqlText), context) context.getPlan } catch { diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/PPLSparkUtils.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/PPLSparkUtils.scala new file mode 100644 index 000000000..78827d4e7 --- /dev/null +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/PPLSparkUtils.scala @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +object PPLSparkUtils { + + def findLogicalRelations(plan: LogicalPlan): Seq[UnresolvedRelation] = + plan.collect { case relation: UnresolvedRelation => relation } +}
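For reference, a minimal usage sketch of the new helper (assuming PPLSparkUtils is on the classpath; the table name is illustrative). findLogicalRelations collects every UnresolvedRelation in a plan; RelationUtils.getFieldsFromCatalogTable then takes the first one and asks the session catalog for its fields, so it assumes the plan contains at least one relation, which holds for the search-side plans passed in this change.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.opensearch.flint.spark.ppl.PPLSparkUtils

object FindRelationsSketch extends App {
  // A hand-built unresolved plan standing in for a parsed PPL query.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val plan: LogicalPlan = Project(Nil, relation)

  // Walks the plan and collects the relation, which is exactly what the
  // catalog probing in RelationUtils starts from.
  val relations = PPLSparkUtils.findLogicalRelations(plan)
  assert(relations.head.tableName == "spark_catalog.default.flint_ppl_test")
}
```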