From 88b36dec5288ae2bf641c41d14657b10dd7beeea Mon Sep 17 00:00:00 2001 From: Marcin Stelmaszczyk Date: Fri, 22 Mar 2024 13:33:40 +0100 Subject: [PATCH] NU 1.14.0 release --- .../sample/csv/GenericCsvSourceFactory.scala | 40 +++++++++++-------- .../csv/SpecificRecordCsvSourceFactory.scala | 6 ++- .../sample/SampleComponentProviderTest.scala | 2 + .../csv/CallDetailRecordSourceTest.scala | 6 ++- .../nussknacker/sample/csv/CsvSinkTest.scala | 2 + .../sample/csv/CsvSourceTest.scala | 12 ++++-- .../SampleComponentProviderLiteTest.scala | 7 +++- nussknacker.version | 2 +- 8 files changed, 50 insertions(+), 27 deletions(-) diff --git a/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/GenericCsvSourceFactory.scala b/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/GenericCsvSourceFactory.scala index 1354731..7cff7c1 100644 --- a/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/GenericCsvSourceFactory.scala +++ b/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/GenericCsvSourceFactory.scala @@ -5,24 +5,30 @@ import cats.syntax.apply._ import cats.syntax.traverse._ import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner import org.apache.flink.api.common.typeinfo.TypeInformation -import pl.touk.nussknacker.engine.api.NodeId +import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent +import pl.touk.nussknacker.engine.api.{NodeId, Params} import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError -import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputGenericNodeTransformation} +import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputDynamicComponent} import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext} -import pl.touk.nussknacker.engine.api.definition.{NodeDependency, ParameterWithExtractor} +import pl.touk.nussknacker.engine.api.definition.{NodeDependency, ParameterCreatorWithNoDependency, ParameterDeclaration, ParameterExtractor} +import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.{Source, SourceFactory} import pl.touk.nussknacker.engine.api.typed.TypedMap import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult} import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler.toAssigner import pl.touk.nussknacker.engine.util.typing.TypingUtils -import pl.touk.nussknacker.sample.csv.GenericCsvSourceFactory.{ColumnParsers, DefinitionParameter, FileNameParameter} +import pl.touk.nussknacker.sample.csv.GenericCsvSourceFactory.{ColumnParsers, DefinitionParameterName, DefinitionParameterDeclaration, FileNameParameterName, FileNameParameterDeclaration} import java.io.File import scala.collection.JavaConverters._ object GenericCsvSourceFactory { - val FileNameParameter: ParameterWithExtractor[String] = ParameterWithExtractor.mandatory("fileName") - val DefinitionParameter: ParameterWithExtractor[java.util.List[java.util.List[String]]] = ParameterWithExtractor.mandatory("definition") + val FileNameParameterName: ParameterName = ParameterName("fileName") + val DefinitionParameterName: ParameterName = ParameterName("definition") + val FileNameParameterDeclaration: ParameterCreatorWithNoDependency with ParameterExtractor[String] = + ParameterDeclaration.mandatory[String](FileNameParameterName).withCreator() + val DefinitionParameterDeclaration: ParameterCreatorWithNoDependency with ParameterExtractor[java.util.List[java.util.List[String]]] = + ParameterDeclaration.mandatory[java.util.List[java.util.List[String]]](DefinitionParameterName).withCreator() private val ColumnParsers: Map[TypingResult, String => Any] = Map( Typed[String] -> identity, @@ -33,17 +39,17 @@ object GenericCsvSourceFactory { /** * A sample generic CSV source. It has two parameters - fileName and definition. Definition describe columns in the file - names and their types. */ -class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceFactory with SingleInputGenericNodeTransformation[Source] { +class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceFactory with SingleInputDynamicComponent[Source] with UnboundedStreamComponent { override type State = Nothing override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue]) - (implicit nodeId: NodeId): NodeTransformationDefinition = { + (implicit nodeId: NodeId): ContextTransformationDefinition = { case TransformationStep(Nil, _) => - NextParameters(FileNameParameter.parameter :: DefinitionParameter.parameter :: Nil) + NextParameters(FileNameParameterDeclaration.createParameter() :: DefinitionParameterDeclaration.createParameter() :: Nil) case TransformationStep( - (FileNameParameter.parameter.name, DefinedEagerParameter(fileName: String, _)) :: - (DefinitionParameter.parameter.name, DefinedEagerParameter(definition: java.util.List[java.util.List[String]], _)) :: + (`FileNameParameterName`, DefinedEagerParameter(fileName: String, _)) :: + (`DefinitionParameterName`, DefinedEagerParameter(definition: java.util.List[java.util.List[String]], _)) :: Nil, _) => ( validateFileName(fileName), @@ -56,10 +62,10 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF ) } - override def implementation(params: Map[String, Any], dependencies: List[NodeDependencyValue], finalState: Option[State]): Source = { - val fileName = FileNameParameter.extractValue(params) + override def implementation(params: Params, dependencies: List[NodeDependencyValue], finalState: Option[State]): Source = { + val fileName = FileNameParameterDeclaration.extractValueUnsafe(params) val file = new File(filesDir, fileName) - val definition = DefinitionParameter.extractValue(params) + val definition = DefinitionParameterDeclaration.extractValueUnsafe(params) // For each event, current time is assigned. We could also add a parameter with timestamp column name and assign timestamps // based on the given column value. val assignProcessingTime: SerializableTimestampAssigner[TypedMap] = toAssigner(_ => System.currentTimeMillis()) @@ -71,7 +77,7 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF private def validateFileName(fileName: String)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, Unit] = { val file = new File(filesDir, fileName) - Validated.condNel(file.canRead, (), CustomNodeError(s"File: '$fileName' is not readable", paramName = Some(FileNameParameter.parameter.name))) + Validated.condNel(file.canRead, (), CustomNodeError(s"File: '$fileName' is not readable", paramName = Some(FileNameParameterDeclaration.parameterName))) } private def describeInput(definition: java.util.List[java.util.List[String]]) @@ -80,7 +86,7 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF Validated.condNel( column.size() == 2, (column.get(0), column.get(1)), - CustomNodeError(s"Column ${idx + 1} should have name and type", Some(DefinitionParameter.parameter.name)) + CustomNodeError(s"Column ${idx + 1} should have name and type", Some(DefinitionParameterDeclaration.parameterName)) ) }.sequence validatedDefinitionFormat.map(namesAndTypes => TypingUtils.typeMapDefinition(namesAndTypes.toMap)) @@ -93,7 +99,7 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF Validated.condNel( ColumnParsers.contains(typingResult), (name, typingResult), - CustomNodeError(s"Type for column '$name' is not supported", Some(DefinitionParameter.parameter.name)) + CustomNodeError(s"Type for column '$name' is not supported", Some(DefinitionParameterDeclaration.parameterName)) ) }.toList.sequence.map(TypedObjectTypingResult(_)) } diff --git a/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/SpecificRecordCsvSourceFactory.scala b/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/SpecificRecordCsvSourceFactory.scala index f02b53d..2a7efc0 100644 --- a/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/SpecificRecordCsvSourceFactory.scala +++ b/flink-components/src/main/scala/pl/touk/nussknacker/sample/csv/SpecificRecordCsvSourceFactory.scala @@ -1,6 +1,8 @@ package pl.touk.nussknacker.sample.csv import org.apache.flink.api.common.typeinfo.TypeInformation +import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent +import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.SourceFactory import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName} @@ -12,13 +14,13 @@ import java.io.File class SpecificRecordCsvSourceFactory[T: TypeInformation](filesDir: File, separator: Char, createRecord: Array[String] => T, - extractTimestamp: T => Long) extends SourceFactory { + extractTimestamp: T => Long) extends SourceFactory with UnboundedStreamComponent { @MethodToInvoke def create(@ParamName("fileName") fileName: String): FlinkSource = { val file = new File(filesDir, fileName) if (!file.canRead) { - throw CustomNodeValidationException(s"File: '$file' is not readable", paramName = Some("fileName")) + throw CustomNodeValidationException(s"File: '$file' is not readable", paramName = Some(ParameterName("fileName"))) } new CsvSource[T](file, separator, createRecord, toAssigner(extractTimestamp(_))) } diff --git a/flink-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderTest.scala b/flink-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderTest.scala index 78ef556..e2ad930 100644 --- a/flink-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderTest.scala +++ b/flink-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderTest.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.sample import org.junit.jupiter.api.Test import org.scalatest.Inside.inside import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.Implicits._ @@ -23,6 +24,7 @@ class SampleComponentProviderTest extends Matchers with ValidatedValuesDetailedM val runner = TestScenarioRunner .flinkBased(config, flinkMiniCluster) + .withExtraComponents(ComponentDefinition("randomString", new RandomStringProvider) :: Nil) .build() val length = 5 diff --git a/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CallDetailRecordSourceTest.scala b/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CallDetailRecordSourceTest.scala index c6b5b66..3edd245 100644 --- a/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CallDetailRecordSourceTest.scala +++ b/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CallDetailRecordSourceTest.scala @@ -4,6 +4,8 @@ import org.junit.jupiter.api.Test import org.scalatest.Inside.inside import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression @@ -34,6 +36,7 @@ class CallDetailRecordSourceTest extends Matchers with ValidatedValuesDetailedMe .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input") val runner = TestScenarioRunner .flinkBased(config, flinkMiniCluster) + .withExtraComponents(ComponentDefinition("cdr", CallDetailRecordSourceFactory.prepare(cdrsFile.getParent.toString, ';')) :: Nil) .build() val results = runner.runWithoutData[CallDetailRecord](scenario).validValue @@ -59,11 +62,12 @@ class CallDetailRecordSourceTest extends Matchers with ValidatedValuesDetailedMe .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input") val runner = TestScenarioRunner .flinkBased(config, flinkMiniCluster) + .withExtraComponents(ComponentDefinition("cdr", CallDetailRecordSourceFactory.prepare("/tmp", ';')) :: Nil) .build() val compilationErrors = runner.runWithoutData[CallDetailRecord](scenario).invalidValue.toList - compilationErrors should contain only CustomNodeError("cdr source", "File: '/tmp/unexisting.csv' is not readable", Some("fileName")) + compilationErrors should contain only CustomNodeError("cdr source", "File: '/tmp/unexisting.csv' is not readable", Some(ParameterName("fileName"))) } } diff --git a/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSinkTest.scala b/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSinkTest.scala index 00ec7b0..5e20454 100644 --- a/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSinkTest.scala +++ b/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSinkTest.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.sample.csv import org.junit.jupiter.api.Test import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression @@ -25,6 +26,7 @@ class CsvSinkTest extends Matchers { .emptySink("end", "csvSink", "fileName" -> s"'${resultsFile.getFileName.toFile}'", "row" -> "{#input, 'const'}") val runner = TestScenarioRunner .flinkBased(config, flinkMiniCluster) + .withExtraComponents(ComponentDefinition("csvSink", new CsvSinkFactory(resultsFile.getParent.toString, ';')) :: Nil) .build() runner.runWithDataIgnoringResults(scenario, List("first", "second")) diff --git a/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSourceTest.scala b/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSourceTest.scala index 062bf9a..1fb61cb 100644 --- a/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSourceTest.scala +++ b/flink-components/src/test/scala/pl/touk/nussknacker/sample/csv/CsvSourceTest.scala @@ -3,8 +3,10 @@ package pl.touk.nussknacker.sample.csv import org.junit.jupiter.api.Test import org.scalatest.Inside.inside import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.context.ProcessCompilationError import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError +import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.graph.expression.Expression @@ -35,6 +37,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage { .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input") val runner = TestScenarioRunner .flinkBased(config, flinkMiniCluster) + .withExtraComponents(ComponentDefinition("csvSource", new GenericCsvSourceFactory(csvFile.getParent.toString, ';')) :: Nil) .build() val results = runner.runWithoutData[java.util.Map[String, Any]](scenario).validValue @@ -53,7 +56,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage { @Test def shouldThrowOnNonReadableFile(): Unit = { testCompilationErrors("fileName" -> "'unexisting.csv'", "definition" -> "{{'name', 'String'}, {'phoneNumber', 'Long'}}") should - contain (CustomNodeError("source", "File: 'unexisting.csv' is not readable", Some("fileName"))) + contain (CustomNodeError("source", "File: 'unexisting.csv' is not readable", Some(ParameterName("fileName")))) } @Test @@ -61,7 +64,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage { val emptyFile = Files.createTempFile("test", ".csv") emptyFile.toFile.deleteOnExit() testCompilationErrors("fileName" -> s"'${emptyFile.getFileName.toString}'", "definition" -> "{{'name', 'String'}, {'phoneNumber'}}") should - contain (CustomNodeError("source", "Column 2 should have name and type", Some("definition"))) + contain (CustomNodeError("source", "Column 2 should have name and type", Some(ParameterName("definition")))) } @Test @@ -70,8 +73,8 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage { emptyFile.toFile.deleteOnExit() testCompilationErrors("fileName" -> s"'${emptyFile.getFileName.toString}'", "definition" -> "{{'name', 'String'}, {'phoneNumber', 'Integer'}, {'callDuration', 'java.time.Duration'}}") should contain allOf( - CustomNodeError("source", "Type for column 'phoneNumber' is not supported", Some("definition")), - CustomNodeError("source", "Type for column 'callDuration' is not supported", Some("definition"))) + CustomNodeError("source", "Type for column 'phoneNumber' is not supported", Some(ParameterName("definition"))), + CustomNodeError("source", "Type for column 'callDuration' is not supported", Some(ParameterName("definition")))) } private def testCompilationErrors(params: (String, Expression)*): List[ProcessCompilationError] = { @@ -81,6 +84,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage { .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input") val runner = TestScenarioRunner .flinkBased(config, flinkMiniCluster) + .withExtraComponents(ComponentDefinition("csvSource", new GenericCsvSourceFactory("/tmp", ';')) :: Nil) .build() runner.runWithoutData[java.util.Map[String, Any]](scenario).invalidValue.toList } diff --git a/lite-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderLiteTest.scala b/lite-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderLiteTest.scala index 3ca4a42..2087d2e 100644 --- a/lite-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderLiteTest.scala +++ b/lite-components/src/test/scala/pl/touk/nussknacker/sample/SampleComponentProviderLiteTest.scala @@ -1,8 +1,8 @@ package pl.touk.nussknacker.sample -import com.typesafe.config.ConfigFactory import org.junit.jupiter.api.Test import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.lite.util.test.LiteTestScenarioRunner import pl.touk.nussknacker.engine.spel.Implicits._ @@ -29,7 +29,10 @@ class SampleComponentProviderLiteTest extends Matchers with ValidatedValuesDetai .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#out1") - val runner = TestScenarioRunner.liteBased().build() + val runner = TestScenarioRunner + .liteBased() + .withExtraComponents(ComponentDefinition("randomString", new RandomStringProvider) :: Nil) + .build() val results = runner.runWithData[SimpleInput, String](scenario, inputData) diff --git a/nussknacker.version b/nussknacker.version index e0a6b34..850e742 100644 --- a/nussknacker.version +++ b/nussknacker.version @@ -1 +1 @@ -1.12.5 +1.14.0