Skip to content

Commit

Permalink
#544 Improve test Spark session configuration and efficiency.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Feb 13, 2025
1 parent 1806a16 commit 8569a31
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import scala.util.control.NonFatal

object StringUtils {

/** Normalizes a string for comparison in tests by removing margins and all
  * line endings, so expected/actual output matches on Windows and Unix alike.
  */
def stripLineEndings(str: String): String = {
  val withoutMargin = str.stripMargin
  val joined = withoutMargin.linesIterator.mkString
  joined.trim
}

/**
* Substitutes variables in form of \${var} with values in a given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ trait SparkTestBase {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

val hadoopTempDir: String = System.getProperty("java.io.tmpdir")

implicit val spark: SparkSession = {
SparkSession.getActiveSession.foreach(_.stop())
SparkSession.getActiveSession.foreach { spark =>
// Stopping the existing Spark session if it is not Delta-enabled
if (spark.conf.get("spark.sql.extensions") != "io.delta.sql.DeltaSparkSessionExtension") {
spark.stop()
}
}

SparkSession.builder()
.master("local[2]")
Expand All @@ -42,10 +45,4 @@ trait SparkTestBase {
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
}


def stripLineEndings(str: String): String = {
//make testing compatible for windows
str.stripMargin.linesIterator.mkString("").trim
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ trait SparkTestIcebergBase {
val hadoopTempDir: String = System.getProperty("java.io.tmpdir")

implicit val spark: SparkSession = {
SparkSession.getActiveSession.foreach(_.stop())
SparkSession.getActiveSession.foreach { spark =>
// Stopping the existing Spark session if it is not Iceberg-enabled
if (spark.conf.get("spark.sql.extensions") != "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") {
spark.stop()
}
}

SparkSession.builder()
.master("local[2]")
Expand All @@ -45,10 +50,4 @@ trait SparkTestIcebergBase {
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
}


def stripLineEndings(str: String): String = {
//make testing compatible for windows
str.stripMargin.linesIterator.mkString("").trim
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class MetastorePartitionSchemeSuite extends AnyWordSpec
.write
.format("delta")
.mode(SaveMode.Append).saveAsTable(table)
case _ => throw new IllegalArgumentException("Unsupported query type")
}

assert(mt.loadTable(None, None).count() == 6)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TextComparisonFixtu
import za.co.absa.pramen.core.reader.TableReaderJdbcNative
import za.co.absa.pramen.core.samples.RdbExampleTable
import za.co.absa.pramen.core.utils.SparkUtils
import za.co.absa.pramen.core.utils.StringUtils.stripLineEndings

import java.time.LocalDate

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.core.samples.SampleCaseClass2
import za.co.absa.pramen.core.utils.SparkUtils
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.StringUtils.stripLineEndings

import java.time.LocalDate

Expand Down

0 comments on commit 8569a31

Please sign in to comment.