diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala index 2806541847c..435060f0d4e 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala @@ -14,24 +14,31 @@ import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.test.ScalatestMiniClusterJobStatusCheckingOps.miniClusterWithServicesToOps import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider +import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder} +import java.util import scala.collection.mutable import scala.jdk.CollectionConverters._ +import scala.util.Random class JavaCollectionsSerializationTest extends AnyFunSuite with FlinkSpec with Matchers with Inside { private val processId = "aggregateFilterProcess" - private val process: CanonicalProcess = - ScenarioBuilder + private def process(expressionOption: Option[Expression] = None): CanonicalProcess = { + val scenario = ScenarioBuilder .streaming(processId) .parallelism(1) .source("start", "start") + + expressionOption + .map(expression => scenario.buildSimpleVariable("mapVariable", "mapVariable", expression)) + .getOrElse(scenario) .customNodeNoOutput( "delay", "delay", @@ -39,22 +46,24 @@ class JavaCollectionsSerializationTest extends AnyFunSuite with FlinkSpec with M "delay" -> "T(java.time.Duration).parse('PT30M')".spel ) .emptySink("end", "dead-end") + } + + private val record = Record( + id = "2", + map = mutable.Map(1 -> "a").asJava, + list = mutable.ListBuffer("abc").asJava, + set = mutable.Set("def").asJava + ) // In Scala 2.13 all java collections class wrappers were rewritten from case class to regular class. Now kryo does not // serialize them properly, so JavaWrapperScala2_13Registrar class was added to fix this issue. This test verifies // if we can serialize and deserialize records properly. test("should serialize record with java map, list and set") { - val record = Record( - id = "2", - map = mutable.Map(1 -> "a").asJava, - list = mutable.ListBuffer("abc").asJava, - set = mutable.Set("def").asJava - ) ResultsCollectingListenerHolder.withListener { collectingListener => val model = modelData(collectingListener, List(record)) - runScenario(model, process) + runScenario(model, process()) val result = collectingListener.results .nodeResults("end") @@ -64,6 +73,40 @@ class JavaCollectionsSerializationTest extends AnyFunSuite with FlinkSpec with M } } + test("should serialize java map without changing fields order") { + + ResultsCollectingListenerHolder.withListener { collectingListener => + val model = modelData(collectingListener, List(record)) + + val sampleMap = + ('a' to 'z') + .map(x => x -> Random.nextInt()) + .sortBy(_._2) + + runScenario( + model, + process( + Some( + sampleMap + .map { case (c, i) => s""""$c" : $i""" } + .mkString("{", ",", "}") + .spel + ) + ) + ) + + val linkedHashMap = new util.LinkedHashMap[String, AnyRef]() + sampleMap.foreach { case (char, int) => linkedHashMap.put(s"$char", Integer.valueOf(int)) } + + val result = collectingListener.results + .nodeResults("end") + .map(_.variableTyped[Map[_, _]]("mapVariable")) + + result shouldBe List(Some(linkedHashMap)) + + } + } + def modelData(collectingListener: ResultsCollectingListener[Any], list: List[Record] = List()): LocalModelData = { val sourceComponent = SourceFactory.noParamUnboundedStreamFactory[Record]( CollectionSource[Record](list, None, Typed.fromDetailedType[List[Record]]) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala index df03b2c5c15..c812f4d660e 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala @@ -24,7 +24,7 @@ case class TypedJavaMapSerializer( override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] = TypedJavaMapSerializer(serializers) - override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap() + override def createInstance(): jutil.Map[String, AnyRef] = new jutil.LinkedHashMap(serializers.length) override def snapshotConfiguration( snapshots: Array[(String, TypeSerializerSnapshot[_])] diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala index e7ca4c1e5a8..6a51cfe43da 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala @@ -31,7 +31,7 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[ extends TypeInformation[T] { def this(fields: Map[String, TypeInformation[_]]) = { - this(fields.toArray.sortBy(_._1)) + this(fields.toArray) } override def isBasicType: Boolean = false diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala index f0b9b72f8c7..88ca01a2ed3 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala @@ -29,6 +29,7 @@ import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject._ import pl.touk.nussknacker.engine.process.typeinformation.testTypedObject.CustomTypedObject import java.time.{LocalDate, LocalDateTime, LocalTime} +import scala.collection.immutable.ListMap import scala.jdk.CollectionConverters._ @silent("deprecated") @@ -43,7 +44,7 @@ class TypingResultAwareTypeInformationDetectionSpec test("test map serialization") { val map = Map("intF" -> 11, "strF" -> "sdfasf", "longF" -> 111L, "fixedLong" -> 12L, "taggedString" -> "1") val typingResult = Typed.record( - Map( + ListMap( "intF" -> Typed[Int], "strF" -> Typed[String], "longF" -> Typed[Long], @@ -61,10 +62,10 @@ class TypingResultAwareTypeInformationDetectionSpec assertMapSerializers( typeInfo.createSerializer(executionConfigWithoutKryo), - ("fixedLong", new LongSerializer), ("intF", new IntSerializer), - ("longF", new LongSerializer), ("strF", new StringSerializer), + ("longF", new LongSerializer), + ("fixedLong", new LongSerializer), ("taggedString", new StringSerializer) ) } @@ -87,7 +88,7 @@ class TypingResultAwareTypeInformationDetectionSpec test("test context serialization") { val ctx = Context("11").copy(variables = - Map( + ListMap( "one" -> 11, "two" -> "ala", "three" -> Map("key" -> "value"), @@ -96,7 +97,7 @@ class TypingResultAwareTypeInformationDetectionSpec ) ) val vCtx = ValidationContext( - Map( + ListMap( "one" -> Typed[Int], "two" -> Typed[String], "three" -> Typed.record(Map("key" -> Typed[String]), Typed.typedClass[Map[String, Any]]), @@ -117,11 +118,11 @@ class TypingResultAwareTypeInformationDetectionSpec assertSerializersInContext( typeInfo.createSerializer(executionConfigWithoutKryo), - ("arrayOfInts", _ shouldBe new GenericArraySerializer(classOf[Integer], new IntSerializer)), - ("arrayOfStrings", _ shouldBe new StringArraySerializer), ("one", _ shouldBe new IntSerializer), + ("two", _ shouldBe new StringSerializer), ("three", assertMapSerializers(_, ("key", new StringSerializer))), - ("two", _ shouldBe new StringSerializer) + ("arrayOfStrings", _ shouldBe new StringArraySerializer), + ("arrayOfInts", _ shouldBe new GenericArraySerializer(classOf[Integer], new IntSerializer)), ) } diff --git a/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala b/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala index 649c777e593..7258855fdb8 100644 --- a/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala +++ b/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala @@ -9,6 +9,7 @@ import io.circe.Json._ import pl.touk.nussknacker.engine.api.DisplayJson import java.util.ServiceLoader import java.util.UUID +import scala.collection.immutable.ListMap import scala.jdk.CollectionConverters._ object ToJsonEncoder { @@ -66,7 +67,7 @@ case class ToJsonEncoder( case a: UUID => safeString(a.toString) case a: DisplayJson => a.asJson case a: scala.collection.Map[_, _] => encodeMap(a.toMap) - case a: java.util.Map[_, _] => encodeMap(a.asScala.toMap) + case a: java.util.Map[_, _] => encode(ListMap(a.asScala.toList: _*)) case a: Iterable[_] => fromValues(a.map(encode)) case a: Enum[_] => safeString(a.toString) case a: java.util.Collection[_] => fromValues(a.asScala.map(encode)) diff --git a/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala b/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala index 3602f4bda0f..f5d27f8cc5a 100644 --- a/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala +++ b/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala @@ -10,6 +10,7 @@ import java.time._ import java.util import java.util.UUID import scala.collection.immutable.{ListMap, ListSet} +import scala.util.Random class ToJsonEncoderSpec extends AnyFunSpec with Matchers { @@ -72,6 +73,19 @@ class ToJsonEncoderSpec extends AnyFunSpec with Matchers { encoder.encode(map) shouldEqual obj("key1" -> fromLong(1), "key2" -> fromString("value")) } + it("should encode maps as a json without changing fields order") { + val sampleMap = + ('a' to 'z') + .map(x => x -> Random.nextInt()) + .sortBy(_._2) + val linkedHashMap = new util.LinkedHashMap[String, AnyRef]() + sampleMap.foreach { case (char, int) => linkedHashMap.put(s"$char", Integer.valueOf(int)) } + + val encoded = encoder.encode(linkedHashMap) + val expected = obj(sampleMap.map { case (c, i) => s"$c" -> fromInt(i) }: _*) + encoded.asObject.map(_.keys.toList) shouldEqual expected.asObject.map(_.keys.toList) + } + it("should encode arrays as a json") { encoder.encode(Array(1, 2, 3)) shouldEqual arr(fromLong(1), fromLong(2), fromLong(3)) encoder.encode(Seq(Array(1, 2, 3))) shouldEqual arr(arr(fromLong(1), fromLong(2), fromLong(3)))