[SPARK-46342][SQL] Replace IllegalStateException by `SparkException.internalError` in sql

### What changes were proposed in this pull request?
In this PR, I propose to replace all `IllegalStateException` exceptions in the `sql` project, except in `streaming`, with `SparkException.internalError`.
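
For illustration, a minimal before/after sketch of the pattern applied at each call site (the object and method names are made up for the example; the messages at the real call sites appear in the diffs below):

```scala
import org.apache.spark.SparkException

object InternalErrorExample {
  // Before: an internal invariant violation surfaced as a plain JVM exception.
  def before(): Nothing =
    throw new IllegalStateException("Execution ID should be set")

  // After: the same violation goes through the error framework, so the thrown
  // SparkException carries the INTERNAL_ERROR error class and its message format.
  def after(): Nothing =
    throw SparkException.internalError("Execution ID should be set")
}
```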

### Why are the changes needed?
This is part of the migration onto the new error framework and error classes.

### Does this PR introduce _any_ user-facing change?
No. Users shouldn't encounter an `IllegalStateException` in regular cases.

### How was this patch tested?
By running the existing GitHub Actions (GAs).
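
Where a test previously asserted on a raw `IllegalStateException`, it now checks the `INTERNAL_ERROR` error class via `checkError`; a sketch based on the `ArrowUtilsSuite` change shown below (`roundtrip` and `checkError` are helpers already available in that suite):

```scala
// Before: catch the JVM exception and match on a message substring.
// val e = intercept[IllegalStateException] { roundtrip(TimestampType) }
// assert(e.getMessage.contains("timezoneId"))

// After: assert on the error class and the full message.
checkError(
  exception = intercept[SparkException] {
    roundtrip(TimestampType)
  },
  errorClass = "INTERNAL_ERROR",
  parameters = Map("message" -> "Missing timezoneId where it is mandatory."))
```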

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#44275 from MaxGekk/replace-ise-by-internal-error.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
MaxGekk authored and dongjoon-hyun committed Dec 10, 2023
1 parent 109b1e4 commit d02fbba
Showing 44 changed files with 136 additions and 103 deletions.
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal

import org.apache.commons.lang3.time.FastDateFormat

+ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
@@ -90,7 +91,7 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long =
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` should be " +
"implemented in the formatter of timestamp without time zone")

@@ -137,7 +138,7 @@ sealed trait TimestampFormatter extends Serializable {

@throws(classOf[IllegalStateException])
def format(localDateTime: LocalDateTime): String =
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"The method `format(localDateTime: LocalDateTime)` should be implemented in the formatter " +
"of timestamp without time zone")

@@ -26,6 +26,7 @@ import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}

+ import org.apache.spark.SparkException
import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.ArrayImplicits._
@@ -53,7 +54,7 @@ private[sql] object ArrowUtils {
case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
case DateType => new ArrowType.Date(DateUnit.DAY)
case TimestampType if timeZoneId == null =>
- throw new IllegalStateException("Missing timezoneId where it is mandatory.")
+ throw SparkException.internalError("Missing timezoneId where it is mandatory.")
case TimestampType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId)
case TimestampNTZType =>
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)

@@ -91,7 +91,7 @@ case class DataSourceV2Relation(
// when testing, throw an exception if this computeStats method is called because stats should
// not be accessed before pushing the projection and filters to create a scan. otherwise, the
// stats are not accurate because they are based on a full table scan of all columns.
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"BUG: computeStats called before pushdown on DSv2 relation: $name")
} else {
// when not testing, return stats because bad stats are better than failing a query

@@ -21,7 +21,7 @@ import java.time.ZoneId

import org.apache.arrow.vector.types.pojo.ArrowType

- import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
+ import org.apache.spark.{SparkException, SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.LA
import org.apache.spark.sql.types._

@@ -50,10 +50,12 @@ class ArrowUtilsSuite extends SparkFunSuite {
roundtrip(DateType)
roundtrip(YearMonthIntervalType())
roundtrip(DayTimeIntervalType())
- val tsExMsg = intercept[IllegalStateException] {
- roundtrip(TimestampType)
- }
- assert(tsExMsg.getMessage.contains("timezoneId"))
+ checkError(
+ exception = intercept[SparkException] {
+ roundtrip(TimestampType)
+ },
+ errorClass = "INTERNAL_ERROR",
+ parameters = Map("message" -> "Missing timezoneId where it is mandatory."))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
ArrowUtils.fromArrowType(new ArrowType.Int(8, false))

@@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

- import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, SparkException, TaskContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
@@ -1217,7 +1217,7 @@ object SparkSession extends Logging {
*/
def active: SparkSession = {
getActiveSession.getOrElse(getDefaultSession.getOrElse(
- throw new IllegalStateException("No active or default Spark session found")))
+ throw SparkException.internalError("No active or default Spark session found")))
}

/**
@@ -1316,7 +1316,7 @@ object SparkSession extends Logging {
private def assertOnDriver(): Unit = {
if (TaskContext.get() != null) {
// we're accessing it during task execution, fail.
- throw new IllegalStateException(
+ throw SparkException.internalError(
"SparkSession should only be created and accessed on the driver.")
}
}

@@ -24,6 +24,7 @@ import java.util.Locale

import net.razorvine.pickle.{Pickler, Unpickler}

+ import org.apache.spark.SparkException
import org.apache.spark.api.python.DechunkedInputStream
import org.apache.spark.internal.Logging
import org.apache.spark.security.SocketAuthServer
@@ -159,7 +160,7 @@ private[sql] object PythonSQLUtils extends Logging {
case "HOUR" => Column(zero.copy(hours = e.expr))
case "MINUTE" => Column(zero.copy(mins = e.expr))
case "SECOND" => Column(zero.copy(secs = e.expr))
- case _ => throw new IllegalStateException(s"Got the unexpected unit '$unit'.")
+ case _ => throw SparkException.internalError(s"Got the unexpected unit '$unit'.")
}
}

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.commons.lang3.StringUtils

+ import org.apache.spark.SparkException
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
@@ -154,7 +155,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.commandNotSupportNestedColumnError(
"DESC TABLE COLUMN", toPrettySQL(child))
case _ =>
- throw new IllegalStateException(s"[BUG] unexpected column expression: $column")
+ throw SparkException.internalError(s"[BUG] unexpected column expression: $column")
}

// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import java.util.Locale

+ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -150,7 +151,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
LocalRelation(partAttrs, partitionData)

case _ =>
- throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
+ throw SparkException.internalError(s"unrecognized table scan node: $relation, " +
s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.")
}
}

@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong

import scala.util.control.NonFatal

- import org.apache.spark.{ErrorMessageFormat, SparkThrowable, SparkThrowableHelper}
+ import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
@@ -58,7 +58,7 @@ object SQLExecution extends Logging {
// started execution of a query didn't call withNewExecutionId. The execution ID should be
// set by calling withNewExecutionId in the action that begins execution, like
// Dataset.collect or DataFrameWriter.insertInto.
- throw new IllegalStateException("Execution ID should be set")
+ throw SparkException.internalError("Execution ID should be set")
}
}

@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.tree.TerminalNode

+ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunctionName, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog._
@@ -269,7 +270,7 @@ class SparkSqlAstBuilder extends AstBuilder {
} else if (ctx.stringLit() != null) {
SetCatalogCommand(string(visitStringLit(ctx.stringLit())))
} else {
- throw new IllegalStateException("Invalid catalog name")
+ throw SparkException.internalError("Invalid catalog name")
}
}

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import java.util.Locale

+ import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
@@ -552,7 +553,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if (distinctAggChildSets.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our `RewriteDistinctAggregates` should take care this case.
- throw new IllegalStateException(
+ throw SparkException.internalError(
"You hit a query analyzer bug. Please report your query to Spark user mailing list.")
}

@@ -782,27 +783,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil

case logical.Distinct(child) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical distinct operator should have been replaced by aggregate in the optimizer")
case logical.Intersect(left, right, false) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical intersect operator should have been replaced by semi-join in the optimizer")
case logical.Intersect(left, right, true) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical intersect operator should have been replaced by union, aggregate" +
" and generate operators in the optimizer")
case logical.Except(left, right, false) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical except operator should have been replaced by anti-join in the optimizer")
case logical.Except(left, right, true) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical except (all) operator should have been replaced by union, aggregate" +
" and generate operators in the optimizer")
case logical.ResolvedHint(child, hints) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"ResolvedHint operator should have been replaced by join hint in the optimizer")
case Deduplicate(_, child) if !child.isStreaming =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"Deduplicate operator for non streaming data source should have been replaced " +
"by aggregate in the optimizer")

@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.collection.mutable
import scala.util.control.NonFatal

- import org.apache.spark.broadcast
+ import org.apache.spark.{broadcast, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -402,7 +402,7 @@ trait CodegenSupport extends SparkPlan {
val errMsg = "Only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
"in its data producing loop."
if (Utils.isTesting) {
- throw new IllegalStateException(errMsg)
+ throw SparkException.internalError(errMsg)
} else {
logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.")
}

@@ -69,7 +69,7 @@ case class AQEShuffleReadExec private(
case other => other
}
case _ =>
- throw new IllegalStateException("operating on canonicalization plan")
+ throw SparkException.internalError("operating on canonicalization plan")
}
} else if (isCoalescedRead) {
// For coalesced shuffle read, the data distribution is not changed, only the number of
@@ -90,7 +90,7 @@ case class AQEShuffleReadExec private(
case r: RoundRobinPartitioning =>
r.copy(numPartitions = partitionSpecs.length)
case other @ SinglePartition =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"Unexpected partitioning for coalesced shuffle read: " + other)
case _ =>
// Spark plugins may have custom partitioning and may replace this operator
@@ -163,7 +163,7 @@ case class AQEShuffleReadExec private(
assert(p.dataSize.isDefined)
p.dataSize.get
case p: PartialReducerPartitionSpec => p.dataSize
- case p => throw new IllegalStateException(s"unexpected $p")
+ case p => throw SparkException.internalError(s"unexpected $p")
})
} else {
None
@@ -253,7 +253,7 @@ case class AQEShuffleReadExec private(
sendDriverMetrics()
stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
case _ =>
- throw new IllegalStateException("operating on canonicalized plan")
+ throw SparkException.internalError("operating on canonicalized plan")
}
}

@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.Future

- import org.apache.spark.{FutureAction, MapOutputStatistics}
+ import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -179,7 +179,7 @@ case class ShuffleQueryStageExec(
case s: ShuffleExchangeLike => s
case ReusedExchangeExec(_, s: ShuffleExchangeLike) => s
case _ =>
- throw new IllegalStateException(s"wrong plan for shuffle stage:\n ${plan.treeString}")
+ throw SparkException.internalError(s"wrong plan for shuffle stage:\n ${plan.treeString}")
}

def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
@@ -233,7 +233,7 @@ case class BroadcastQueryStageExec(
case b: BroadcastExchangeLike => b
case ReusedExchangeExec(_, b: BroadcastExchangeLike) => b
case _ =>
- throw new IllegalStateException(s"wrong plan for broadcast stage:\n ${plan.treeString}")
+ throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
}

override protected def doMaterialize(): Future[Any] = {
@@ -273,7 +273,7 @@ case class TableCacheQueryStageExec(
@transient val inMemoryTableScan = plan match {
case i: InMemoryTableScanExec => i
case _ =>
- throw new IllegalStateException(s"wrong plan for table cache stage:\n ${plan.treeString}")
+ throw SparkException.internalError(s"wrong plan for table cache stage:\n ${plan.treeString}")
}

@transient

@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.aggregate

+ import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ExpressionEquals, UnsafeRow}
@@ -343,7 +344,7 @@ trait AggregateCodegenSupport
"length of at least one split function went over the JVM limit: " +
CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH
if (Utils.isTesting) {
- throw new IllegalStateException(errMsg)
+ throw SparkException.internalError(errMsg)
} else {
logInfo(errMsg)
None

@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.aggregate

+ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
@@ -102,9 +103,9 @@ trait BaseAggregateExec extends UnaryExecNode with PartitioningPreservingUnaryEx
StatefulOperatorPartitioning.getCompatibleDistribution(
exprs, parts, conf) :: Nil

- case _ =>
- throw new IllegalStateException("Expected to set the number of partitions before " +
- "constructing required child distribution!")
+ case _ => throw SparkException.internalError(
+ "Expected to set the number of partitions before " +
+ "constructing required child distribution!")
}
} else {
ClusteredDistribution(exprs) :: Nil

@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.aggregate

- import org.apache.spark.{SparkEnv, TaskContext}
+ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -109,7 +109,7 @@ class ObjectAggregationIterator(
val defaultAggregationBuffer = createNewAggregationBuffer()
generateOutput(UnsafeRow.createFromByteArray(0, 0), defaultAggregationBuffer)
} else {
- throw new IllegalStateException(
+ throw SparkException.internalError(
"This method should not be called when groupingExpressions is not empty.")
}
}

(Diffs for the remaining changed files are not shown here.)
