
Commit

Cleaning code and added tests to check how key variable functions in more complex cases

Szymon Bogusz committed Feb 13, 2025
1 parent 230bc1b commit e6a94db
Showing 5 changed files with 143 additions and 42 deletions.
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -91,6 +91,7 @@
* Kafka source has an "offset reset strategy" parameter that controls the starting point for reading events.
* Configuration entry `kafkaEspProperties.forceLatestRead` is replaced with `kafkaEspProperties.defaultOffsetResetStrategy`
* [#7545](https://github.com/TouK/nussknacker/pull/7545) Added `useMiniClusterForDeployment` option that allows running Flink scenarios on a Flink MiniCluster
* [#7553](https://github.com/TouK/nussknacker/pull/7553) Key variable created in window components no longer has to be a string

## 1.18

@@ -103,19 +103,6 @@ object keyed {

private lazy val interpreter = prepareInterpreter(key, value)

// private def transformKey(keyValue: CharSequence): String = {
// Option(keyValue).map(_.toString).getOrElse("")
// }

// protected def prepareInterpreter(
// key: LazyParameter[String],
// value: LazyParameter[Value]
// ): Context => KeyedValue[Key, Value] = {
// toEvaluateFunctionConverter.toEvaluateFunction[KeyedValue[Key, Value]](
// key.product(value).map(tuple => KeyedValue(tuple._1, tuple._2))
// )
// }

override protected def interpret(ctx: Context): KeyedValue[Key, Value] = interpreter(ctx)

}
@@ -70,10 +70,14 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
endVariablesForKey(key).map(_.variableTyped[T]("fragmentResult").get)
}

def endVariablesForKey(key: String): List[TestProcess.ResultContext[Any]] = {
def fragmentResultEndVariable[K <: Any, T <: AnyRef](key: K): List[T] = {
endVariablesForKey(key).map(_.variableTyped[T]("fragmentResult").get)
}

def endVariablesForKey[K <: Any](key: K): List[TestProcess.ResultContext[Any]] = {
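// Results of the "end" node whose key variable equals the given key; the key is a
// type parameter so tests can look up results by non-string keys (Int, Map, ...) as well.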
collectingListener.results
.nodeResults("end")
.filter(_.variableTyped[String](VariableConstants.KeyVariableName).contains(key))
.filter(_.variableTyped[K](VariableConstants.KeyVariableName).contains(key))
}

def keyVariables[T <: AnyRef]: List[T] = {
@@ -172,27 +176,6 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
}
}

test("key variable can be something else than string") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val model =
modelData(
collectingListener,
List(
GenericRecordHours(1, 0, 1, "a"),
GenericRecordHours("2", 1, 2, "b"),
GenericRecordHours(List(1), 2, 5, "b"),
GenericRecordHours(Map("a" -> 1), 3, 2, "c"),
GenericRecordHours(1.2, 4, 5, "b")
)
)
val testScenario = sliding("#AGG.first", "#input.str", emitWhenEventLeft = false)

runScenario(model, testScenario)
val aggregateVariables = collectingListener.keyVariables
aggregateVariables shouldBe List(1, "2", List(1), Map("a" -> 1), 1.2)
}
}

test("sum aggregate with zeros") {
val id = "1"
ResultsCollectingListenerHolder.withListener { collectingListener =>
@@ -887,6 +870,140 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
}
}

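// Keys of three different runtime types, shared by the complex-key aggregation tests below.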
val intKey: Int = 1
val stringKey: String = "2"
val mapKey: Map[String, Int] = Map("a" -> 1)

def modelDataToCheckAggregations(collectingListener: ResultsCollectingListener[Any]): LocalModelData = {
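// Six input records, two per key, so each key type sees exactly two events.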
modelData(
collectingListener,
List(
GenericRecordHours(intKey, 0, 1, "a"),
GenericRecordHours(stringKey, 1, 2, "b"),
GenericRecordHours(mapKey, 1, 3, "a"),
GenericRecordHours(intKey, 2, 5, "b"),
GenericRecordHours(stringKey, 3, 2, "c"),
GenericRecordHours(mapKey, 4, 5, "b")
)
)
}

test("key variable can be something else than string") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val model = modelData(
collectingListener,
List(
GenericRecordHours(1, 0, 1, "a"),
GenericRecordHours("2", 1, 2, "b"),
GenericRecordHours(List(1), 2, 5, "b"),
GenericRecordHours(Map("a" -> 1), 3, 2, "c"),
GenericRecordHours(1.2, 4, 5, "b")
)
)
val testScenario = sliding("#AGG.first", "#input.str", emitWhenEventLeft = false)

runScenario(model, testScenario)
val keyVariables = collectingListener.keyVariables
keyVariables shouldBe List(1, "2", List(1), Map("a" -> 1), 1.2)
}
}

test("sliding aggregation when emitWhenEventLeft is false should groupBy complex types") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val testScenario = sliding("#AGG.first", "#input.eId", emitWhenEventLeft = false)

runScenario(modelDataToCheckAggregations(collectingListener), testScenario)
val keyVariables = collectingListener.keyVariables
keyVariables.distinct shouldBe List(intKey, stringKey, mapKey)

val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey)
aggregateVariablesForInt shouldBe List(1, 5)

val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey)
aggregateVariablesForString shouldBe List(2, 2)

val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey)
aggregateVariablesForMap shouldBe List(3, 5)
}
}

test("sliding aggregation when emitWhenEventLeft is true should groupBy complex types") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val testScenario = sliding("#AGG.first", "#input.eId", emitWhenEventLeft = true)

runScenario(modelDataToCheckAggregations(collectingListener), testScenario)
val keyVariables = collectingListener.keyVariables
keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey)

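// Each key is expected to yield a trailing null aggregate - presumably emitted once the last event for that key has left the sliding window.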
val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey)
aggregateVariablesForInt shouldBe List(1, 5, null)

val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey)
aggregateVariablesForString shouldBe List(2, 2, null)

val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey)
aggregateVariablesForMap shouldBe List(3, 5, null)
}
}

test("tumbling aggregation when emitWhen is onEndWithExtraWindow should groupBy complex types") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val testScenario = tumbling("#AGG.first", "#input.eId", emitWhen = TumblingWindowTrigger.OnEndWithExtraWindow)

runScenario(modelDataToCheckAggregations(collectingListener), testScenario)
val keyVariables = collectingListener.keyVariables
keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey)

val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey)
aggregateVariablesForInt shouldBe List(1, 5, null)

val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey)
aggregateVariablesForString shouldBe List(2, 2, null)

val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey)
aggregateVariablesForMap shouldBe List(3, null, 5, null)
}
}

test("tumbling aggregation when emitWhen is onEvent should groupBy complex types") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val testScenario = tumbling("#AGG.first", "#input.eId", emitWhen = TumblingWindowTrigger.OnEvent)

runScenario(modelDataToCheckAggregations(collectingListener), testScenario)
val keyVariables = collectingListener.keyVariables
keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey)

val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey)
aggregateVariablesForInt shouldBe List(1, 5)

val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey)
aggregateVariablesForString shouldBe List(2, 2)

val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey)
aggregateVariablesForMap shouldBe List(3, 5)
}
}

test("session aggregation should groupBy complex types") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val testScenario =
session("#AGG.first", "#input.eId", emitWhen = SessionWindowTrigger.OnEvent, endSessionCondition = "false")

runScenario(modelDataToCheckAggregations(collectingListener), testScenario)
val keyVariables = collectingListener.keyVariables
keyVariables.toSet shouldBe Set(intKey, stringKey, mapKey)

val aggregateVariablesForInt = collectingListener.fragmentResultEndVariable[Int, Number](intKey)
aggregateVariablesForInt shouldBe List(1, 1)

val aggregateVariablesForString = collectingListener.fragmentResultEndVariable[String, Number](stringKey)
aggregateVariablesForString shouldBe List(2, 2)

val aggregateVariablesForMap = collectingListener.fragmentResultEndVariable[Map[String, Int], Number](mapKey)
aggregateVariablesForMap shouldBe List(3, 5)
}
}

private def runScenario(
model: LocalModelData,
testScenario: CanonicalProcess
@@ -25,7 +25,7 @@ class EmitWhenEventLeftAggregatorFunction[MapT[K, V]](
override val nodeId: NodeId,
protected val aggregateElementType: TypingResult,
override protected val aggregateTypeInformation: TypeInformation[AnyRef],
val convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext,
val convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext
)(implicit override val rangeMap: FlinkRangeMap[MapT])
extends LatelyEvictableStateFunction[
ValueWithContext[KeyedValue[AnyRef, AnyRef]],
@@ -198,11 +198,7 @@ object transformers {
.window(windowDefinition)
.trigger(baseTrigger)
.aggregate(
new UnwrappingAggregateFunction[(AnyRef, java.lang.Boolean)](
aggregator,
aggregateBy.returnType,
_._1
),
aggregatingFunction,
EnrichingWithKeyFunction(fctx),
typeInfos.storedTypeInfo,
typeInfos.returnTypeInfo,
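The `aggregatingFunction` now passed to `.aggregate(...)` is defined outside the visible part of this hunk. A plausible sketch, assuming the commit simply lifts the previously inline construction into a local value (the binding below is inferred, not shown in the diff):

// Hypothetical extraction of the removed inline argument:
val aggregatingFunction =
  new UnwrappingAggregateFunction[(AnyRef, java.lang.Boolean)](
    aggregator,
    aggregateBy.returnType,
    _._1
  )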
