Commit 14337f6

huaxingao authored and cloud-fan committed
[SPARK-29643][SQL] ALTER TABLE/VIEW (DROP PARTITION) should look up catalog/table like v2 commands
### What changes were proposed in this pull request?

Add AlterTableDropPartitionStatement and make ALTER TABLE/VIEW ... DROP PARTITION go through the same catalog/table resolution framework as v2 commands.

### Why are the changes needed?

It's important for all commands to have the same table resolution behavior, to avoid confusing end-users. For example:
```
USE my_catalog
DESC t                              // succeeds and describes the table t from my_catalog
ALTER TABLE t DROP PARTITION (id=1) // reports table not found, as there is no table t in the session catalog
```

### Does this PR introduce any user-facing change?

Yes. When running ALTER TABLE/VIEW ... DROP PARTITION, Spark fails the command if the current catalog is set to a v2 catalog, or if the table name specifies a v2 catalog.

### How was this patch tested?

Unit tests.

Closes apache#26303 from huaxingao/spark-29643.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent a4382f7 commit 14337f6
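The new test in DataSourceV2SQLSuite (last diff below) shows the user-facing behavior. A minimal sketch of the same scenario, assuming the v2 test catalog registered as `testcat` and the `foo` provider used by the test fixtures:

```scala
// Sketch only — mirrors the new DataSourceV2SQLSuite test; `testcat` and `foo` are test fixtures.
spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo PARTITIONED BY (id)")

// The multi-part name now resolves to the v2 catalog `testcat`; since DROP PARTITION has no
// v2 implementation yet, analysis fails instead of silently looking up the session catalog:
spark.sql("ALTER TABLE testcat.ns1.ns2.tbl DROP PARTITION (id=1)")
// org.apache.spark.sql.AnalysisException:
//   ALTER TABLE DROP PARTITION is only supported with v1 tables
```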

8 files changed: +107 −67 lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 3 deletions
@@ -164,10 +164,8 @@ statement
         partitionSpec+                                                 #addTablePartition
     | ALTER TABLE multipartIdentifier
         from=partitionSpec RENAME TO to=partitionSpec                  #renameTablePartition
-    | ALTER TABLE tableIdentifier
+    | ALTER (TABLE | VIEW) multipartIdentifier
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE?    #dropTablePartitions
-    | ALTER VIEW tableIdentifier
-        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
     | ALTER TABLE multipartIdentifier
         (partitionSpec)? SET locationSpec                              #setTableLocation
     | ALTER TABLE multipartIdentifier RECOVER PARTITIONS               #recoverPartitions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 26 additions & 0 deletions
@@ -2967,4 +2967,30 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       visitNonOptionalPartitionSpec(ctx.from),
       visitNonOptionalPartitionSpec(ctx.to))
   }
+
+  /**
+   * Create an [[AlterTableDropPartitionStatement]]
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE multi_part_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
+   *     [PURGE];
+   *   ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
+   * }}}
+   *
+   * ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning
+   * is associated with physical tables
+   */
+  override def visitDropTablePartitions(
+      ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.VIEW != null) {
+      operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
+    }
+    AlterTableDropPartitionStatement(
+      visitMultipartIdentifier(ctx.multipartIdentifier),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ifExists = ctx.EXISTS != null,
+      purge = ctx.PURGE != null,
+      retainData = false)
+  }
 }
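
A hedged sketch of what the new `visitDropTablePartitions` produces for a multi-part table name; the expected plan matches the parser test added to the catalyst DDLParserSuite further below:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Sketch only: parse the statement and inspect the unresolved ParsedStatement it yields.
val plan = CatalystSqlParser.parsePlan(
  "ALTER TABLE a.b.c DROP IF EXISTS PARTITION (ds='2017-06-10')")
// plan == AlterTableDropPartitionStatement(
//   Seq("a", "b", "c"),
//   Seq(Map("ds" -> "2017-06-10")),
//   ifExists = true,
//   purge = false,
//   retainData = false)
```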

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 10 additions & 0 deletions
@@ -196,6 +196,16 @@ case class AlterTableRenamePartitionStatement(
     from: TablePartitionSpec,
     to: TablePartitionSpec) extends ParsedStatement
 
+/**
+ * ALTER TABLE ... DROP PARTITION command, as parsed from SQL
+ */
+case class AlterTableDropPartitionStatement(
+    tableName: Seq[String],
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean,
+    retainData: Boolean) extends ParsedStatement
+
 /**
  * ALTER VIEW ... SET TBLPROPERTIES command, as parsed from SQL.
  */

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 50 additions & 0 deletions
@@ -1228,6 +1228,56 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsed2, expected2)
   }
 
+  // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
+  // ALTER VIEW table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
+  test("alter table: drop partition") {
+    val sql1_table =
+      """
+        |ALTER TABLE table_name DROP IF EXISTS PARTITION
+        |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
+      """.stripMargin
+    val sql2_table =
+      """
+        |ALTER TABLE table_name DROP PARTITION
+        |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
+      """.stripMargin
+    val sql1_view = sql1_table.replace("TABLE", "VIEW")
+    val sql2_view = sql2_table.replace("TABLE", "VIEW")
+
+    val parsed1_table = parsePlan(sql1_table)
+    val parsed2_table = parsePlan(sql2_table)
+    val parsed1_purge = parsePlan(sql1_table + " PURGE")
+
+    assertUnsupported(sql1_view)
+    assertUnsupported(sql2_view)
+
+    val expected1_table = AlterTableDropPartitionStatement(
+      Seq("table_name"),
+      Seq(
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
+      ifExists = true,
+      purge = false,
+      retainData = false)
+    val expected2_table = expected1_table.copy(ifExists = false)
+    val expected1_purge = expected1_table.copy(purge = true)
+
+    comparePlans(parsed1_table, expected1_table)
+    comparePlans(parsed2_table, expected2_table)
+    comparePlans(parsed1_purge, expected1_purge)
+
+    val sql3_table = "ALTER TABLE a.b.c DROP IF EXISTS PARTITION (ds='2017-06-10')"
+    val expected3_table = AlterTableDropPartitionStatement(
+      Seq("a", "b", "c"),
+      Seq(Map("ds" -> "2017-06-10")),
+      ifExists = true,
+      purge = false,
+      retainData = false)
+
+    val parsed3_table = parsePlan(sql3_table)
+    comparePlans(parsed3_table, expected3_table)
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 9 additions & 0 deletions
@@ -376,6 +376,15 @@ class ResolveSessionCatalog(
         v1TableName.asTableIdentifier,
         from,
         to)
+
+    case AlterTableDropPartitionStatement(tableName, specs, ifExists, purge, retainData) =>
+      val v1TableName = parseV1Table(tableName, "ALTER TABLE DROP PARTITION")
+      AlterTableDropPartitionCommand(
+        v1TableName.asTableIdentifier,
+        specs,
+        ifExists,
+        purge,
+        retainData)
   }
 
   private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
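
Read together with the AlterTableDropPartitionStatement definition above, this rule rewrites the parsed statement into the existing v1 runnable command once the name is confirmed to belong to the session catalog. A minimal sketch of that rewrite, assuming a bare table name resolved against the default session catalog:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.AlterTableDropPartitionStatement
import org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand

// Sketch only — the concrete TableIdentifier produced by asTableIdentifier depends on how many
// name parts remain after parseV1Table; a single-part name is shown here for illustration.
val parsed = AlterTableDropPartitionStatement(
  tableName = Seq("tbl"),
  specs = Seq(Map("ds" -> "2017-06-10")),
  ifExists = true,
  purge = false,
  retainData = false)

// After analysis by ResolveSessionCatalog, the statement becomes the existing v1 command:
val resolved = AlterTableDropPartitionCommand(
  TableIdentifier("tbl"),
  Seq(Map("ds" -> "2017-06-10")),
  ifExists = true,
  purge = false,
  retainData = false)
```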

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 25 deletions
@@ -461,31 +461,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       ctx.EXISTS != null)
   }
 
-  /**
-   * Create an [[AlterTableDropPartitionCommand]] command
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
-   *   ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
-   * }}}
-   *
-   * ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning
-   * is associated with physical tables
-   */
-  override def visitDropTablePartitions(
-      ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) {
-      operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
-    }
-    AlterTableDropPartitionCommand(
-      visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ifExists = ctx.EXISTS != null,
-      purge = ctx.PURGE != null,
-      retainData = false)
-  }
-
   /**
    * Create a [[AlterTableChangeColumnCommand]] command.
    *

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -1396,6 +1396,17 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("ALTER TABLE DROP PARTITIONS") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
+      }
+      assert(e.message.contains("ALTER TABLE DROP PARTITION is only supported with v1 tables"))
+    }
+  }
+
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala

Lines changed: 0 additions & 39 deletions
@@ -566,45 +566,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
       """.stripMargin)
   }
 
-  // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
-  // ALTER VIEW table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
-  test("alter table/view: drop partitions") {
-    val sql1_table =
-      """
-        |ALTER TABLE table_name DROP IF EXISTS PARTITION
-        |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
-      """.stripMargin
-    val sql2_table =
-      """
-        |ALTER TABLE table_name DROP PARTITION
-        |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
-      """.stripMargin
-    val sql1_view = sql1_table.replace("TABLE", "VIEW")
-    val sql2_view = sql2_table.replace("TABLE", "VIEW")
-
-    val parsed1_table = parser.parsePlan(sql1_table)
-    val parsed2_table = parser.parsePlan(sql2_table)
-    val parsed1_purge = parser.parsePlan(sql1_table + " PURGE")
-    assertUnsupported(sql1_view)
-    assertUnsupported(sql2_view)
-
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1_table = AlterTableDropPartitionCommand(
-      tableIdent,
-      Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = true,
-      purge = false,
-      retainData = false)
-    val expected2_table = expected1_table.copy(ifExists = false)
-    val expected1_purge = expected1_table.copy(purge = true)
-
-    comparePlans(parsed1_table, expected1_table)
-    comparePlans(parsed2_table, expected2_table)
-    comparePlans(parsed1_purge, expected1_purge)
-  }
-
   test("alter table: archive partition (not supported)") {
     assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')")
   }
