Feature/agent 705 spark 3 4 support (#816)
* spark agent #705 Spark 3.4 support

* Spark 3.4 regression & compatibility fixes

* Remove debugging log

* Add 3.4 bundle pom

* Add back in write to another topic and read from multiple

* Update integration-tests/src/test/scala/za/co/absa/spline/harvester/LineageHarvesterSpec.scala

* Disable SparkUI in examples

---------

Co-authored-by: Adam Cervenka <[email protected]>
Co-authored-by: Ryan Whitcomb <[email protected]>
3 people authored Jul 9, 2024
1 parent 6df9652 commit cd7ecab
Showing 9 changed files with 1,472 additions and 18 deletions.
1,344 changes: 1,344 additions & 0 deletions bundle-3.4/pom.xml

Large diffs are not rendered by default.

RDDPlugin.scala
@@ -19,7 +19,7 @@ package za.co.absa.spline.harvester.plugin.embedded
import org.apache.spark.Partition
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.execution.datasources.{FileScanRDD, PartitionedFile}
import za.co.absa.spline.commons.reflect.ReflectionUtils
import za.co.absa.spline.harvester.builder._
import za.co.absa.spline.harvester.plugin.Plugin.{Precedence, ReadNodeInfo}
@@ -39,14 +39,22 @@ class RDDPlugin(

override def rddReadNodeProcessor: PartialFunction[RDD[_], ReadNodeInfo] = {
case fsr: FileScanRDD =>
val uris = fsr.filePartitions.flatMap(_.files.map(_.filePath))
val files = fsr.filePartitions.flatMap(_.files)
val uris = files.map(extractPath(_))
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
case hr: HadoopRDD[_, _] =>
val partitions = ReflectionUtils.extractValue[Array[Partition]](hr, "partitions_")
val uris = partitions.map(hadoopPartitionToUriString)
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
}

private def extractPath(file: PartitionedFile): String = {
val path = ReflectionUtils.extractValue[AnyRef](file, "filePath")
// For Spark 3.3 and lower, the path is a String;
// for Spark 3.4 it is an org.apache.spark.paths.SparkPath.
path.toString
}

private def hadoopPartitionToUriString(hadoopPartition: Partition): String = {
val inputSplit = ReflectionUtils.extractValue[AnyRef](hadoopPartition, "inputSplit")
val fileSplitT = ReflectionUtils.extractValue[AnyRef](inputSplit, "t")
@@ -56,5 +64,4 @@ class RDDPlugin(

uri.toString
}

}
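
The comment in extractPath above is the crux of the 3.4 compatibility fix: PartitionedFile.filePath changed its type between Spark versions, so the value is read reflectively and rendered with toString. A minimal standalone sketch of the same idea, using plain Java reflection instead of Spline's ReflectionUtils (the helper name filePathOf is illustrative, not part of the commit):

    import org.apache.spark.sql.execution.datasources.PartitionedFile

    // Sketch only: fetch `filePath` without binding to its compile-time type, so the same
    // code works whether it is a String (Spark 3.3 and lower) or an
    // org.apache.spark.paths.SparkPath (Spark 3.4); both yield the path via toString.
    def filePathOf(file: PartitionedFile): String = {
      val accessor = file.getClass.getMethod("filePath") // public case-class accessor
      accessor.invoke(file).toString
    }
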
13 changes: 7 additions & 6 deletions examples/src/main/scala/za/co/absa/spline/SparkApp.scala
@@ -33,12 +33,13 @@ abstract class SparkApp
tags: Seq[String] = Nil
) extends SQLImplicits with App {

private val sparkBuilder = SparkSession.builder()

sparkBuilder.appName(name)
sparkBuilder.master(master)
sparkBuilder.config("spark.driver.host", "localhost")
sparkBuilder.config("spline.example.tags", tags.mkString(","))
private val sparkBuilder =
SparkSession.builder()
.appName(name)
.master(master)
.config("spark.driver.host", "localhost")
.config("spark.ui.enabled", "false")
.config("spline.example.tags", tags.mkString(","))

for ((k, v) <- conf) sparkBuilder.config(k, v)

17 changes: 16 additions & 1 deletion integration-tests/pom.xml
@@ -175,7 +175,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-${elasticsearch.spark.sufix}_${scala.binary.version}</artifactId>
<version>8.2.2</version>
<version>8.9.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -267,6 +267,21 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.4</id>
<properties>
<guava.version>16.0.1</guava.version>
<elasticsearch.spark.sufix>30</elasticsearch.spark.sufix>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.4_${scala.binary.version}</artifactId>
<version>1.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
BasicIntegrationTests.scala
@@ -16,6 +16,7 @@

package za.co.absa.spline

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
@@ -24,9 +25,13 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.spline.commons.io.TempDirectory
import za.co.absa.spline.commons.version.Version.VersionStringInterpolator
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}
import za.co.absa.spline.test.fixture.SparkFixture
import za.co.absa.spline.test.fixture.spline.SplineFixture

import scala.concurrent.Future

class BasicIntegrationTests extends AsyncFlatSpec
with Matchers
with SparkFixture
@@ -117,15 +122,41 @@ class BasicIntegrationTests extends AsyncFlatSpec
.write.mode(Append).saveAsTable(tableName)
}

(plan2, _) <- captor.lineageOf {
// Spark 3.4+ creates two commands for each of these writes, so one of them has to be ignored.
// We only want the one coming from CreateDataSourceTableAsSelectCommand;
// the extra command we ignore is an InsertIntoHadoopFsRelationCommand.
// The two can arrive out of order, so we filter by the write command name.
(plan2, _) <- if (ver"$SPARK_VERSION" >= ver"3.4.0") {
captor.lineageOf {
Thread.sleep(5000)
}
} else Future[(ExecutionPlan, Seq[ExecutionEvent])](null, null)

(plan3, _) <- captor.lineageOf {
spark
.read.table(tableName)
.write.mode(Overwrite).saveAsTable("somewhere")
}

(plan4, _) <- if (ver"$SPARK_VERSION" >= ver"3.4.0") {
captor.lineageOf {
Thread.sleep(5000)
}
} else Future[(ExecutionPlan, Seq[ExecutionEvent])](null, null)
} yield {
println("yield")
val writeUri = plan1.operations.write.outputSource
val readUri = plan2.operations.reads.head.inputSources.head

val writePlan = Seq(plan1, plan2)
.filter(null.!=)
.find(_.operations.write.name == "CreateDataSourceTableAsSelectCommand")
.get
val readPlan = Seq(plan3, plan4)
.filter(null.!=)
.find(_.operations.write.name == "CreateDataSourceTableAsSelectCommand")
.get

val writeUri = writePlan.operations.write.outputSource
val readUri = readPlan.operations.reads.head.inputSources.head

writeUri shouldEqual readUri
}
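
Both this test and the Hive CTAS test in LineageHarvesterSpec end up with the same selection logic: keep the captured plan whose write operation matches the expected command and drop the extra insert command (or the null placeholder produced by the version guard). A hypothetical helper, not part of this commit, condensing that pattern:

    import za.co.absa.spline.producer.model.ExecutionPlan

    // Sketch of a shared helper (the name is illustrative): pick the plan written by the
    // expected command; null entries come from the pre-3.4 placeholder futures and are dropped.
    def planForWriteCommand(expectedCommand: String, candidates: ExecutionPlan*): ExecutionPlan =
      candidates
        .filter(null.!=)
        .find(_.operations.write.name == expectedCommand)
        .getOrElse(sys.error(s"no captured plan was written by $expectedCommand"))

    // Usage, mirroring the yield block above:
    // val writePlan = planForWriteCommand("CreateDataSourceTableAsSelectCommand", plan1, plan2)
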
DeltaSpec.scala
@@ -37,7 +37,7 @@ class DeltaSpec extends AsyncFlatSpec
private val deltaPath = TempDirectory(prefix = "delta", pathOnly = true).deleteOnExit().toURI.toString

it should "support Delta Lake as a source" taggedAs
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
@@ -79,7 +79,7 @@
}

it should "support insert into existing Delta Lake table" taggedAs
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { lineageCaptor =>
val testData: DataFrame = {
KafkaSinkSpec.scala
@@ -48,8 +48,9 @@ class KafkaSinkSpec
EmbeddedKafka.stop()
}

it should "support Kafka as a write source" in {
it should "support Kafka as a write source while reading from multiple Kafka read sources" in {
val topicName = "bananas"
val otherTopicName = "anotherTopic"

withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
@@ -74,6 +75,16 @@
.option("topic", topicName)
.save())

// Write to another topic seeding lineage for a downstream read
(_, _) <- captor.lineageOf(
testData
.selectExpr("CAST (name AS STRING) AS value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUrl)
.option("topic", otherTopicName)
.save())

(plan2, _) <- captor.lineageOf(
reader
.option("subscribe", s"$topicName,anotherTopic")
@@ -98,6 +109,7 @@

plan2.operations.reads.head.extra("sourceType") shouldBe Some("kafka")
plan2.operations.reads.head.inputSources should contain(s"kafka:$topicName")
plan2.operations.reads.head.inputSources should contain(s"kafka:$otherTopicName")
plan2.operations.reads.head.params should contain key "subscribe"

plan3.operations.reads.head.extra("sourceType") shouldBe Some("kafka")
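
The new assertions above rely on the Kafka batch source accepting a comma-separated topic list in its subscribe option, which is how a single read covers both topics. A self-contained sketch of that read (spark, kafkaUrl and the topic names are assumed to be in scope, as in the test):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Sketch only: one batch read subscribed to both topics; lineage then reports a
    // kafka:<topic> input source for each of them.
    def readBothTopics(spark: SparkSession, kafkaUrl: String,
                       topic1: String, topic2: String): DataFrame =
      spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUrl)
        .option("subscribe", s"$topic1,$topic2")
        .load()
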
integration-tests/src/test/scala/za/co/absa/spline/harvester/LineageHarvesterSpec.scala
@@ -36,6 +36,7 @@ import za.co.absa.spline.test.fixture.spline.SplineFixture
import za.co.absa.spline.test.fixture.{SparkDatabaseFixture, SparkFixture}

import java.util.UUID
import scala.concurrent.Future
import scala.language.reflectiveCalls
import scala.util.Try

@@ -376,11 +377,25 @@ class LineageHarvesterSpec extends AsyncFlatSpec
val df = spark.createDataset(Seq(TestRow(1, 2.3, "text")))

for {
(plan, _) <- captor.lineageOf {
(plan1, _) <- captor.lineageOf {
df.createOrReplaceTempView("tempView")
spark.sql("CREATE TABLE users_sales AS SELECT i, d, s FROM tempView ")
}
// Spark 3.4+ creates two commands for this CTAS, so one of them has to be ignored.
// We only want the one coming from CreateHiveTableAsSelectCommand;
// the extra command we ignore is an InsertIntoHiveTableCommand.
// The two can arrive out of order, so we filter by the write command name.
(plan2, _) <- if (ver"$SPARK_VERSION" >= ver"3.4.0") {
captor.lineageOf {
Thread.sleep(5000)
}
} else Future[(ExecutionPlan, Seq[ExecutionEvent])](null, null)
} yield {
val plan = Seq(plan1, plan2)
.filter(null.!=)
.find(_.operations.write.name == "CreateHiveTableAsSelectCommand")
.get

val writeOperation = plan.operations.write
writeOperation.id shouldEqual "op-0"
writeOperation.append shouldEqual false
@@ -500,7 +515,8 @@ class LineageHarvesterSpec extends AsyncFlatSpec
plan should not be null
event.durationNs should be(empty)
event.error should not be empty
event.error.get.toString should include(s"path ${tmpLocal.toURI.toString.stripSuffix("/")} already exists")
event.error.get.toString.toLowerCase should
include(s"path ${tmpLocal.toURI.toString.stripSuffix("/")} already exists".toLowerCase)
}
}
}
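
The Spark-version guard around the extra CTAS command appears twice in the changed tests (here and in BasicIntegrationTests). A hypothetical wrapper, not part of the commit, that names that guard once (captureExtraCommandIfSpark34 is an illustrative name):

    import scala.concurrent.Future
    import org.apache.spark.SPARK_VERSION
    import za.co.absa.spline.commons.version.Version.VersionStringInterpolator
    import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

    // Sketch only: on Spark 3.4+ run the capture block (which waits for the extra command);
    // on older versions skip it and yield the null placeholder that filter(null.!=) later drops.
    def captureExtraCommandIfSpark34(capture: => Future[(ExecutionPlan, Seq[ExecutionEvent])])
        : Future[(ExecutionPlan, Seq[ExecutionEvent])] =
      if (ver"$SPARK_VERSION" >= ver"3.4.0") capture
      else Future.successful((null, null))
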
28 changes: 28 additions & 0 deletions pom.xml
@@ -91,6 +91,7 @@
<spark-31.version>3.1.3</spark-31.version>
<spark-32.version>3.2.3</spark-32.version>
<spark-33.version>3.3.1</spark-33.version>
<spark-34.version>3.4.1</spark-34.version>

<!-- Delta -->

@@ -100,6 +101,8 @@
<delta-10.version>1.0.0</delta-10.version>
<delta-20.version>2.0.0</delta-20.version>
<delta-21.version>2.1.0</delta-21.version>
<delta-24.version>2.4.0</delta-24.version>


<!-- Cassandra -->
<cassandra-connector.version>${cassandra-connector-24.version}</cassandra-connector.version>
@@ -108,6 +111,7 @@
<cassandra-connector-31.version>3.1.0</cassandra-connector-31.version>
<cassandra-connector-32.version>3.2.0</cassandra-connector-32.version>
<cassandra-connector-33.version>3.3.0</cassandra-connector-33.version>
<cassandra-connector-34.version>3.4.1</cassandra-connector-34.version>

<spark-excel.version>0.13.7</spark-excel.version>

@@ -210,6 +214,7 @@
<groupId>com.github.cerveada</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<stdout>I</stdout>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
@@ -815,6 +820,29 @@
</dependencyManagement>
</profile>

<profile>
<id>spark-3.4</id>
<properties>
<spark.version>${spark-34.version}</spark.version>
<delta.version>${delta-24.version}</delta.version>
<spark-excel.version>${spark-34.version}_0.19.0</spark-excel.version>
<cassandra-connector.version>${cassandra-connector-34.version}</cassandra-connector.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</profile>



<!-- Binary compatibility checking profile -->

<profile>
