Skip to content

Commit

Permalink
Cleaning code and added test to check if key variable can be somethin…
Browse files Browse the repository at this point in the history
…g different from string
  • Loading branch information
Szymon Bogusz authored and ForrestFairy committed Feb 12, 2025
1 parent ebe4026 commit 230bc1b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package pl.touk.nussknacker.engine.flink.util

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{DataStream, KeyedStream, SingleOutputStreamOperator}
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
import pl.touk.nussknacker.engine.flink.util.keyed.{
GenericKeyOnlyMapper,
GenericKeyedValueMapper,
StringKeyOnlyMapper,
StringKeyedValueMapper
}
import pl.touk.nussknacker.engine.flink.util.keyed.{GenericKeyOnlyMapper, GenericKeyedValueMapper, StringKeyOnlyMapper}
import pl.touk.nussknacker.engine.util.KeyedValue

import scala.reflect.ClassTag
Expand Down Expand Up @@ -40,15 +34,6 @@ object richflink {
)
.keyBy((k: ValueWithContext[K]) => k.value)

// def groupByWithValue[T <: AnyRef: TypeTag](groupBy: LazyParameter[CharSequence], value: LazyParameter[T])(
// implicit ctx: FlinkCustomNodeContext
// ): KeyedStream[ValueWithContext[KeyedValue[String, T]], String] = {
// val typeInfo = keyed.typeInfo(ctx, groupBy.map[String]((k: CharSequence) => k.toString), value)
// dataStream
// .flatMap(new StringKeyedValueMapper(ctx.lazyParameterHelper, groupBy, value), typeInfo)
// .keyBy((k: ValueWithContext[KeyedValue[String, T]]) => k.value.key)
// }

def groupByWithValue[T <: AnyRef: TypeTag, K <: AnyRef: TypeTag](
groupBy: LazyParameter[K],
value: LazyParameter[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
.filter(_.variableTyped[String](VariableConstants.KeyVariableName).contains(key))
}

def keyVariables[T <: AnyRef]: List[T] = {
collectingListener.results
.nodeResults("end")
.map(_.variableTyped[T]("key").get)
}

}

private val processValidator: ProcessValidator =
Expand Down Expand Up @@ -166,6 +172,27 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
}
}

test("key variable can be something else than string") {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val model =
modelData(
collectingListener,
List(
GenericRecordHours(1, 0, 1, "a"),
GenericRecordHours("2", 1, 2, "b"),
GenericRecordHours(List(1), 2, 5, "b"),
GenericRecordHours(Map("a" -> 1), 3, 2, "c"),
GenericRecordHours(1.2, 4, 5, "b")
)
)
val testScenario = sliding("#AGG.first", "#input.str", emitWhenEventLeft = false)

runScenario(model, testScenario)
val aggregateVariables = collectingListener.keyVariables
aggregateVariables shouldBe List(1, "2", List(1), Map("a" -> 1), 1.2)
}
}

test("sum aggregate with zeros") {
val id = "1"
ResultsCollectingListenerHolder.withListener { collectingListener =>
Expand Down Expand Up @@ -1058,7 +1085,7 @@ case class AggregateData(
)

trait TestRecord {
val id: String
val id: Any
val eId: Int
val str: String

Expand All @@ -1070,3 +1097,7 @@ case class TestRecordHours(id: String, timeHours: Int, eId: Int, str: String) ex
}

case class TestRecordWithTimestamp(id: String, timestamp: Long, eId: Int, str: String) extends TestRecord

case class GenericRecordHours[T <: Any](id: T, timeHours: Int, eId: Int, str: String) extends TestRecord {
override def timestamp: Long = timeHours * 3600L * 1000
}

0 comments on commit 230bc1b

Please sign in to comment.