Skip to content

Commit

Permalink
[NU-1962] Flink scenario testing mechanism: mini cluster created once…
Browse files Browse the repository at this point in the history
… and reused each time (#7458)
  • Loading branch information
arkadius authored Jan 25, 2025
1 parent 700ae63 commit 0f1ccfb
Show file tree
Hide file tree
Showing 41 changed files with 949 additions and 534 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ jobs:
echo "scala_version_matrix=[\"2.13\"]" >> $GITHUB_OUTPUT
fi
build:
name: Build
runs-on: ubuntu-latest
Expand Down
32 changes: 19 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,9 @@ lazy val flinkDeploymentManager = (project in flink("management"))
componentsApi % Provided,
httpUtils % Provided,
flinkScalaUtils % Provided,
flinkTestUtils % IntegrationTest,
// test->test dependency is needed to load SimpleProcessConfigCreator
flinkExecutor % "test,test->test",
flinkTestUtils % "it,test",
kafkaTestUtils % "it,test"
)

Expand Down Expand Up @@ -708,18 +710,22 @@ lazy val flinkTests = (project in flink("tests"))
}
)
.dependsOn(
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
flinkTableApiComponents % Test,
flinkTestUtils % Test,
kafkaTestUtils % Test,
flinkComponentsTestkit % Test,
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
flinkTableApiComponents % Test,
flinkTestUtils % Test,
kafkaTestUtils % Test,
flinkComponentsTestkit % Test,
flinkDeploymentManager % Test,
// test->test dependencies are needed to load components from these modules
flinkKafkaComponentsUtils % "test->test",
flinkSchemedKafkaComponentsUtils % "test->test",
// for local development
designer % Test,
deploymentManagerApi % Test
designer % Test,
deploymentManagerApi % Test
)

lazy val defaultModel = (project in (file("defaultModel")))
Expand Down Expand Up @@ -963,7 +969,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com
componentsUtils % Provided,
kafkaTestUtils % Test,
flinkTestUtils % Test,
flinkExecutor % Test
flinkExecutor % Test,
)

lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
package pl.touk.nussknacker.ui.api.description

import io.circe.Encoder
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.engine.definition.test.TestingCapabilities
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions.SecuredEndpoint
import pl.touk.nussknacker.restmodel.definition.{UIParameter, UISourceParameters}
import pl.touk.nussknacker.restmodel.definition.UISourceParameters
import pl.touk.nussknacker.restmodel.validation.ValidationResults.{NodeValidationError, NodeValidationErrorType}
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.security.AuthCredentials
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioGraphCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioTestingCodecs._
import pl.touk.nussknacker.ui.definition.DefinitionsService
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir._
import sttp.tapir.json.circe.jsonBody
import io.circe.Encoder
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.ui.api.TestingApiHttpService.Examples.{
malformedTypingResultExample,
noScenarioExample,
testDataGenerationErrorExample
}
import pl.touk.nussknacker.ui.api.TestingApiHttpService.TestingError
import pl.touk.nussknacker.ui.definition.DefinitionsService
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir._
import sttp.tapir.json.circe.jsonBody

class TestingApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions {
import NodesApiEndpoints.Dtos._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider}
import pl.touk.nussknacker.engine.util.loader.DeploymentManagersClassLoader
import pl.touk.nussknacker.engine.management.{
FlinkDeploymentManager,
FlinkStreamingDeploymentManagerProvider,
ScenarioTestingConfig
}
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.utils.domain.TestFactory
Expand Down Expand Up @@ -84,7 +89,8 @@ class MockDeploymentManager private (
SttpBackendStub.asynchronousFuture
),
shouldVerifyBeforeDeploy = false,
mainClassName = "UNUSED"
mainClassName = "UNUSED",
scenarioTestingConfig = ScenarioTestingConfig()
) {

import MockDeploymentManager._
Expand Down
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* [#7335](https://github.com/TouK/nussknacker/pull/7335) introduced `managersDirs` config to configure deployment managers directory paths (you can use `MANAGERS_DIR` env in case of docker-based deployments). The default is `./managers`.
* [#7481](https://github.com/TouK/nussknacker/pull/7481) Ignore jobs in CANCELLING status when checking for duplicate jobs on Flink
* [#7483](https://github.com/TouK/nussknacker/pull/7483) It's possible to configure kafka source to work without schema registry. To do that you should not provide property "schema.registry.url" in kafkaProperties config.
* [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: the mini cluster is now created once and reused each time

## 1.18

Expand Down
3 changes: 3 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
}
```
* [#7335](https://github.com/TouK/nussknacker/pull/7335) Deployment managers are loaded using separate class loader (not the Application ClassLoader - `/opt/nussknacker/managers/*` should be removed from CLASSPATH definition). The default location for deployment managers jars is the `managers` folder inside the working directory.
* [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: by default, the mini cluster is created once and reused each time.
  To revert to the previous behaviour (creating a mini cluster each time), set `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioTesting` and/or
  `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioStateVerification` to `false`.

### Code API changes
* [#7368](https://github.com/TouK/nussknacker/pull/7368) Renamed `PeriodicSourceFactory` to `SampleGeneratorSourceFactory`
Expand Down
Loading

0 comments on commit 0f1ccfb

Please sign in to comment.