From e6a94db807d3ae57a408e507050ce2060f9ab98d Mon Sep 17 00:00:00 2001 From: Szymon Bogusz Date: Thu, 13 Feb 2025 09:27:11 +0100 Subject: [PATCH] Cleaning code and added tests to check how key variable functions in more complex cases --- docs/Changelog.md | 1 + .../nussknacker/engine/flink/util/keyed.scala | 13 -- .../aggregate/TransformersTest.scala | 163 +++++++++++++++--- .../EmitWhenEventLeftAggregatorFunction.scala | 2 +- .../transformer/aggregate/transformers.scala | 6 +- 5 files changed, 143 insertions(+), 42 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index cd4fd482fc0..f2465d0f497 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -91,6 +91,7 @@ * Kafka source has "offset reset strategy" parameter that controls starting point for reading events. * Configuration entry `kafkaEspProperties.forceLatestRead` is replaced with `kafkaEspProperties.defaultOffsetResetStrategy` * [#7545](https://github.com/TouK/nussknacker/pull/7545) Added `useMiniClusterForDeployment` option allowing to run Flink scenarios on Flink MiniCluster +* [#7553](https://github.com/TouK/nussknacker/pull/7553) Key variable created in window component doesn't have to be string ## 1.18 diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/keyed.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/keyed.scala index b87cb7ac572..63cdcf2c292 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/keyed.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/keyed.scala @@ -103,19 +103,6 @@ object keyed { private lazy val interpreter = prepareInterpreter(key, value) -// private def transformKey(keyValue: CharSequence): String = { -// Option(keyValue).map(_.toString).getOrElse("") -// } - -// protected def prepareInterpreter( -// key: LazyParameter[String], -// value: LazyParameter[Value] -// ): Context => KeyedValue[Key, Value] = { -// toEvaluateFunctionConverter.toEvaluateFunction[KeyedValue[Key, Value]]( -// key.product(value).map(tuple => KeyedValue(tuple._1, tuple._2)) -// ) -// } - override protected def interpret(ctx: Context): KeyedValue[Key, Value] = interpreter(ctx) } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index bda69d1f7a7..4e06a49c290 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -70,10 +70,14 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins endVariablesForKey(key).map(_.variableTyped[T]("fragmentResult").get) } - def endVariablesForKey(key: String): List[TestProcess.ResultContext[Any]] = { + def fragmentResultEndVariable[K <: Any, T <: AnyRef](key: K): List[T] = { + endVariablesForKey(key).map(_.variableTyped[T]("fragmentResult").get) + } + + def endVariablesForKey[K <: Any](key: K): List[TestProcess.ResultContext[Any]] = { collectingListener.results .nodeResults("end") - .filter(_.variableTyped[String](VariableConstants.KeyVariableName).contains(key)) + .filter(_.variableTyped[K](VariableConstants.KeyVariableName).contains(key)) } def keyVariables[T <: AnyRef]: List[T] = { @@ -172,27 +176,6 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins } } - test("key variable can be something else than string") { - ResultsCollectingListenerHolder.withListener { collectingListener => - val model = - modelData( - collectingListener, - List( - GenericRecordHours(1, 0, 1, "a"), - GenericRecordHours("2", 1, 2, "b"), - GenericRecordHours(List(1), 2, 5, "b"), - GenericRecordHours(Map("a" -> 1), 3, 2, "c"), - GenericRecordHours(1.2, 4, 5, "b") - ) - ) - val testScenario = sliding("#AGG.first", "#input.str", emitWhenEventLeft = false) - - runScenario(model, testScenario) - val aggregateVariables = collectingListener.keyVariables - aggregateVariables shouldBe List(1, "2", List(1), Map("a" -> 1), 1.2) - } - } - test("sum aggregate with zeros") { val id = "1" ResultsCollectingListenerHolder.withListener { collectingListener => @@ -887,6 +870,140 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins } } + val intKey: Int = 1 + val stringKey: String = "2" + val mapKey: Map[String, Int] = Map("a" -> 1) + + def modelDataToCheckAggregations(collectingListener: ResultsCollectingListener[Any]): LocalModelData = { + modelData( + collectingListener, + List( + GenericRecordHours(intKey, 0, 1, "a"), + GenericRecordHours(stringKey, 1, 2, "b"), + GenericRecordHours(mapKey, 1, 3, "a"), + GenericRecordHours(intKey, 2, 5, "b"), + GenericRecordHours(stringKey, 3, 2, "c"), + GenericRecordHours(mapKey, 4, 5, "b") + ) + ) + } + + test("key variable can be something else than string") { + ResultsCollectingListenerHolder.withListener { collectingListener => + val modelData = modelData( + collectingListener, + List( + GenericRecordHours(1, 0, 1, "a"), + GenericRecordHours("2", 1, 2, "b"), + GenericRecordHours(List(1), 2, 5, "b"), + GenericRecordHours(Map("a" -> 1), 3, 2, "c"), + GenericRecordHours(1.2, 4, 5, "b") + ) + ) + val testScenario = sliding("#AGG.first", "#input.str", emitWhenEventLeft = false) + + runScenario(modelData, testScenario) + val keyVariables = collectingListener.keyVariables + keyVariables shouldBe List(1, "2", List(1), Map("a" -> 1), 1.2) + } + } + + test("sliding aggregation when emitWhenEventLeft is false should groupBy complex types") { + ResultsCollectingListenerHolder.withListener { collectingListener => + val testScenario = sliding("#AGG.first", "#input.eId", emitWhenEventLeft = false) + + runScenario(modelDataToCheckAggregations(collectingListener), testScenario) + val keyVariables = collectingListener.keyVariables + keyVariables.distinct shouldBe List(intKey, stringKey, mapKey) + + val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey) + aggregateVariablesForInt shouldBe List(1, 5) + + val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey) + aggregateVariablesForString shouldBe List(2, 2) + + val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey) + aggregateVariablesForMap shouldBe List(3, 5) + } + } + + test("sliding aggregation when emitWhenEventLeft is true should groupBy complex types") { + ResultsCollectingListenerHolder.withListener { collectingListener => + val testScenario = sliding("#AGG.first", "#input.eId", emitWhenEventLeft = true) + + runScenario(modelDataToCheckAggregations(collectingListener), testScenario) + val keyVariables = collectingListener.keyVariables + keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey) + + val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey) + aggregateVariablesForInt shouldBe List(1, 5, null) + + val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey) + aggregateVariablesForString shouldBe List(2, 2, null) + + val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey) + aggregateVariablesForMap shouldBe List(3, 5, null) + } + } + + test("tumbling aggregation when emitWhen is onEndWithExtraWindow should groupBy complex types") { + ResultsCollectingListenerHolder.withListener { collectingListener => + val testScenario = tumbling("#AGG.first", "#input.eId", emitWhen = TumblingWindowTrigger.OnEndWithExtraWindow) + + runScenario(modelDataToCheckAggregations(collectingListener), testScenario) + val keyVariables = collectingListener.keyVariables + keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey) + + val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey) + aggregateVariablesForInt shouldBe List(1, 5, null) + + val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey) + aggregateVariablesForString shouldBe List(2, 2, null) + + val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey) + aggregateVariablesForMap shouldBe List(3, null, 5, null) + } + } + + test("tumbling aggregation when emitWhen is onEvent should groupBy complex types") { + ResultsCollectingListenerHolder.withListener { collectingListener => + val testScenario = tumbling("#AGG.first", "#input.eId", emitWhen = TumblingWindowTrigger.OnEvent) + + runScenario(modelDataToCheckAggregations(collectingListener), testScenario) + val keyVariables = collectingListener.keyVariables + keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey) + + val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey) + aggregateVariablesForInt shouldBe List(1, 5) + + val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey) + aggregateVariablesForString shouldBe List(2, 2) + + val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey) + aggregateVariablesForMap shouldBe List(3, 5) + } + } + + test("session aggregation should groupBy complex types") { + ResultsCollectingListenerHolder.withListener { collectingListener => + val testScenario = + session("#AGG.first", "#input.eId", emitWhen = SessionWindowTrigger.OnEvent, endSessionCondition = "false") + + runScenario(modelDataToCheckAggregations(collectingListener), testScenario) + val keyVariables = collectingListener.keyVariables + keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey) + + val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey) + aggregateVariablesForInt shouldBe List(1, 1) + + val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey) + aggregateVariablesForString shouldBe List(2, 2) + + val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey) + aggregateVariablesForMap shouldBe List(3, 5) + } + } + private def runScenario( model: LocalModelData, testScenario: CanonicalProcess diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala index 18236851fe5..f318ac155da 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala @@ -25,7 +25,7 @@ class EmitWhenEventLeftAggregatorFunction[MapT[K, V]]( override val nodeId: NodeId, protected val aggregateElementType: TypingResult, override protected val aggregateTypeInformation: TypeInformation[AnyRef], - val convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext, + val convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext )(implicit override val rangeMap: FlinkRangeMap[MapT]) extends LatelyEvictableStateFunction[ ValueWithContext[KeyedValue[AnyRef, AnyRef]], diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 1c0149d1f22..b23547d0bb8 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -198,11 +198,7 @@ object transformers { .window(windowDefinition) .trigger(baseTrigger) .aggregate( - new UnwrappingAggregateFunction[(AnyRef, java.lang.Boolean)]( - aggregator, - aggregateBy.returnType, - _._1 - ), + aggregatingFunction, EnrichingWithKeyFunction(fctx), typeInfos.storedTypeInfo, typeInfos.returnTypeInfo,