From bea7813b5e533ed49c1d9c514778a948239151a3 Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Mon, 7 Nov 2022 11:30:37 +0800 Subject: [PATCH 1/6] add ArrowConvertExtension --- .../oap/spark/sql/ArrowConvertExtension.scala | 70 +++++++++++++++++++ .../scala/com/intel/oap/GazellePlugin.scala | 17 +++-- 2 files changed, 81 insertions(+), 6 deletions(-) create mode 100644 arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala new file mode 100644 index 000000000..3c8ed3fb8 --- /dev/null +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.spark.sql + +import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat + +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat + +class ArrowConvertorExtension extends (SparkSessionExtensions => Unit) { + def apply(e: SparkSessionExtensions): Unit = { + e.injectPostHocResolutionRule(session => ArrowConvertorRule(session)) + } +} + +case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan resolveOperators { + // Write hive path + case s@ InsertIntoStatement( + l@ LogicalRelation(r@HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _) + , _, _, _), _, _, _, _, _) => + InsertIntoStatement( + LogicalRelation( + HadoopFsRelation(r.location, r.partitionSchema, r.dataSchema, r.bucketSpec, + new ArrowFileFormat, r.options)(r.sparkSession), + l.output, l.catalogTable, l.isStreaming), + s.partitionSpec, s.userSpecifiedCols, s.query, s.overwrite, s.ifPartitionNotExists) + + // Write datasource path + case s@ InsertIntoHadoopFsRelationCommand( + _, _, _, _, _, _: ParquetFileFormat, _, _, _, _, _, _) => + InsertIntoHadoopFsRelationCommand( + s.outputPath, s.staticPartitions, s.ifPartitionNotExists, s.partitionColumns, + s.bucketSpec, new ArrowFileFormat, s.options, s.query, s.mode, s.catalogTable, + s.fileIndex, s.outputColumnNames) + + // Read path + case l@ LogicalRelation( + r@ HadoopFsRelation(_, _, _, _, _: 
ParquetFileFormat, _), _, _, _) => + LogicalRelation( + HadoopFsRelation(r.location, r.partitionSchema, r.dataSchema, r.bucketSpec, + new ArrowFileFormat, r.options)(r.sparkSession), + l.output, l.catalogTable, l.isStreaming) + + // INSERT HIVE DIR + case c@ InsertIntoDataSourceDirCommand(_, provider, _, _) if provider == "parquet" => + InsertIntoDataSourceDirCommand(c.storage, "arrow", c.query, c.overwrite) + } + } +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala index 5db03c416..19babeb54 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala @@ -23,11 +23,12 @@ import java.util.Objects import scala.language.implicitConversions +import com.intel.oap.GazellePlugin.GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME import com.intel.oap.GazellePlugin.GAZELLE_SESSION_EXTENSION_NAME import com.intel.oap.GazellePlugin.GAZELLE_WRITE_SESSION_EXTENSION_NAME import com.intel.oap.GazellePlugin.SPARK_SESSION_EXTS_KEY -import com.intel.oap.extension.ColumnarOverrides import com.intel.oap.extension.{OptimizerOverrides, StrategyOverrides} +import com.intel.oap.extension.ColumnarOverrides import org.apache.spark.SparkConf import org.apache.spark.SparkContext @@ -36,7 +37,7 @@ import org.apache.spark.api.plugin.ExecutorPlugin import org.apache.spark.api.plugin.PluginContext import org.apache.spark.api.plugin.SparkPlugin import org.apache.spark.sql.SparkSessionExtensions -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.StaticSQLConf class GazellePlugin extends SparkPlugin { override def driverPlugin(): DriverPlugin = { @@ -58,12 +59,14 @@ private[oap] class GazelleDriverPlugin extends DriverPlugin { def setPredefinedConfigs(conf: SparkConf): Unit = { val extensions = conf.getOption(SPARK_SESSION_EXTS_KEY).getOrElse("") if (extensions.contains(GAZELLE_SESSION_EXTENSION_NAME) || - extensions.contains(GAZELLE_WRITE_SESSION_EXTENSION_NAME)) { + extensions.contains(GAZELLE_WRITE_SESSION_EXTENSION_NAME) || + extensions.contains(GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME)) { throw new IllegalArgumentException("Spark gazelle extensions are already specified before " + "enabling Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SESSION_EXTS_KEY)) } conf.set(SPARK_SESSION_EXTS_KEY, - s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME,$extensions") + s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME," + + s"$GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME, $extensions") } } @@ -81,7 +84,7 @@ private[oap] class SparkConfImplicits(conf: SparkConf) { def enableGazellePlugin(): SparkConf = { if (conf.contains(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) { throw new IllegalArgumentException("A Spark plugin is already specified before enabling " + - "Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) + "Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) } conf.set(GazellePlugin.SPARK_SQL_PLUGINS_KEY, GazellePlugin.GAZELLE_PLUGIN_NAME) } @@ -100,12 +103,14 @@ private[oap] object GazellePlugin { // To enable GazellePlugin in production, set "spark.plugins=com.intel.oap.GazellePlugin" val SPARK_SQL_PLUGINS_KEY: String = "spark.plugins" val GAZELLE_PLUGIN_NAME: String = Objects.requireNonNull(classOf[GazellePlugin] - .getCanonicalName) + .getCanonicalName) val SPARK_SESSION_EXTS_KEY: String = 
StaticSQLConf.SPARK_SESSION_EXTENSIONS.key val GAZELLE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull( classOf[GazelleSessionExtensions].getCanonicalName) val GAZELLE_WRITE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull( "com.intel.oap.spark.sql.ArrowWriteExtension") + val GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME: String = Objects.requireNonNull( + "com.intel.oap.spark.sql.ArrowConvertorExtension") /** * Specify all injectors that Gazelle is using in following list. */ From 3d35855cecfc75201d347413d67c6cbcc1753b72 Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Tue, 8 Nov 2022 14:34:03 +0800 Subject: [PATCH 2/6] do not convert parquet fileformat while writing to partitioned/bucketed/sorted output --- .../com/intel/oap/spark/sql/ArrowConvertExtension.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala index 3c8ed3fb8..95659f728 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala @@ -36,9 +36,10 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write hive path + // TODO: support writing with partitioned/bucketed/sorted column case s@ InsertIntoStatement( l@ LogicalRelation(r@HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _) - , _, _, _), _, _, _, _, _) => + , _, _, _), _, _, _, _, _) if r.partitionSchema.isEmpty && r.bucketSpec.isEmpty => InsertIntoStatement( LogicalRelation( HadoopFsRelation(r.location, r.partitionSchema, r.dataSchema, r.bucketSpec, @@ -47,8 +48,10 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { s.partitionSpec, s.userSpecifiedCols, s.query, s.overwrite, s.ifPartitionNotExists) // Write datasource path + // TODO: support writing with partitioned/bucketed/sorted column case s@ InsertIntoHadoopFsRelationCommand( - _, _, _, _, _, _: ParquetFileFormat, _, _, _, _, _, _) => + _, _, _, _, _, _: ParquetFileFormat, _, _, _, _, _, _) + if s.partitionColumns.isEmpty && s.bucketSpec.isEmpty => InsertIntoHadoopFsRelationCommand( s.outputPath, s.staticPartitions, s.ifPartitionNotExists, s.partitionColumns, s.bucketSpec, new ArrowFileFormat, s.options, s.query, s.mode, s.catalogTable, From 63b41a8944c78849a7e8b8df2ee7414dfd91f695 Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Thu, 10 Nov 2022 20:05:37 +0800 Subject: [PATCH 3/6] fix cache failed --- .../oap/spark/sql/ArrowConvertExtension.scala | 38 +++++-------------- .../datasources/arrow/ArrowFileFormat.scala | 4 ++ 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala index 95659f728..c861de356 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala @@ -20,7 +20,7 @@ package com.intel.oap.spark.sql import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} -import 
org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} @@ -35,39 +35,21 @@ class ArrowConvertorExtension extends (SparkSessionExtensions => Unit) { case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { - // Write hive path - // TODO: support writing with partitioned/bucketed/sorted column - case s@ InsertIntoStatement( - l@ LogicalRelation(r@HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _) - , _, _, _), _, _, _, _, _) if r.partitionSchema.isEmpty && r.bucketSpec.isEmpty => - InsertIntoStatement( - LogicalRelation( - HadoopFsRelation(r.location, r.partitionSchema, r.dataSchema, r.bucketSpec, - new ArrowFileFormat, r.options)(r.sparkSession), - l.output, l.catalogTable, l.isStreaming), - s.partitionSpec, s.userSpecifiedCols, s.query, s.overwrite, s.ifPartitionNotExists) - // Write datasource path // TODO: support writing with partitioned/bucketed/sorted column - case s@ InsertIntoHadoopFsRelationCommand( - _, _, _, _, _, _: ParquetFileFormat, _, _, _, _, _, _) - if s.partitionColumns.isEmpty && s.bucketSpec.isEmpty => - InsertIntoHadoopFsRelationCommand( - s.outputPath, s.staticPartitions, s.ifPartitionNotExists, s.partitionColumns, - s.bucketSpec, new ArrowFileFormat, s.options, s.query, s.mode, s.catalogTable, - s.fileIndex, s.outputColumnNames) + case c: InsertIntoHadoopFsRelationCommand + if c.fileFormat.isInstanceOf[ParquetFileFormat] && + c.partitionColumns.isEmpty && c.bucketSpec.isEmpty => + c.copy(fileFormat = new ArrowFileFormat) // Read path case l@ LogicalRelation( - r@ HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), _, _, _) => - LogicalRelation( - HadoopFsRelation(r.location, r.partitionSchema, r.dataSchema, r.bucketSpec, - new ArrowFileFormat, r.options)(r.sparkSession), - l.output, l.catalogTable, l.isStreaming) + r@ HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), _, _, _) => + l.copy(relation = r.copy(fileFormat = new ArrowFileFormat)(session)) - // INSERT HIVE DIR - case c@ InsertIntoDataSourceDirCommand(_, provider, _, _) if provider == "parquet" => - InsertIntoDataSourceDirCommand(c.storage, "arrow", c.query, c.overwrite) + // INSERT DIR + case c: InsertIntoDataSourceDirCommand if c.provider == "parquet" => + c.copy(provider = "arrow") } } } diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala index edeaad712..6d048752a 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala @@ -197,6 +197,10 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab } override def shortName(): String = "arrow" + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other.isInstanceOf[ArrowFileFormat] } object ArrowFileFormat { From 8c788f7efe33a68ff53728750b3a0ab9c87d319b Mon Sep 17 
00:00:00 2001 From: jackylee-ch Date: Mon, 28 Nov 2022 17:27:35 +0800 Subject: [PATCH 4/6] care about write codec --- .../oap/spark/sql/ArrowConvertExtension.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala index c861de356..40abd4efd 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowConvertExtension.scala @@ -17,7 +17,10 @@ package com.intel.oap.spark.sql +import java.util.Locale + import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat +import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -40,7 +43,21 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { case c: InsertIntoHadoopFsRelationCommand if c.fileFormat.isInstanceOf[ParquetFileFormat] && c.partitionColumns.isEmpty && c.bucketSpec.isEmpty => - c.copy(fileFormat = new ArrowFileFormat) + // TODO: Support pass parquet config and writing with other codecs + // `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and + // `spark.sql.parquet.compression.codec` + // are in order of precedence from highest to lowest. + val parquetCompressionConf = c.options.get(ParquetOutputFormat.COMPRESSION) + val codecName = c.options + .get("compression") + .orElse(parquetCompressionConf) + .getOrElse(session.sessionState.conf.parquetCompressionCodec) + .toLowerCase(Locale.ROOT) + if (codecName.equalsIgnoreCase("snappy")) { + c.copy(fileFormat = new ArrowFileFormat) + } else { + c + } // Read path case l@ LogicalRelation( From a3623c2d98b073e6215ec98cea3cef72396f63d3 Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Tue, 29 Nov 2022 16:06:54 +0800 Subject: [PATCH 5/6] disable convertor extension by default --- .../scala/com/intel/oap/GazellePlugin.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala index 19babeb54..1236c9f6b 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala @@ -23,10 +23,7 @@ import java.util.Objects import scala.language.implicitConversions -import com.intel.oap.GazellePlugin.GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME -import com.intel.oap.GazellePlugin.GAZELLE_SESSION_EXTENSION_NAME -import com.intel.oap.GazellePlugin.GAZELLE_WRITE_SESSION_EXTENSION_NAME -import com.intel.oap.GazellePlugin.SPARK_SESSION_EXTS_KEY +import com.intel.oap.GazellePlugin.{GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED, GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME, GAZELLE_SESSION_EXTENSION_NAME, GAZELLE_WRITE_SESSION_EXTENSION_NAME, SPARK_SESSION_EXTS_KEY} import com.intel.oap.extension.{OptimizerOverrides, StrategyOverrides} import com.intel.oap.extension.ColumnarOverrides @@ -64,9 +61,14 @@ private[oap] class GazelleDriverPlugin extends DriverPlugin { throw new IllegalArgumentException("Spark gazelle extensions are already specified before " + "enabling Gazelle plugin: " + 
conf.get(GazellePlugin.SPARK_SESSION_EXTS_KEY))
     }
-    conf.set(SPARK_SESSION_EXTS_KEY,
-      s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME," +
-        s"$GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME, $extensions")
+    if (conf.getBoolean(GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED, false)) {
+      conf.set(SPARK_SESSION_EXTS_KEY,
+        s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME," +
+          s"$GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME, $extensions")
+    } else {
+      conf.set(SPARK_SESSION_EXTS_KEY,
+        s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME, $extensions")
+    }
   }
 }
@@ -111,6 +113,7 @@ private[oap] object GazellePlugin {
     "com.intel.oap.spark.sql.ArrowWriteExtension")
   val GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
     "com.intel.oap.spark.sql.ArrowConvertorExtension")
+  val GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED: String = "spark.oap.extension.convertor.enabled"
   /**
    * Specify all injectors that Gazelle is using in following list.
    */

From 16298f1c7e8f875024bc9b1a805d078e0a87cb6c Mon Sep 17 00:00:00 2001
From: jackylee-ch
Date: Tue, 29 Nov 2022 16:10:47 +0800
Subject: [PATCH 6/6] add some comments

---
 .../core/src/main/scala/com/intel/oap/GazellePlugin.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala
index 1236c9f6b..62464cca5 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala
@@ -113,6 +113,10 @@ private[oap] object GazellePlugin {
     "com.intel.oap.spark.sql.ArrowWriteExtension")
   val GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
     "com.intel.oap.spark.sql.ArrowConvertorExtension")
+  // This configuration enables or disables the convertor from Parquet to the Arrow file format.
+  // Enabling the convertor extension may result in behavior that is inconsistent with vanilla
+  // Spark in some cases, such as metadata files, struct type support, ignoreMissingFiles and so on.
+  // Thus this configuration is disabled by default.
   val GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED: String = "spark.oap.extension.convertor.enabled"
   /**
    * Specify all injectors that Gazelle is using in following list.
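
Usage sketch (not part of the patch series): the convertor extension above is only injected when spark.oap.extension.convertor.enabled is set to true, and ArrowConvertorRule only rewrites unpartitioned, unbucketed, snappy-compressed parquet writes plus parquet reads. A minimal Scala example of how a session could be configured to exercise it, assuming the Gazelle jars and their native dependencies are on the classpath and using throwaway /tmp paths:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Registers GazellePlugin, which appends the session extensions listed in GazellePlugin.scala.
  .config("spark.plugins", "com.intel.oap.GazellePlugin")
  // Added in this series; ArrowConvertorExtension is only appended to
  // spark.sql.extensions when this flag is true (default: false).
  .config("spark.oap.extension.convertor.enabled", "true")
  .getOrCreate()

// Unpartitioned, unbucketed write with the default snappy codec: the rule is intended
// to rewrite the InsertIntoHadoopFsRelationCommand to use ArrowFileFormat.
spark.range(100).toDF("id").write.mode("overwrite").parquet("/tmp/arrow_convertor_demo")

// Parquet reads over a HadoopFsRelation are likewise rewritten to ArrowFileFormat.
spark.read.parquet("/tmp/arrow_convertor_demo").show()

// A gzip-compressed (or partitioned/bucketed) write is left on ParquetFileFormat.
spark.range(100).toDF("id").write.option("compression", "gzip")
  .mode("overwrite").parquet("/tmp/parquet_gzip_demo")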
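
Codec-precedence sketch (not part of the patch series): PATCH 4 resolves the write codec from the per-write compression option, then parquet.compression (ParquetOutputFormat.COMPRESSION), then the session default spark.sql.parquet.compression.codec, and only converts snappy writes. The standalone helper below mirrors that resolution order with hard-coded option keys; resolveParquetCodec is a hypothetical name, not something defined in the patch:

import java.util.Locale

// Highest to lowest precedence: "compression", "parquet.compression",
// then the session-level default codec.
def resolveParquetCodec(options: Map[String, String], sessionDefault: String): String =
  options.get("compression")
    .orElse(options.get("parquet.compression"))
    .getOrElse(sessionDefault)
    .toLowerCase(Locale.ROOT)

// Only snappy leads to a conversion to ArrowFileFormat; anything else keeps ParquetFileFormat.
assert(resolveParquetCodec(Map("compression" -> "GZIP"), "snappy") == "gzip")
assert(resolveParquetCodec(Map.empty, "snappy") == "snappy")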