diff --git a/spark-connector/common/dependency-reduced-pom.xml b/spark-connector/common/dependency-reduced-pom.xml
index 8c1d669c..8f957b66 100644
--- a/spark-connector/common/dependency-reduced-pom.xml
+++ b/spark-connector/common/dependency-reduced-pom.xml
@@ -3,7 +3,7 @@
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-odps-common</artifactId>
@@ -11,25 +11,25 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
diff --git a/spark-connector/common/pom.xml b/spark-connector/common/pom.xml
index 2d21527d..a422daf0 100644
--- a/spark-connector/common/pom.xml
+++ b/spark-connector/common/pom.xml
@@ -5,7 +5,7 @@
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<relativePath>../pom.xml</relativePath>
<modelVersion>4.0.0</modelVersion>
diff --git a/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsPartitionReaderFactory.scala b/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsPartitionReaderFactory.scala
index ff973865..5eeb489d 100644
--- a/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsPartitionReaderFactory.scala
+++ b/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsPartitionReaderFactory.scala
@@ -35,6 +35,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.odps.vectorized._
import org.apache.spark.sql.types._
@@ -59,7 +60,7 @@ case class OdpsPartitionReaderFactory(broadcastedConf: Broadcast[SerializableCon
asyncRead: Boolean)
extends PartitionReaderFactory with Logging {
- private val output = readDataSchema.toAttributes ++ readPartitionSchema.toAttributes
+ private val output = DataTypeUtils.toAttributes(readDataSchema) ++ DataTypeUtils.toAttributes(readPartitionSchema)
private val allNames = output.map(_.name)
private val allTypes = output.map(_.dataType)
private val arrowDataFormat = new DataFormat(DataFormat.Type.ARROW, DataFormat.Version.V5)
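
Note on the hunk above: in Spark 3.5 the StructType.toAttributes helper lives in org.apache.spark.sql.catalyst.types.DataTypeUtils, hence the change. A minimal standalone sketch of the replacement call (spark-catalyst 3.5.x on the classpath; the schemas below are illustrative, not the connector's):

    import org.apache.spark.sql.catalyst.types.DataTypeUtils
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    object ToAttributesSketch {
      def main(args: Array[String]): Unit = {
        val readDataSchema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
        val readPartitionSchema = StructType(Seq(StructField("pt", StringType)))
        // Spark 3.3.x wrote: readDataSchema.toAttributes ++ readPartitionSchema.toAttributes
        val output = DataTypeUtils.toAttributes(readDataSchema) ++ DataTypeUtils.toAttributes(readPartitionSchema)
        println(output.map(a => s"${a.name}: ${a.dataType.simpleString}").mkString(", "))
      }
    }
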
diff --git a/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/execution/exchange/OdpsShuffleExchangeExec.scala b/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/execution/exchange/OdpsShuffleExchangeExec.scala
index d346e30f..b8fded38 100644
--- a/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/execution/exchange/OdpsShuffleExchangeExec.scala
+++ b/spark-connector/common/src/main/scala/org/apache/spark/sql/odps/execution/exchange/OdpsShuffleExchangeExec.scala
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
-import org.apache.spark.sql.execution.{PartitionIdPassthrough, RecordBinaryComparator, SQLExecution, ShufflePartitionSpec, ShuffledRowRDD, SparkPlan, UnsafeExternalRowSorter, UnsafeRowSerializer}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.execution.{RecordBinaryComparator, SQLExecution, ShufflePartitionSpec, ShuffledRowRDD, SparkPlan, UnsafeExternalRowSorter, UnsafeRowSerializer}
import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_NUM, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.internal.SQLConf
@@ -128,6 +129,16 @@ case class OdpsShuffleExchangeExec(
override protected def withNewChildInternal(newChild: SparkPlan): OdpsShuffleExchangeExec = {
copy(child = newChild)
}
+
+ override def advisoryPartitionSize: Option[Long] = {
+ val dataSize = metrics("dataSize").value
+ val numPartitions = metrics("numPartitions").value
+ if (dataSize > 0 && numPartitions > 0) {
+ Some(dataSize / numPartitions)
+ } else {
+ Some(64 * 1024 * 1024L) // 64MB
+ }
+ }
}
object ShuffleExchangeExec {
@@ -303,7 +314,7 @@ object ShuffleExchangeExec {
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
val sorter = UnsafeExternalRowSorter.createWithRecordComparator(
- StructType.fromAttributes(outputAttributes),
+ DataTypeUtils.fromAttributes(outputAttributes),
recordComparatorSupplier,
prefixComparator,
prefixComputer,
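
Two notes on this file: fromAttributes moved into DataTypeUtils alongside toAttributes, and the new advisoryPartitionSize override gives AQE an average-bytes-per-partition hint with a fixed 64 MB fallback while the shuffle metrics are still zero. A dependency-free sketch of that fallback arithmetic (the method name is illustrative; only the logic mirrors the hunk):

    object AdvisorySizeSketch {
      // Average bytes per shuffle partition, or 64 MB when the metrics have not been populated yet.
      def advisorySize(dataSize: Long, numPartitions: Long): Option[Long] =
        if (dataSize > 0 && numPartitions > 0) Some(dataSize / numPartitions)
        else Some(64L * 1024 * 1024)

      def main(args: Array[String]): Unit = {
        println(advisorySize(dataSize = 10L << 30, numPartitions = 200)) // Some(53687091), about 51 MiB each
        println(advisorySize(dataSize = 0, numPartitions = 0))           // Some(67108864), the 64 MB fallback
      }
    }
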
diff --git a/spark-connector/datasource/dependency-reduced-pom.xml b/spark-connector/datasource/dependency-reduced-pom.xml
index b55b3f3b..682fa657 100644
--- a/spark-connector/datasource/dependency-reduced-pom.xml
+++ b/spark-connector/datasource/dependency-reduced-pom.xml
@@ -3,7 +3,7 @@
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-odps-datasource</artifactId>
diff --git a/spark-connector/datasource/pom.xml b/spark-connector/datasource/pom.xml
index 160c0218..939dee05 100644
--- a/spark-connector/datasource/pom.xml
+++ b/spark-connector/datasource/pom.xml
@@ -5,7 +5,7 @@
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<relativePath>../pom.xml</relativePath>
<modelVersion>4.0.0</modelVersion>
@@ -16,7 +16,7 @@
<groupId>com.aliyun.odps</groupId>
<artifactId>spark-odps-common</artifactId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
diff --git a/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/OdpsWriteBuilder.scala b/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/OdpsWriteBuilder.scala
index 63f77972..e750f01f 100644
--- a/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/OdpsWriteBuilder.scala
+++ b/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/OdpsWriteBuilder.scala
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
@@ -199,8 +201,8 @@ case class OdpsWriteBuilder(
assert(queryId != null, "Missing query ID")
SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name),
- s"when inserting into $tableIdent", caseSensitiveAnalysis)
- DataSource.validateSchema(schema)
+ caseSensitiveAnalysis)
+ DataSource.validateSchema(schema, SQLConf.get)
}
private def createWriteJobDescription(sparkSession: SparkSession,
@@ -210,7 +212,7 @@ case class OdpsWriteBuilder(
options: Map[String, String],
odpsOptions: OdpsOptions,
supportArrowWriter: Boolean): WriteJobDescription = {
- val outputColumns = schema.toAttributes
+ val outputColumns = DataTypeUtils.toAttributes(schema)
val outputPartitionColumns =
outputColumns.filter(c => partitionSchema.getFieldIndex(c.name).isDefined)
val outputPartitionSet = AttributeSet(outputPartitionColumns)
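
The two write-path signature changes above follow Spark 3.5: checkColumnNameDuplication dropped the free-form context string, and DataSource.validateSchema now takes the active SQLConf. A hedged sketch of the new calls (SchemaUtils is package-private to Spark, so the sketch sits under an org.apache.spark.sql package; the schema is illustrative):

    package org.apache.spark.sql.odps

    import org.apache.spark.sql.execution.datasources.DataSource
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.util.SchemaUtils

    object WriteSchemaChecksSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
        // Spark 3.3.x also took a context string such as s"when inserting into $tableIdent".
        SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name), /* caseSensitiveAnalysis = */ false)
        // validateSchema now requires the SQLConf explicitly.
        DataSource.validateSchema(schema, SQLConf.get)
        println("schema checks passed")
      }
    }
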
diff --git a/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/extension/OdpsExtensions.scala b/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/extension/OdpsExtensions.scala
index 5bfec8a7..95fb7ad9 100644
--- a/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/extension/OdpsExtensions.scala
+++ b/spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/extension/OdpsExtensions.scala
@@ -60,7 +60,7 @@ class OdpsExtensions extends (SparkSessionExtensions => Unit) {
}
ShowColumnsCommand(table)
- case i@InsertIntoStatement(r@DataSourceV2Relation(table: OdpsTable, _, _, _, _), _, _, _, _, _)
+ case i@InsertIntoStatement(r@DataSourceV2Relation(table: OdpsTable, _, _, _, _), _, _, _, _, _, /* byName */false)
if i.query.resolved =>
if (i.partitionSpec.nonEmpty && !r.options.containsKey(WRITE_ODPS_STATIC_PARTITION)) {
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
@@ -125,7 +125,7 @@ class OdpsExtensions extends (SparkSessionExtensions => Unit) {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.transform {
case AppendData(
- r @ DataSourceV2Relation(table: OdpsTable, _ , _, _, options), query, writeOptions, isByName, write)
+ r @ DataSourceV2Relation(table: OdpsTable, _ , _, _, options), query, writeOptions, isByName, write, /* analyzedQuery */ _)
if !writeOptions.contains(WRITE_ODPS_TABLE_RESOLVED) =>
val newQuery = insertRepartition(query, table)
var newOptions = writeOptions + Tuple2(WRITE_ODPS_TABLE_RESOLVED, "true")
diff --git a/spark-connector/hive/dependency-reduced-pom.xml b/spark-connector/hive/dependency-reduced-pom.xml
index b8f4c45b..2144caae 100644
--- a/spark-connector/hive/dependency-reduced-pom.xml
+++ b/spark-connector/hive/dependency-reduced-pom.xml
@@ -3,7 +3,7 @@
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-odps-hive_2.12</artifactId>
diff --git a/spark-connector/hive/pom.xml b/spark-connector/hive/pom.xml
index 1c18dbd6..9e4d1620 100644
--- a/spark-connector/hive/pom.xml
+++ b/spark-connector/hive/pom.xml
@@ -22,7 +22,7 @@
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<relativePath>../pom.xml</relativePath>
@@ -32,7 +32,7 @@
<groupId>com.aliyun.odps</groupId>
<artifactId>spark-odps-common</artifactId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 4390dc89..dc050568 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.odps.OdpsClient
+import java.util.Locale
+
private[sql] class HiveSessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
@@ -48,4 +50,8 @@ private[sql] class HiveSessionCatalog(
// TODO: Load defaults in cluster mode
.getOrCreate()
.odps().getDefaultProject)
+
+ private def formatDatabaseName(name: String): String = {
+ if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+ }
}
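
The helper added above mirrors the usual SessionCatalog normalization: database names are lower-cased with Locale.ROOT unless case-sensitive analysis is enabled. A dependency-free sketch of the same behaviour (the boolean stands in for conf.caseSensitiveAnalysis):

    import java.util.Locale

    object FormatDatabaseNameSketch {
      def formatDatabaseName(name: String, caseSensitiveAnalysis: Boolean): String =
        if (caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)

      def main(args: Array[String]): Unit = {
        println(formatDatabaseName("MyProject", caseSensitiveAnalysis = false)) // myproject
        println(formatDatabaseName("MyProject", caseSensitiveAnalysis = true))  // MyProject
      }
    }
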
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index e4d1f222..5ce7f4c0 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -99,7 +99,7 @@ class HiveSessionStateBuilder(
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
new DetermineTableStats(session) +:
- PreprocessTableCreation(session) +:
+ PreprocessTableCreation(catalog) +:
PreprocessTableInsertion +:
DataSourceAnalysis +:
HiveAnalysis +:
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bf5e010e..00dfb9c1 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -79,7 +79,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
// handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
// children, hence not matched directly by previous HiveTableRelation case.
- case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
+ case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, false)
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
i.copy(table = hiveTableWithStats(relation))
}
@@ -94,7 +94,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoStatement(
- r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists)
+ r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists, false)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoOdpsTable(r.tableMeta, partSpec, query, overwrite,
ifPartitionNotExists, query.output.map(_.name))
@@ -120,7 +120,7 @@ private[hive] trait HiveStrategies {
*/
object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case ScanOperation(projects, filters, relation: HiveTableRelation) =>
+ case ScanOperation(projects, filters, null, relation: HiveTableRelation) =>
// Filters on this relation fall into four categories based
// on where we can use them to avoid reading unneeded data:
// - partition keys only - used to prune directories to read
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 42b98090..e1c77c06 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -360,7 +360,8 @@ private[hive] class HiveClientImpl(
if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
// SPARK-29260: Enable supported versions once it support altering database location.
if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
- throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
+ // Spark 3.5: alterDatabaseLocationUnsupportedError no longer takes the Hive version
+ throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
}
}
val hiveDb = toHiveDatabase(database)
@@ -516,7 +517,7 @@ private[hive] class HiveClientImpl(
case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
case unsupportedType =>
val tableTypeStr = unsupportedType.toString.toLowerCase(Locale.ROOT).replace("_", " ")
- throw QueryCompilationErrors.hiveTableTypeUnsupportedError(tableTypeStr)
+ throw QueryCompilationErrors.hiveTableTypeUnsupportedError(h.getTableName, tableTypeStr)
},
schema = schema,
partitionColumnNames = partCols.map(_.name).toSeq,
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 67bb72c1..036af155 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -20,17 +20,15 @@ package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
-import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
+import java.util.{Locale, ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit
-
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, Index, MetaException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, FunctionType, Index, MetaException, PrincipalType, ResourceType, ResourceUri, Function => HiveFunction}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
@@ -38,7 +36,6 @@ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
-
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
@@ -982,7 +979,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
def unapply(expr: Expression): Option[Attribute] = {
expr match {
case attr: Attribute => Some(attr)
- case Cast(child @ IntegralType(), dt: IntegralType, _, _)
+ case Cast(child, dt: IntegralType, _, _)
if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child)
case _ => None
}
@@ -1146,8 +1143,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
// client-side filtering cannot be used with TimeZoneAwareExpression.
def hasTimeZoneAwareExpression(e: Expression): Boolean = {
e.exists {
- case cast: CastBase => cast.needsTimeZone
- case tz: TimeZoneAwareExpression => !tz.isInstanceOf[CastBase]
+ case cast: Cast => cast.needsTimeZone
+ case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
case _ => false
}
}
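
Spark 3.5 has no separate CastBase class, so the timezone check above matches on Cast directly. A small sketch exercising the same predicate (spark-catalyst 3.5.x on the classpath; the literal cast is only an example, and needsTimeZone is assumed to remain accessible here as it is in the shim):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, TimeZoneAwareExpression}
    import org.apache.spark.sql.types.TimestampType

    object TimeZoneAwareSketch {
      def hasTimeZoneAwareExpression(e: Expression): Boolean = e.exists {
        case cast: Cast => cast.needsTimeZone
        case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
        case _ => false
      }

      def main(args: Array[String]): Unit = {
        // A string-to-timestamp cast depends on the session timezone, so this prints true.
        println(hasTimeZoneAwareExpression(Cast(Literal("2024-01-01 00:00:00"), TimestampType)))
      }
    }
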
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 15c172a6..022a6898 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -21,15 +21,13 @@ import java.io.File
import java.lang.reflect.InvocationTargetException
import java.net.{URL, URLClassLoader}
import java.util
-
import scala.util.Try
-
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.ShimLoader
-
+import org.apache.hadoop.util.VersionInfo
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
@@ -42,6 +40,9 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils, VersionUtils}
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
private[hive] object IsolatedClientLoader extends Logging {
+
+ def isHadoop3: Boolean = VersionUtils.majorVersion(VersionInfo.getVersion) == 3
+
/**
* Creates isolated Hive client loaders by downloading the requested version from maven.
*/
@@ -68,7 +69,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
// version cannot be resolved.
- val fallbackVersion = if (VersionUtils.isHadoop3) {
+ val fallbackVersion = if (isHadoop3) {
"3.3.2"
} else {
"2.7.4"
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsSqlParser.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsSqlParser.scala
index 73d9e3af..91a72aa9 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsSqlParser.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsSqlParser.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql.hive.execution
import java.util.Locale
-
-import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.parser.{ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
@@ -41,11 +40,15 @@ class OdpsSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder() {
// }.getOrElse(TruncateTable(table))
// }
+ def getId(ctx: PrimitiveDataTypeContext): IdentifierContext = {
+ ctx.getRuleContext(classOf[SqlBaseParser.IdentifierContext], 0)
+ }
+
/**
* Resolve/create a primitive type.
*/
override def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType = withOrigin(ctx) {
- val dataType = ctx.identifier.getText.toLowerCase(Locale.ROOT)
+ val dataType = getId(ctx).getText.toLowerCase(Locale.ROOT)
(dataType, ctx.INTEGER_VALUE().asScala.toList) match {
case ("boolean", Nil) => BooleanType
case ("tinyint" | "byte", Nil) => ByteType
diff --git a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsTableWriter.scala b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsTableWriter.scala
index 94c4be2a..58db968a 100644
--- a/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsTableWriter.scala
+++ b/spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/OdpsTableWriter.scala
@@ -208,7 +208,7 @@ object OdpsTableWriter extends Logging {
logError(s"Data source write $identifier aborted.")
cause match {
// Only wrap non fatal exceptions.
- case NonFatal(e) => throw QueryExecutionErrors.writingJobAbortedError(e)
+ case NonFatal(e) => throw QueryExecutionErrors.writingJobFailedError(e)
case _ => throw cause
}
}
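
Spark 3.5 exposes the abort error as writingJobFailedError(cause), which the hunk above adopts. A hedged sketch of the same wrap-and-rethrow pattern (QueryExecutionErrors is assumed to be package-private to org.apache.spark.sql, hence the package declaration; the object name is illustrative):

    package org.apache.spark.sql.odps

    import scala.util.control.NonFatal

    import org.apache.spark.sql.errors.QueryExecutionErrors

    object AbortRethrowSketch {
      // Wrap only non-fatal causes in Spark's write-failure error; rethrow fatal ones as-is.
      def rethrow(cause: Throwable): Nothing = cause match {
        case NonFatal(e) => throw QueryExecutionErrors.writingJobFailedError(e)
        case _ => throw cause
      }
    }
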
diff --git a/spark-connector/pom.xml b/spark-connector/pom.xml
index 3f794df8..40d9ebed 100644
--- a/spark-connector/pom.xml
+++ b/spark-connector/pom.xml
@@ -11,7 +11,7 @@
<groupId>com.aliyun.odps</groupId>
<artifactId>spark-connector</artifactId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<module>common</module>
<module>datasource</module>
@@ -23,7 +23,7 @@
4.0.0
0.45.5-public
0.45.5-public
- 3.3.1
+ 3.5.1
2.12.10
2.12
1.8
diff --git a/spark-datasource-v3.3/pom.xml b/spark-datasource-v3.3/pom.xml
index cf848bb8..d59866de 100644
--- a/spark-datasource-v3.3/pom.xml
+++ b/spark-datasource-v3.3/pom.xml
@@ -18,7 +18,7 @@
3.3.0
2.12.10
2.12
- 0.43.1-public-upsert-SNAPSHOT
+ 0.45.5-public
4.0.0
1.8
512m
diff --git a/trino-connector/pom.xml b/trino-connector/pom.xml
index 146f30c9..0a3d0b29 100644
--- a/trino-connector/pom.xml
+++ b/trino-connector/pom.xml
@@ -40,7 +40,7 @@
2.10
- **/*Test.java (only include test classes)
+ **/*Test.java