Skip to content

Error with repeated fields in UDF functions #340

@ps-akoshevoy

Description

@ps-akoshevoy

Env:

  • Spark 3.3.2
  • sparksql33-scalapb0_11 1.0.3

Problem:
When I use a protobuf Scala case class with repeated fields as an input parameter in a UDF function, I get the following error:

'Project [unresolvedalias(FramelessUdf([email protected]), Some(org.apache.spark.sql.Column$$Lambda$1470/0x0000000801614ee0@7f53a31f))]
+- LocalRelation [id#0, names#1, count#2]

org.apache.spark.sql.AnalysisException: unresolved operator 'Project [unresolvedalias(FramelessUdf([email protected]), Some(org.apache.spark.sql.Column$$Lambda$1470/0x0000000801614ee0@7f53a31f))];
'Project [unresolvedalias(FramelessUdf([email protected]), Some(org.apache.spark.sql.Column$$Lambda$1470/0x0000000801614ee0@7f53a31f))]
+- LocalRelation [id#0, names#1, count#2]

	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:57)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:56)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$48(CheckAnalysis.scala:620)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$48$adapted(CheckAnalysis.scala:618)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:618)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:97)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:214)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
	at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
	at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
	at com.github.scalapb.RecordStreamingQueryTest.$anonfun$new$2(RecordStreamingQueryTest.scala:33)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.featurespec.AnyFeatureSpecLike$$anon$1.apply(AnyFeatureSpecLike.scala:313)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.featurespec.AnyFeatureSpec.withFixture(AnyFeatureSpec.scala:1827)
	at org.scalatest.featurespec.AnyFeatureSpecLike.invokeWithFixture$1(AnyFeatureSpecLike.scala:311)
	at org.scalatest.featurespec.AnyFeatureSpecLike.$anonfun$runTest$1(AnyFeatureSpecLike.scala:323)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.featurespec.AnyFeatureSpecLike.runTest(AnyFeatureSpecLike.scala:323)
	at org.scalatest.featurespec.AnyFeatureSpecLike.runTest$(AnyFeatureSpecLike.scala:305)
	at org.scalatest.featurespec.AnyFeatureSpec.runTest(AnyFeatureSpec.scala:1827)
	at org.scalatest.featurespec.AnyFeatureSpecLike.$anonfun$runTests$1(AnyFeatureSpecLike.scala:382)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.featurespec.AnyFeatureSpecLike.runTests(AnyFeatureSpecLike.scala:382)
	at org.scalatest.featurespec.AnyFeatureSpecLike.runTests$(AnyFeatureSpecLike.scala:381)
	at org.scalatest.featurespec.AnyFeatureSpec.runTests(AnyFeatureSpec.scala:1827)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.featurespec.AnyFeatureSpec.org$scalatest$featurespec$AnyFeatureSpecLike$$super$run(AnyFeatureSpec.scala:1827)
	at org.scalatest.featurespec.AnyFeatureSpecLike.$anonfun$run$1(AnyFeatureSpecLike.scala:423)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.featurespec.AnyFeatureSpecLike.run(AnyFeatureSpecLike.scala:423)
	at org.scalatest.featurespec.AnyFeatureSpecLike.run$(AnyFeatureSpecLike.scala:422)
	at com.github.scalapb.RecordStreamingQueryTest.org$scalatest$BeforeAndAfterAll$$super$run(RecordStreamingQueryTest.scala:13)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at com.github.scalapb.RecordStreamingQueryTest.run(RecordStreamingQueryTest.scala:13)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)

Test scenario:

record.proto

syntax = "proto3";
option java_package = "com.github.scalapb";

message Record {
  int32 id = 1;
  repeated string names = 2;
}

Test class:

      val rowData = java.util.List.of(Row(1, Seq("name1"), 1))
      val schema = StructType(
        Array(
          StructField("id", DataTypes.IntegerType, nullable                    = true),
          StructField("names", DataTypes.createArrayType(StringType), nullable = true),
          StructField("count", DataTypes.IntegerType, nullable                 = true)
        )
      )

      val df = spark.createDataFrame(rowData, schema)

      import scalapb.spark.Implicits._
      val my_udf = ProtoSQL.udf { record: Record =>
        //serialization to protobuf byte array with schema registry
      }

      df.select(my_udf(struct(col("id"), col("names")).as[Record]))
        .show(false)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions