Skip to content

Commit

Permalink
NU 1.14.0 release
Browse files Browse the repository at this point in the history
  • Loading branch information
mstelmas committed Mar 22, 2024
1 parent 5eda82a commit 88b36de
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,30 @@ import cats.syntax.apply._
import cats.syntax.traverse._
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent
import pl.touk.nussknacker.engine.api.{NodeId, Params}
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputGenericNodeTransformation}
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputDynamicComponent}
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.definition.{NodeDependency, ParameterWithExtractor}
import pl.touk.nussknacker.engine.api.definition.{NodeDependency, ParameterCreatorWithNoDependency, ParameterDeclaration, ParameterExtractor}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.{Source, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler.toAssigner
import pl.touk.nussknacker.engine.util.typing.TypingUtils
import pl.touk.nussknacker.sample.csv.GenericCsvSourceFactory.{ColumnParsers, DefinitionParameter, FileNameParameter}
import pl.touk.nussknacker.sample.csv.GenericCsvSourceFactory.{ColumnParsers, DefinitionParameterName, DefinitionParameterDeclaration, FileNameParameterName, FileNameParameterDeclaration}

import java.io.File
import scala.collection.JavaConverters._

object GenericCsvSourceFactory {
val FileNameParameter: ParameterWithExtractor[String] = ParameterWithExtractor.mandatory("fileName")
val DefinitionParameter: ParameterWithExtractor[java.util.List[java.util.List[String]]] = ParameterWithExtractor.mandatory("definition")
val FileNameParameterName: ParameterName = ParameterName("fileName")
val DefinitionParameterName: ParameterName = ParameterName("definition")
val FileNameParameterDeclaration: ParameterCreatorWithNoDependency with ParameterExtractor[String] =
ParameterDeclaration.mandatory[String](FileNameParameterName).withCreator()
val DefinitionParameterDeclaration: ParameterCreatorWithNoDependency with ParameterExtractor[java.util.List[java.util.List[String]]] =
ParameterDeclaration.mandatory[java.util.List[java.util.List[String]]](DefinitionParameterName).withCreator()

private val ColumnParsers: Map[TypingResult, String => Any] = Map(
Typed[String] -> identity,
Expand All @@ -33,17 +39,17 @@ object GenericCsvSourceFactory {
/**
* A sample generic CSV source. It has two parameters - fileName and definition. Definition describe columns in the file - names and their types.
*/
class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceFactory with SingleInputGenericNodeTransformation[Source] {
class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceFactory with SingleInputDynamicComponent[Source] with UnboundedStreamComponent {

override type State = Nothing

override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])
(implicit nodeId: NodeId): NodeTransformationDefinition = {
(implicit nodeId: NodeId): ContextTransformationDefinition = {
case TransformationStep(Nil, _) =>
NextParameters(FileNameParameter.parameter :: DefinitionParameter.parameter :: Nil)
NextParameters(FileNameParameterDeclaration.createParameter() :: DefinitionParameterDeclaration.createParameter() :: Nil)
case TransformationStep(
(FileNameParameter.parameter.name, DefinedEagerParameter(fileName: String, _)) ::
(DefinitionParameter.parameter.name, DefinedEagerParameter(definition: java.util.List[java.util.List[String]], _)) ::
(`FileNameParameterName`, DefinedEagerParameter(fileName: String, _)) ::
(`DefinitionParameterName`, DefinedEagerParameter(definition: java.util.List[java.util.List[String]], _)) ::
Nil, _) =>
(
validateFileName(fileName),
Expand All @@ -56,10 +62,10 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF
)
}

override def implementation(params: Map[String, Any], dependencies: List[NodeDependencyValue], finalState: Option[State]): Source = {
val fileName = FileNameParameter.extractValue(params)
override def implementation(params: Params, dependencies: List[NodeDependencyValue], finalState: Option[State]): Source = {
val fileName = FileNameParameterDeclaration.extractValueUnsafe(params)
val file = new File(filesDir, fileName)
val definition = DefinitionParameter.extractValue(params)
val definition = DefinitionParameterDeclaration.extractValueUnsafe(params)
// For each event, current time is assigned. We could also add a parameter with timestamp column name and assign timestamps
// based on the given column value.
val assignProcessingTime: SerializableTimestampAssigner[TypedMap] = toAssigner(_ => System.currentTimeMillis())
Expand All @@ -71,7 +77,7 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF

private def validateFileName(fileName: String)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, Unit] = {
val file = new File(filesDir, fileName)
Validated.condNel(file.canRead, (), CustomNodeError(s"File: '$fileName' is not readable", paramName = Some(FileNameParameter.parameter.name)))
Validated.condNel(file.canRead, (), CustomNodeError(s"File: '$fileName' is not readable", paramName = Some(FileNameParameterDeclaration.parameterName)))
}

private def describeInput(definition: java.util.List[java.util.List[String]])
Expand All @@ -80,7 +86,7 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF
Validated.condNel(
column.size() == 2,
(column.get(0), column.get(1)),
CustomNodeError(s"Column ${idx + 1} should have name and type", Some(DefinitionParameter.parameter.name))
CustomNodeError(s"Column ${idx + 1} should have name and type", Some(DefinitionParameterDeclaration.parameterName))
)
}.sequence
validatedDefinitionFormat.map(namesAndTypes => TypingUtils.typeMapDefinition(namesAndTypes.toMap))
Expand All @@ -93,7 +99,7 @@ class GenericCsvSourceFactory(filesDir: String, separator: Char) extends SourceF
Validated.condNel(
ColumnParsers.contains(typingResult),
(name, typingResult),
CustomNodeError(s"Type for column '$name' is not supported", Some(DefinitionParameter.parameter.name))
CustomNodeError(s"Type for column '$name' is not supported", Some(DefinitionParameterDeclaration.parameterName))
)
}.toList.sequence.map(TypedObjectTypingResult(_))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pl.touk.nussknacker.sample.csv

import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.SourceFactory
import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}
Expand All @@ -12,13 +14,13 @@ import java.io.File
class SpecificRecordCsvSourceFactory[T: TypeInformation](filesDir: File,
separator: Char,
createRecord: Array[String] => T,
extractTimestamp: T => Long) extends SourceFactory {
extractTimestamp: T => Long) extends SourceFactory with UnboundedStreamComponent {

@MethodToInvoke
def create(@ParamName("fileName") fileName: String): FlinkSource = {
val file = new File(filesDir, fileName)
if (!file.canRead) {
throw CustomNodeValidationException(s"File: '$file' is not readable", paramName = Some("fileName"))
throw CustomNodeValidationException(s"File: '$file' is not readable", paramName = Some(ParameterName("fileName")))
}
new CsvSource[T](file, separator, createRecord, toAssigner(extractTimestamp(_)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.sample
import org.junit.jupiter.api.Test
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.spel.Implicits._
Expand All @@ -23,6 +24,7 @@ class SampleComponentProviderTest extends Matchers with ValidatedValuesDetailedM

val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("randomString", new RandomStringProvider) :: Nil)
.build()

val length = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import org.junit.jupiter.api.Test
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression
Expand Down Expand Up @@ -34,6 +36,7 @@ class CallDetailRecordSourceTest extends Matchers with ValidatedValuesDetailedMe
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("cdr", CallDetailRecordSourceFactory.prepare(cdrsFile.getParent.toString, ';')) :: Nil)
.build()

val results = runner.runWithoutData[CallDetailRecord](scenario).validValue
Expand All @@ -59,11 +62,12 @@ class CallDetailRecordSourceTest extends Matchers with ValidatedValuesDetailedMe
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("cdr", CallDetailRecordSourceFactory.prepare("/tmp", ';')) :: Nil)
.build()

val compilationErrors = runner.runWithoutData[CallDetailRecord](scenario).invalidValue.toList

compilationErrors should contain only CustomNodeError("cdr source", "File: '/tmp/unexisting.csv' is not readable", Some("fileName"))
compilationErrors should contain only CustomNodeError("cdr source", "File: '/tmp/unexisting.csv' is not readable", Some(ParameterName("fileName")))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.sample.csv

import org.junit.jupiter.api.Test
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression
Expand All @@ -25,6 +26,7 @@ class CsvSinkTest extends Matchers {
.emptySink("end", "csvSink", "fileName" -> s"'${resultsFile.getFileName.toFile}'", "row" -> "{#input, 'const'}")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("csvSink", new CsvSinkFactory(resultsFile.getParent.toString, ';')) :: Nil)
.build()

runner.runWithDataIgnoringResults(scenario, List("first", "second"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package pl.touk.nussknacker.sample.csv
import org.junit.jupiter.api.Test
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.graph.expression.Expression
Expand Down Expand Up @@ -35,6 +37,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage {
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("csvSource", new GenericCsvSourceFactory(csvFile.getParent.toString, ';')) :: Nil)
.build()

val results = runner.runWithoutData[java.util.Map[String, Any]](scenario).validValue
Expand All @@ -53,15 +56,15 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage {
@Test
def shouldThrowOnNonReadableFile(): Unit = {
testCompilationErrors("fileName" -> "'unexisting.csv'", "definition" -> "{{'name', 'String'}, {'phoneNumber', 'Long'}}") should
contain (CustomNodeError("source", "File: 'unexisting.csv' is not readable", Some("fileName")))
contain (CustomNodeError("source", "File: 'unexisting.csv' is not readable", Some(ParameterName("fileName"))))
}

@Test
def shouldThrowOnMalformedDefinition(): Unit = {
val emptyFile = Files.createTempFile("test", ".csv")
emptyFile.toFile.deleteOnExit()
testCompilationErrors("fileName" -> s"'${emptyFile.getFileName.toString}'", "definition" -> "{{'name', 'String'}, {'phoneNumber'}}") should
contain (CustomNodeError("source", "Column 2 should have name and type", Some("definition")))
contain (CustomNodeError("source", "Column 2 should have name and type", Some(ParameterName("definition"))))
}

@Test
Expand All @@ -70,8 +73,8 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage {
emptyFile.toFile.deleteOnExit()
testCompilationErrors("fileName" -> s"'${emptyFile.getFileName.toString}'", "definition" -> "{{'name', 'String'}, {'phoneNumber', 'Integer'}, {'callDuration', 'java.time.Duration'}}") should
contain allOf(
CustomNodeError("source", "Type for column 'phoneNumber' is not supported", Some("definition")),
CustomNodeError("source", "Type for column 'callDuration' is not supported", Some("definition")))
CustomNodeError("source", "Type for column 'phoneNumber' is not supported", Some(ParameterName("definition"))),
CustomNodeError("source", "Type for column 'callDuration' is not supported", Some(ParameterName("definition"))))
}

private def testCompilationErrors(params: (String, Expression)*): List[ProcessCompilationError] = {
Expand All @@ -81,6 +84,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage {
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("csvSource", new GenericCsvSourceFactory("/tmp", ';')) :: Nil)
.build()
runner.runWithoutData[java.util.Map[String, Any]](scenario).invalidValue.toList
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package pl.touk.nussknacker.sample

import com.typesafe.config.ConfigFactory
import org.junit.jupiter.api.Test
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.lite.util.test.LiteTestScenarioRunner
import pl.touk.nussknacker.engine.spel.Implicits._
Expand All @@ -29,7 +29,10 @@ class SampleComponentProviderLiteTest extends Matchers with ValidatedValuesDetai
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#out1")


val runner = TestScenarioRunner.liteBased().build()
val runner = TestScenarioRunner
.liteBased()
.withExtraComponents(ComponentDefinition("randomString", new RandomStringProvider) :: Nil)
.build()

val results = runner.runWithData[SimpleInput, String](scenario, inputData)

Expand Down
2 changes: 1 addition & 1 deletion nussknacker.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.12.5
1.14.0

0 comments on commit 88b36de

Please sign in to comment.