diff --git a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Activities should display activities #3.png b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Activities should display activities #3.png index 2dab2dfdb68..690c63f44a9 100644 Binary files a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Activities should display activities #3.png and b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Activities should display activities #3.png differ diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index 8aa3ab96743..48b7b3d87f1 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -147,13 +147,13 @@ trait SchedulingSupported extends SchedulingSupport { deploymentConfig: Config, ): ScheduledExecutionPerformer - def customSchedulePropertyExtractorFactory: Option[SchedulePropertyExtractorFactory] + def customSchedulePropertyExtractorFactory: Option[SchedulePropertyExtractorFactory] = None - def customProcessConfigEnricherFactory: Option[ProcessConfigEnricherFactory] + def customProcessConfigEnricherFactory: Option[ProcessConfigEnricherFactory] = None - def customScheduledProcessListenerFactory: Option[ScheduledProcessListenerFactory] + def customScheduledProcessListenerFactory: Option[ScheduledProcessListenerFactory] = None - def customAdditionalDeploymentDataProvider: Option[AdditionalDeploymentDataProvider] + def customAdditionalDeploymentDataProvider: Option[AdditionalDeploymentDataProvider] = None } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala index f532639a2d6..9a22a572795 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala @@ -3,28 +3,26 @@ package pl.touk.nussknacker.test.mock import akka.actor.ActorSystem import cats.data.Validated.valid import cats.data.ValidatedNel -import cats.effect.IO import cats.effect.unsafe.IORuntime -import com.google.common.collect.LinkedHashMultimap import com.typesafe.config.Config import sttp.client3.testing.SttpBackendStub import pl.touk.nussknacker.engine._ +import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.ScenarioStateVerificationConfig -import pl.touk.nussknacker.engine.management.{ - FlinkConfig, - FlinkDeploymentManager, - FlinkStreamingDeploymentManagerProvider -} +import pl.touk.nussknacker.engine.management.{FlinkConfig, FlinkDeploymentManager, FlinkDeploymentManagerProvider} import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader} import pl.touk.nussknacker.test.config.ConfigWithScalaVersion +import pl.touk.nussknacker.test.mock.MockDeploymentManager.{ + sampleCustomActionActivity, + sampleDeploymentId, + sampleStatusDetails +} import pl.touk.nussknacker.test.utils.domain.TestFactory -import shapeless.syntax.typeable.typeableOps +import pl.touk.nussknacker.ui.process.periodic.flink.FlinkClientStub import java.time.Instant import java.util.UUID @@ -34,78 +32,34 @@ import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Try // DEPRECATED!!! Use `WithMockableDeploymentManager` trait and `MockableDeploymentManager` instead -object MockDeploymentManager { - val savepointPath = "savepoints/123-savepoint" - val stopSavepointPath = "savepoints/246-stop-savepoint" - val maxParallelism = 10 - - def create( - defaultProcessStateStatus: StateStatus = SimpleStateStatus.NotDeployed, - deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider = - new ProcessingTypeDeployedScenariosProviderStub(List.empty), - actionService: ProcessingTypeActionService = new ProcessingTypeActionServiceStub, - scenarioActivityManager: ScenarioActivityManager = NoOpScenarioActivityManager, - customProcessStateDefinitionManager: Option[ProcessStateDefinitionManager] = None, - deploymentManagersClassLoader: Option[DeploymentManagersClassLoader] = None - ): MockDeploymentManager = { - new MockDeploymentManager( - defaultProcessStateStatus, - deployedScenariosProvider, - actionService, - scenarioActivityManager, - customProcessStateDefinitionManager, - DeploymentManagersClassLoader.create(List.empty).allocated.unsafeRunSync()(IORuntime.global) - )(ExecutionContext.global, IORuntime.global) - } - -} - class MockDeploymentManager private ( - defaultProcessStateStatus: StateStatus = SimpleStateStatus.NotDeployed, - deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider = - new ProcessingTypeDeployedScenariosProviderStub(List.empty), - actionService: ProcessingTypeActionService = new ProcessingTypeActionServiceStub, - scenarioActivityManager: ScenarioActivityManager = NoOpScenarioActivityManager, + modelData: ModelData, + deploymentManagerDependencies: DeploymentManagerDependencies, + defaultProcessStateStatus: StateStatus, + scenarioActivityManager: ScenarioActivityManager, customProcessStateDefinitionManager: Option[ProcessStateDefinitionManager], - deploymentManagersClassLoader: (DeploymentManagersClassLoader, IO[Unit]), -)(implicit executionContext: ExecutionContext, ioRuntime: IORuntime) - extends FlinkDeploymentManager( - ModelData( - ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig), - TestFactory.modelDependencies, - ModelClassLoader( - ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig).classPath, - None, - deploymentManagersClassLoader._1 - ) - ), - DeploymentManagerDependencies( - deployedScenariosProvider, - actionService, - scenarioActivityManager, - executionContext, - ioRuntime, - ActorSystem("MockDeploymentManager"), - SttpBackendStub.asynchronousFuture - ), - mainClassName = "UNUSED", - flinkConfig = FlinkConfig(None, scenarioStateVerification = ScenarioStateVerificationConfig(enabled = false)) + closeCreatedDeps: () => Unit, +) extends FlinkDeploymentManager( + modelData, + deploymentManagerDependencies, + FlinkConfig(None, scenarioStateVerification = ScenarioStateVerificationConfig(enabled = false)), + FlinkClientStub ) { - import MockDeploymentManager._ + import deploymentManagerDependencies._ - private def prepareProcessState(status: StateStatus, deploymentId: DeploymentId): List[StatusDetails] = - List(prepareProcessState(status, deploymentId, Some(ProcessVersion.empty))) + val deployResult = new ConcurrentHashMap[ProcessName, Future[Option[ExternalDeploymentId]]] - private def prepareProcessState( - status: StateStatus, - deploymentId: DeploymentId, - version: Option[ProcessVersion] - ): StatusDetails = - StatusDetails(status, Some(deploymentId), Some(ExternalDeploymentId("1")), version) + @volatile + var cancelResult: Future[Unit] = Future.successful(()) - // Pass correct deploymentId - private def fallbackDeploymentId = DeploymentId(UUID.randomUUID().toString) + val managerProcessStates = new ConcurrentHashMap[ProcessName, List[StatusDetails]] + + @volatile + var delayBeforeStateReturn: FiniteDuration = 0 seconds + + // queue of invocations to e.g. check that deploy was already invoked in "DeploymentManager" + val deploys = new ConcurrentLinkedQueue[ProcessName] override def processStateDefinitionManager: ProcessStateDefinitionManager = customProcessStateDefinitionManager match { @@ -119,7 +73,10 @@ class MockDeploymentManager private ( Future { Thread.sleep(delayBeforeStateReturn.toMillis) WithDataFreshnessStatus.fresh( - managerProcessStates.getOrDefault(name, prepareProcessState(defaultProcessStateStatus, fallbackDeploymentId)) + managerProcessStates.getOrDefault( + name, + List(sampleStatusDetails(defaultProcessStateStatus, sampleDeploymentId)) + ) ) } } @@ -129,188 +86,186 @@ class MockDeploymentManager private ( logger.debug(s"Adding deploy for ${processVersion.processName}") deploys.add(processVersion.processName) - val customActivityId = ScenarioActivityId.random for { - _ <- scenarioActivityManager.saveActivity( - ScenarioActivity.CustomAction( - scenarioId = ScenarioId(processVersion.processId.value), - scenarioActivityId = customActivityId, - user = ScenarioUser.internalNuUser, - date = Instant.now(), - scenarioVersionId = Some(ScenarioVersionId.from(processVersion.versionId)), - actionName = "Custom action of MockDeploymentManager just before deployment", - comment = ScenarioComment.from( - content = "With comment from DeploymentManager", - lastModifiedByUserName = ScenarioUser.internalNuUser.name, - lastModifiedAt = Instant.now() - ), - result = DeploymentResult.Success(Instant.now()), - ) - ) - externalDeploymentId <- this.synchronized { - Option(deployResult.get(processVersion.processName)) - .map(_.toArray(Array.empty[Future[Option[ExternalDeploymentId]]])) - .getOrElse(Array.empty) - .lastOption - .getOrElse(Future.successful(None)) - } + _ <- scenarioActivityManager.saveActivity(sampleCustomActionActivity(processVersion)) + externalDeploymentId <- deployResult.getOrDefault(processVersion.processName, Future.successful(None)) } yield externalDeploymentId } - override protected def waitForDuringDeployFinished( - processName: ProcessName, - deploymentId: ExternalDeploymentId - ): Future[Unit] = Future.successful(()) - - private val deployResult = LinkedHashMultimap.create[ProcessName, Future[Option[ExternalDeploymentId]]] - - private var cancelResult: Future[Unit] = Future.successful(()) + override protected def cancelScenario(command: DMCancelScenarioCommand): Future[Unit] = cancelResult - private val managerProcessStates = new ConcurrentHashMap[ProcessName, List[StatusDetails]] - - @volatile - private var delayBeforeStateReturn: FiniteDuration = 0 seconds - - // queue of invocations to e.g. check that deploy was already invoked in "ProcessManager" - val deploys = new ConcurrentLinkedQueue[ProcessName]() + // We override this field, because currently, this mock returns fallback for not defined scenarios states. + // To make stateQueryForAllScenariosSupport consistent with this approach, we should remove this fallback. + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport - def withWaitForDeployFinish[T](name: ProcessName)(action: => T): T = { - val promise = Promise[Option[ExternalDeploymentId]]() - val future = promise.future - synchronized { - deployResult.put(name, future) - } - try { - action - } finally { - promise.complete(Try(None)) - synchronized { - deployResult.remove(name, future) - } - } + override def close(): Unit = { + super.close() + closeCreatedDeps() } - def withWaitForCancelFinish[T](action: => T): T = { - val promise = Promise[Unit]() - try { - cancelResult = promise.future - action - } finally { - promise.complete(Try(())) - cancelResult = Future.successful(()) - } - } +} - def withFailingDeployment[T](name: ProcessName)(action: => T): T = { - val future = Future.failed(new RuntimeException("Failing deployment...")) - synchronized { - deployResult.put(name, future) - } - try { - action - } finally { - synchronized { - deployResult.remove(name, future) - } - } - } +object MockDeploymentManager { - def withDelayBeforeStateReturn[T](delay: FiniteDuration)(action: => T): T = { - delayBeforeStateReturn = delay - try { - action - } finally { - delayBeforeStateReturn = 0 seconds + def create( + defaultProcessStateStatus: StateStatus = SimpleStateStatus.NotDeployed, + deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider = + new ProcessingTypeDeployedScenariosProviderStub(List.empty), + actionService: ProcessingTypeActionService = new ProcessingTypeActionServiceStub, + scenarioActivityManager: ScenarioActivityManager = NoOpScenarioActivityManager, + customProcessStateDefinitionManager: Option[ProcessStateDefinitionManager] = None, + ): MockDeploymentManager = { + val actorSystem = ActorSystem("MockDeploymentManager") + val (deploymentManagersClassLoader, closeDeploymentManagerClassLoader) = + DeploymentManagersClassLoader.create(List.empty).allocated.unsafeRunSync()(IORuntime.global) + val modelData = ModelData( + ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig), + TestFactory.modelDependencies, + ModelClassLoader( + ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig).classPath, + None, + deploymentManagersClassLoader + ) + ) + val deploymentManagerDependencies = DeploymentManagerDependencies( + deployedScenariosProvider, + actionService, + scenarioActivityManager, + ExecutionContext.global, + IORuntime.global, + actorSystem, + SttpBackendStub.asynchronousFuture + ) + def closeCreatedDeps(): Unit = { + closeDeploymentManagerClassLoader.unsafeRunSync()(IORuntime.global) + actorSystem.terminate() } + new MockDeploymentManager( + modelData, + deploymentManagerDependencies, + defaultProcessStateStatus, + scenarioActivityManager, + customProcessStateDefinitionManager, + closeCreatedDeps, + ) } - def withProcessRunning[T](processName: ProcessName)(action: => T): T = { - withProcessStateStatus(processName, SimpleStateStatus.Running)(action) - } + private[mock] def sampleStatusDetails( + status: StateStatus, + deploymentId: DeploymentId, + version: Option[ProcessVersion] = Some(ProcessVersion.empty) + ): StatusDetails = StatusDetails(status, Some(deploymentId), Some(ExternalDeploymentId("1")), version) - def withProcessFinished[T](processName: ProcessName, deploymentId: DeploymentId = fallbackDeploymentId)( - action: => T - ): T = { - withProcessStateStatus(processName, SimpleStateStatus.Finished, deploymentId)(action) - } + // Pass correct deploymentId + private[mock] def sampleDeploymentId: DeploymentId = DeploymentId(UUID.randomUUID().toString) + + private def sampleCustomActionActivity(processVersion: ProcessVersion) = + ScenarioActivity.CustomAction( + scenarioId = ScenarioId(processVersion.processId.value), + scenarioActivityId = ScenarioActivityId.random, + user = ScenarioUser.internalNuUser, + date = Instant.now(), + scenarioVersionId = Some(ScenarioVersionId.from(processVersion.versionId)), + actionName = "Custom action of MockDeploymentManager just before deployment", + comment = ScenarioComment.from( + content = "With comment from DeploymentManager", + lastModifiedByUserName = ScenarioUser.internalNuUser.name, + lastModifiedAt = Instant.now() + ), + result = DeploymentResult.Success(Instant.now()), + ) - def withProcessStateStatus[T]( - processName: ProcessName, - status: StateStatus, - deploymentId: DeploymentId = fallbackDeploymentId - )(action: => T): T = { - withProcessStates(processName, prepareProcessState(status, deploymentId))(action) - } +} - def withProcessStateVersion[T](processName: ProcessName, status: StateStatus, version: Option[ProcessVersion])( - action: => T - ): T = { - withProcessStates(processName, List(prepareProcessState(status, fallbackDeploymentId, version)))(action) - } +object MockDeploymentManagerSyntaxSugar { - def withEmptyProcessState[T](processName: ProcessName)(action: => T): T = { - withProcessStates(processName, List.empty)(action) - } + implicit class Ops(deploymentManager: MockDeploymentManager) { - def withProcessStates[T](processName: ProcessName, statuses: List[StatusDetails])(action: => T): T = { - try { - managerProcessStates.put(processName, statuses) - action - } finally { - managerProcessStates.remove(processName) + def withWaitForDeployFinish[T](name: ProcessName)(action: => T): T = { + val promise = Promise[Option[ExternalDeploymentId]]() + val future = promise.future + deploymentManager.deployResult.put(name, future) + try { + action + } finally { + promise.complete(Try(None)) + deploymentManager.deployResult.remove(name, future) + } } - } - override protected def makeSavepoint( - deploymentId: ExternalDeploymentId, - savepointDir: Option[String] - ): Future[SavepointResult] = Future.successful(SavepointResult(path = savepointPath)) + def withWaitForCancelFinish[T](action: => T): T = { + val promise = Promise[Unit]() + try { + deploymentManager.cancelResult = promise.future + action + } finally { + promise.complete(Try(())) + deploymentManager.cancelResult = Future.successful(()) + } + } - override protected def stop( - deploymentId: ExternalDeploymentId, - savepointDir: Option[String] - ): Future[SavepointResult] = Future.successful(SavepointResult(path = stopSavepointPath)) + def withFailingDeployment[T](name: ProcessName)(action: => T): T = { + val future = Future.failed(new RuntimeException("Failing deployment...")) + deploymentManager.deployResult.put(name, future) + try { + action + } finally { + deploymentManager.deployResult.remove(name, future) + } + } - override protected def runProgram( - processName: ProcessName, - mainClass: String, - args: List[String], - savepointPath: Option[String], - deploymentId: Option[newdeployment.DeploymentId] - ): Future[Option[ExternalDeploymentId]] = ??? + def withDelayBeforeStateReturn[T](delay: FiniteDuration)(action: => T): T = { + deploymentManager.delayBeforeStateReturn = delay + try { + action + } finally { + deploymentManager.delayBeforeStateReturn = 0 seconds + } + } - override def close(): Unit = { - deploymentManagersClassLoader._2.unsafeRunSync() - } + def withProcessStates[T](processName: ProcessName, statuses: List[StatusDetails])(action: => T): T = { + try { + deploymentManager.managerProcessStates.put(processName, statuses) + action + } finally { + deploymentManager.managerProcessStates.remove(processName) + } + } - override def cancelDeployment(command: DMCancelDeploymentCommand): Future[Unit] = Future.successful(()) + def withProcessRunning[T](processName: ProcessName)(action: => T): T = { + withProcessStateStatus(processName, SimpleStateStatus.Running)(action) + } - override def cancelScenario(command: DMCancelScenarioCommand): Future[Unit] = cancelResult + def withProcessFinished[T](processName: ProcessName, deploymentId: DeploymentId = sampleDeploymentId)( + action: => T + ): T = { + withProcessStateStatus(processName, SimpleStateStatus.Finished, deploymentId)(action) + } - override protected def cancelFlinkJob(deploymentId: ExternalDeploymentId): Future[Unit] = Future.successful(()) + def withProcessStateStatus[T]( + processName: ProcessName, + status: StateStatus, + deploymentId: DeploymentId = sampleDeploymentId + )(action: => T): T = { + withProcessStates(processName, List(sampleStatusDetails(status, deploymentId)))(action) + } - override protected def checkRequiredSlotsExceedAvailableSlots( - canonicalProcess: CanonicalProcess, - currentlyDeployedJobsIds: List[ExternalDeploymentId] - ): Future[Unit] = - if (canonicalProcess.metaData.typeSpecificData - .cast[StreamMetaData] - .flatMap(_.parallelism) - .exists(_ > maxParallelism)) { - Future.failed(new IllegalArgumentException("Parallelism too large")) - } else { - Future.successful(()) + def withProcessStateVersion[T](processName: ProcessName, status: StateStatus, version: Option[ProcessVersion])( + action: => T + ): T = { + withProcessStates(processName, List(sampleStatusDetails(status, sampleDeploymentId, version)))(action) } - override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + def withEmptyProcessState[T](processName: ProcessName)(action: => T): T = { + withProcessStates(processName, List.empty)(action) + } - override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + } - override def schedulingSupport: SchedulingSupport = NoSchedulingSupport } class MockManagerProvider(deploymentManager: DeploymentManager = MockDeploymentManager.create()) - extends FlinkStreamingDeploymentManagerProvider { + extends FlinkDeploymentManagerProvider { override def createDeploymentManager( modelData: BaseModelData, diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesConcurrentSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesConcurrentSpec.scala index bebf7f0d395..5c71e827b8b 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesConcurrentSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesConcurrentSpec.scala @@ -10,6 +10,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, OptionValues} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.test.PatientScalaFutures import pl.touk.nussknacker.test.base.it.NuResourcesTest +import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops import pl.touk.nussknacker.test.utils.domain.ProcessTestData import scala.jdk.CollectionConverters._ diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala index c9ad9f2cc67..57341620cac 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala @@ -22,12 +22,13 @@ import pl.touk.nussknacker.restmodel.scenariodetails._ import pl.touk.nussknacker.security.Permission import pl.touk.nussknacker.test.PatientScalaFutures import pl.touk.nussknacker.test.base.it.NuResourcesTest -import pl.touk.nussknacker.test.mock.MockDeploymentManager +import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops import pl.touk.nussknacker.test.utils.domain.TestFactory.{withAllPermissions, withPermissions} import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory} import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos import pl.touk.nussknacker.ui.process.ScenarioQuery import pl.touk.nussknacker.ui.process.exception.ProcessIllegalAction +import pl.touk.nussknacker.ui.process.periodic.flink.FlinkClientStub // TODO: all these tests should be migrated to ManagementApiHttpServiceBusinessSpec or ManagementApiHttpServiceSecuritySpec class ManagementResourcesSpec @@ -286,10 +287,11 @@ class ManagementResourcesSpec } test("should return failure for not validating deployment") { + val requestedParallelism = FlinkClientStub.maxParallelism + 1 val largeParallelismScenario = ProcessTestData.sampleScenario.copy(metaData = MetaData( ProcessTestData.sampleScenario.name.value, - StreamMetaData(parallelism = Some(MockDeploymentManager.maxParallelism + 1)) + StreamMetaData(parallelism = Some(requestedParallelism)) ) ) saveCanonicalProcessAndAssertSuccess(largeParallelismScenario) @@ -297,7 +299,10 @@ class ManagementResourcesSpec deploymentManager.withFailingDeployment(largeParallelismScenario.name) { deployProcess(largeParallelismScenario.name) ~> check { status shouldBe StatusCodes.BadRequest - responseAs[String] shouldBe "Parallelism too large" + responseAs[ + String + ] shouldBe s"Not enough free slots on Flink cluster. Available slots: ${FlinkClientStub.maxParallelism}, requested: ${requestedParallelism}. " + + s"Decrease scenario's parallelism or extend Flink cluster resources" } } } @@ -317,7 +322,7 @@ class ManagementResourcesSpec deploymentManager.withProcessRunning(ProcessTestData.sampleScenario.name) { snapshot(ProcessTestData.sampleScenario.name) ~> check { status shouldBe StatusCodes.OK - responseAs[String] shouldBe MockDeploymentManager.savepointPath + responseAs[String] shouldBe FlinkClientStub.savepointPath } } } @@ -327,7 +332,7 @@ class ManagementResourcesSpec deploymentManager.withProcessRunning(ProcessTestData.sampleScenario.name) { stop(ProcessTestData.sampleScenario.name) ~> check { status shouldBe StatusCodes.OK - responseAs[String] shouldBe MockDeploymentManager.stopSavepointPath + responseAs[String] shouldBe FlinkClientStub.stopSavepointPath } } } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala index 8c64b6553b9..f08e9135969 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala @@ -12,6 +12,7 @@ import pl.touk.nussknacker.security.Permission import pl.touk.nussknacker.test.PatientScalaFutures import pl.touk.nussknacker.test.utils.domain.TestFactory.withAllPermissions import pl.touk.nussknacker.test.base.it.NuResourcesTest +import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops import pl.touk.nussknacker.test.utils.domain.ProcessTestData import pl.touk.nussknacker.ui.listener.ProcessChangeEvent._ import pl.touk.nussknacker.ui.security.api.LoggedUser diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index 8ca798b1793..74d462f2b33 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.ui.notifications import akka.actor.ActorSystem import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.when -import org.scalatest.OptionValues +import org.scalatest.{BeforeAndAfterAll, OptionValues} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar @@ -59,7 +59,8 @@ class NotificationServiceTest with WithHsqlDbTesting with EitherValuesDetailedMessage with OptionValues - with DBIOActionValues { + with DBIOActionValues + with BeforeAndAfterAll { private implicit val system: ActorSystem = ActorSystem(getClass.getSimpleName) override protected val dbioRunner: DBIOActionRunner = DBIOActionRunner(testDbRef) @@ -93,6 +94,11 @@ class NotificationServiceTest private val expectedRefreshAfterSuccess = List(DataToRefresh.activity, DataToRefresh.state) private val expectedRefreshAfterFail = List(DataToRefresh.state) + override protected def afterAll(): Unit = { + super.afterAll() + system.terminate().futureValue + } + test("Should return only events for user in given time") { val processName = ProcessName("fooProcess") val id = saveSampleProcess(processName) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala index ff1f48a023b..b9e4d7ec8ce 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala @@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting import pl.touk.nussknacker.test.base.it.WithClock +import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops import pl.touk.nussknacker.test.mock.{MockDeploymentManager, TestProcessChangeListener} import pl.touk.nussknacker.test.utils.domain.TestFactory._ import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory} @@ -27,6 +28,7 @@ import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, NuScalaTestAssertions, PatientScalaFutures} import pl.touk.nussknacker.ui.api.DeploymentCommentSettings import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionSuccess} +import pl.touk.nussknacker.ui.process.periodic.flink.FlinkClientStub import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState} @@ -53,8 +55,6 @@ class DeploymentServiceSpec with WithClock with EitherValuesDetailedMessage { - import VersionId._ - private implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh private implicit val system: ActorSystem = ActorSystem() @@ -420,8 +420,9 @@ class DeploymentServiceSpec test("Should skip notifications and deployment on validation errors") { val processName: ProcessName = generateProcessName + val requestedParallelism = FlinkClientStub.maxParallelism + 1 val processIdWithName = - prepareProcess(processName, Some(MockDeploymentManager.maxParallelism + 1)).dbioActionValues + prepareProcess(processName, Some(requestedParallelism)).dbioActionValues deploymentManager.withEmptyProcessState(processName) { val result = @@ -435,7 +436,8 @@ class DeploymentServiceSpec ) .failed .futureValue - result.getMessage shouldBe "Parallelism too large" + result.getMessage shouldBe s"Not enough free slots on Flink cluster. Available slots: ${FlinkClientStub.maxParallelism}, requested: $requestedParallelism. " + + s"Decrease scenario's parallelism or extend Flink cluster resources" deploymentManager.deploys should not contain processName fetchingProcessRepository .fetchLatestProcessDetailsForProcessId[Unit](processIdWithName.id) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/FlinkClientStub.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/FlinkClientStub.scala index b1b260d1a7d..56c72dcc2eb 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/FlinkClientStub.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/FlinkClientStub.scala @@ -8,24 +8,31 @@ import pl.touk.nussknacker.engine.management.rest.{FlinkClient, flinkRestModel} import java.io.File import scala.concurrent.Future -class FlinkClientStub extends FlinkClient { +object FlinkClientStub extends FlinkClient { + + val maxParallelism = 10 + val savepointPath = "savepoints/123-savepoint" + val stopSavepointPath = "savepoints/246-stop-savepoint" override def getJobsOverviews()( implicit freshnessPolicy: DataFreshnessPolicy - ): Future[WithDataFreshnessStatus[List[flinkRestModel.JobOverview]]] = ??? + ): Future[WithDataFreshnessStatus[List[flinkRestModel.JobOverview]]] = + Future.successful(WithDataFreshnessStatus.fresh(List.empty)) - override def getJobDetails(jobId: String): Future[Option[flinkRestModel.JobDetails]] = ??? + override def getJobDetails(jobId: String): Future[Option[flinkRestModel.JobDetails]] = Future.successful(None) - override def getJobConfig(jobId: String): Future[flinkRestModel.ExecutionConfig] = ??? + override def getJobConfig(jobId: String): Future[flinkRestModel.ExecutionConfig] = + Future.successful(flinkRestModel.ExecutionConfig(1, Map.empty)) - override def cancel(deploymentId: ExternalDeploymentId): Future[Unit] = ??? + override def cancel(deploymentId: ExternalDeploymentId): Future[Unit] = Future.successful(()) override def makeSavepoint( deploymentId: ExternalDeploymentId, savepointDir: Option[String] - ): Future[SavepointResult] = ??? + ): Future[SavepointResult] = Future.successful(SavepointResult(savepointPath)) - override def stop(deploymentId: ExternalDeploymentId, savepointDir: Option[String]): Future[SavepointResult] = ??? + override def stop(deploymentId: ExternalDeploymentId, savepointDir: Option[String]): Future[SavepointResult] = + Future.successful(SavepointResult(stopSavepointPath)) override def runProgram( jarFile: File, @@ -33,12 +40,15 @@ class FlinkClientStub extends FlinkClient { args: List[String], savepointPath: Option[String], jobId: Option[String] - ): Future[Option[ExternalDeploymentId]] = ??? + ): Future[Option[ExternalDeploymentId]] = Future.successful(None) override def deleteJarIfExists(jarFileName: String): Future[Unit] = Future.successful(()) - override def getClusterOverview: Future[flinkRestModel.ClusterOverview] = ??? + override def getClusterOverview: Future[flinkRestModel.ClusterOverview] = + Future.successful( + flinkRestModel.ClusterOverview(`slots-total` = maxParallelism, `slots-available` = maxParallelism) + ) - override def getJobManagerConfig: Future[Configuration] = ??? + override def getJobManagerConfig: Future[Configuration] = Future.successful(new Configuration) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/ScheduledExecutionPerformerTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/ScheduledExecutionPerformerTest.scala index 2dc0e14252d..c0d8e8b5a16 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/ScheduledExecutionPerformerTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/ScheduledExecutionPerformerTest.scala @@ -40,7 +40,7 @@ class ScheduledExecutionPerformerTest extends AnyFunSuite with Matchers with Sca ): ScheduledExecutionPerformer = { new FlinkScheduledExecutionPerformer( - flinkClient = new FlinkClientStub, + flinkClient = FlinkClientStub, jarsDir = jarsDir, inputConfigDuringExecution = InputConfigDuringExecution(ConfigFactory.empty()), modelJarProvider = modelJarProvider diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index e728a37b012..5fc66549da7 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -109,6 +109,8 @@ To see the biggest differences please consult the [changelog](Changelog.md). * Other methods were considered too much low-level and were removed * Instead of using `ResultsCollectingListenerHolder.registerListener` or `ResultsCollectingListenerHolder.registerTestEngineListener` should be used `withListener`/`withTestEngineListener` methods which properly cleanup allocated resources. +* [#7540](https://github.com/TouK/nussknacker/pull/7540) `FlinkStreamingDeploymentManagerProvider` was renamed to `FlinkDeploymentManagerProvider`, + `FlinkStreamingRestManager` and `FlinkRestManager` abstraction layers were removed - only `FlinkDeploymentManager` exists ## In version 1.18.0 diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala index 2664aeccb2e..c94a35f3a1c 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala @@ -15,7 +15,7 @@ import pl.touk.nussknacker.engine.api.deployment.{ ProcessingTypeDeployedScenariosProviderStub } import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode -import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider +import pl.touk.nussknacker.engine.management.FlinkDeploymentManagerProvider import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader} object FlinkStreamingDeploymentManagerProviderHelper { @@ -48,7 +48,7 @@ object FlinkStreamingDeploymentManagerProviderHelper { actorSystem, backend ) - new FlinkStreamingDeploymentManagerProvider() + new FlinkDeploymentManagerProvider() .createDeploymentManager( modelData, deploymentManagerDependencies, diff --git a/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider b/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider index a98ccd750e2..ef5b47a11fb 100644 --- a/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider +++ b/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider @@ -1 +1 @@ -pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider \ No newline at end of file +pl.touk.nussknacker.engine.management.FlinkDeploymentManagerProvider diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index 96631df5e2a..b023ca7b755 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -1,37 +1,47 @@ package pl.touk.nussknacker.engine.management +import cats.data.NonEmptyList import cats.implicits._ +import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import io.circe.syntax.EncoderOps +import org.apache.flink.api.common.{JobID, JobStatus} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector +import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} +import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.{ FlinkMiniClusterScenarioStateVerifier, FlinkMiniClusterScenarioTestRunner } import pl.touk.nussknacker.engine.flink.minicluster.util.DurationToRetryPolicyConverterOps.DurationOps -import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs +import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.{MainClassName, prepareProgramArgs} +import pl.touk.nussknacker.engine.management.rest.FlinkClient +import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} import scala.concurrent.Future -abstract class FlinkDeploymentManager( +class FlinkDeploymentManager( modelData: BaseModelData, dependencies: DeploymentManagerDependencies, - mainClassName: String, - flinkConfig: FlinkConfig + flinkConfig: FlinkConfig, + client: FlinkClient, ) extends DeploymentManager with LazyLogging { import dependencies._ + private val modelJarProvider = new FlinkModelJarProvider(modelData.modelClassLoaderUrls) + + private val slotsChecker = new FlinkSlotsChecker(client) + private val miniClusterWithServicesOpt = { FlinkMiniClusterFactory.createMiniClusterWithServicesIfConfigured( modelData.modelClassLoader, @@ -56,6 +66,8 @@ abstract class FlinkDeploymentManager( flinkConfig.scenarioStateVerification.timeout.toPausePolicy ) + private val statusDeterminer = new FlinkStatusDetailsDeterminer(modelData.namingStrategy, client.getJobConfig) + /** * Gets status from engine, handles finished state, resolves possible inconsistency with lastAction and formats status using `ProcessStateDefinitionManager` */ @@ -128,16 +140,16 @@ abstract class FlinkDeploymentManager( case command: DMCancelScenarioCommand => cancelScenario(command) case DMStopDeploymentCommand(processName, deploymentId, savepointDir, _) => requireSingleRunningJob(processName, _.deploymentId.contains(deploymentId)) { - stop(_, savepointDir) + client.stop(_, savepointDir) } case DMStopScenarioCommand(processName, savepointDir, _) => requireSingleRunningJob(processName, _ => true) { - stop(_, savepointDir) + client.stop(_, savepointDir) } case DMMakeScenarioSavepointCommand(processName, savepointDir) => // TODO: savepoint for given deployment id requireSingleRunningJob(processName, _ => true) { - makeSavepoint(_, savepointDir) + client.makeSavepoint(_, savepointDir) } case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) => testRunner.runTests(canonicalProcess, scenarioTestData) @@ -159,45 +171,20 @@ abstract class FlinkDeploymentManager( import command._ val processName = processVersion.processName - val stoppingResult = command.updateStrategy match { - case DeploymentUpdateStrategy.ReplaceDeploymentWithSameScenarioName(_) => - for { - oldJobs <- oldJobsToStop(processVersion) - externalDeploymentIds = oldJobs - .sortBy(_.startTime)(Ordering[Option[Long]].reverse) - .flatMap(_.externalDeploymentId) - savepoints <- Future.sequence( - externalDeploymentIds.map(stopSavingSavepoint(processVersion, _, canonicalProcess)) - ) - } yield { - logger.info(s"Deploying $processName. ${Option(savepoints) - .filter(_.nonEmpty) - .map(_.mkString("Saving savepoints finished: ", ", ", ".")) - .getOrElse("There was no job to stop.")}") - savepoints - } - case DeploymentUpdateStrategy.DontReplaceDeployment => - Future.successful(List.empty) - } for { - savepointList <- stoppingResult + stoppedJobsSavepoints <- stopOldJobsIfNeeded(command) + _ = { + logger.info(s"Deploying $processName. ${NonEmptyList + .fromList(stoppedJobsSavepoints) + .map(_.toList.mkString("Saving savepoints finished: ", ", ", ".")) + .getOrElse("There was no job to stop.")}") + } + savepointPath = determineSavepointPath(command.updateStrategy, stoppedJobsSavepoints) // In case of redeploy we double check required slots which is not bad because can be some run between jobs and it is better to check it again _ <- checkRequiredSlotsExceedAvailableSlots(canonicalProcess, List.empty) - savepointPath = command.updateStrategy match { - case DeploymentUpdateStrategy.DontReplaceDeployment => None - case DeploymentUpdateStrategy.ReplaceDeploymentWithSameScenarioName( - StateRestoringStrategy.RestoreStateFromReplacedJobSavepoint - ) => - // TODO: Better handle situation with more than one jobs stopped - savepointList.headOption - case DeploymentUpdateStrategy.ReplaceDeploymentWithSameScenarioName( - StateRestoringStrategy.RestoreStateFromCustomSavepoint(savepointPath) - ) => - Some(savepointPath) - } runResult <- runProgram( processName, - mainClassName, + MainClassName, prepareProgramArgs( modelData.inputConfigDuringExecution.serialized, processVersion, @@ -211,7 +198,24 @@ abstract class FlinkDeploymentManager( } yield runResult } - protected def waitForDuringDeployFinished(processName: ProcessName, deploymentId: ExternalDeploymentId): Future[Unit] + private def stopOldJobsIfNeeded(command: DMRunDeploymentCommand) = { + import command._ + + command.updateStrategy match { + case DeploymentUpdateStrategy.ReplaceDeploymentWithSameScenarioName(_) => + for { + oldJobs <- oldJobsToStop(processVersion) + externalDeploymentIds = oldJobs + .sortBy(_.startTime)(Ordering[Option[Long]].reverse) + .flatMap(_.externalDeploymentId) + savepoints <- Future.sequence( + externalDeploymentIds.map(stopSavingSavepoint(processVersion, _, canonicalProcess)) + ) + } yield savepoints + case DeploymentUpdateStrategy.DontReplaceDeployment => + Future.successful(List.empty) + } + } private def oldJobsToStop(processVersion: ProcessVersion): Future[List[StatusDetails]] = { implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh @@ -219,10 +223,19 @@ abstract class FlinkDeploymentManager( .map(_.value.filter(details => SimpleStateStatus.DefaultFollowingDeployStatuses.contains(details.status))) } - protected def checkRequiredSlotsExceedAvailableSlots( - canonicalProcess: CanonicalProcess, - currentlyDeployedJobsIds: List[ExternalDeploymentId] - ): Future[Unit] + private def determineSavepointPath(updateStrategy: DeploymentUpdateStrategy, stoppedJobsSavepoints: List[String]) = + updateStrategy match { + case DeploymentUpdateStrategy.DontReplaceDeployment => None + case DeploymentUpdateStrategy.ReplaceDeploymentWithSameScenarioName( + StateRestoringStrategy.RestoreStateFromReplacedJobSavepoint + ) => + // TODO: Better handle situation with more than one jobs stopped + stoppedJobsSavepoints.headOption + case DeploymentUpdateStrategy.ReplaceDeploymentWithSameScenarioName( + StateRestoringStrategy.RestoreStateFromCustomSavepoint(savepointPath) + ) => + Some(savepointPath) + } private def requireSingleRunningJob[T](processName: ProcessName, statusDetailsPredicate: StatusDetails => Boolean)( action: ExternalDeploymentId => Future[T] @@ -243,16 +256,6 @@ abstract class FlinkDeploymentManager( } } - private def checkIfJobIsCompatible( - savepointPath: String, - canonicalProcess: CanonicalProcess, - processVersion: ProcessVersion - ): Future[Unit] = - if (flinkConfig.scenarioStateVerification.enabled) - verification.verify(processVersion, canonicalProcess, savepointPath) - else - Future.successful(()) - private def stopSavingSavepoint( processVersion: ProcessVersion, deploymentId: ExternalDeploymentId, @@ -260,31 +263,182 @@ abstract class FlinkDeploymentManager( ): Future[String] = { logger.debug(s"Making savepoint of ${processVersion.processName}. Deployment: $deploymentId") for { - savepointResult <- makeSavepoint(deploymentId, savepointDir = None) + savepointResult <- client.makeSavepoint(deploymentId, savepointDir = None) savepointPath = savepointResult.path _ <- checkIfJobIsCompatible(savepointPath, canonicalProcess, processVersion) - _ <- cancelFlinkJob(deploymentId) + _ <- client.cancel(deploymentId) } yield savepointPath } - protected def cancelScenario(command: DMCancelScenarioCommand): Future[Unit] + private def checkIfJobIsCompatible( + savepointPath: String, + canonicalProcess: CanonicalProcess, + processVersion: ProcessVersion + ): Future[Unit] = + if (flinkConfig.scenarioStateVerification.enabled) + verification.verify(processVersion, canonicalProcess, savepointPath) + else + Future.successful(()) - protected def cancelDeployment(command: DMCancelDeploymentCommand): Future[Unit] + override def getProcessStates( + name: ProcessName + )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { + getAllProcessesStatesFromFlink().map(_.getOrElse(name, List.empty)) + } - protected def cancelFlinkJob(deploymentId: ExternalDeploymentId): Future[Unit] + override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport = + new DeploymentSynchronisationSupported { + + override def getDeploymentStatusesToUpdate( + deploymentIdsToCheck: Set[newdeployment.DeploymentId] + ): Future[Map[newdeployment.DeploymentId, DeploymentStatus]] = { + Future + .sequence( + deploymentIdsToCheck.toSeq + .map { deploymentId => + client + .getJobDetails(toJobId(deploymentId)) + .map(_.map { jobDetails => + deploymentId -> FlinkStatusDetailsDeterminer + .toDeploymentStatus(JobStatus.valueOf(jobDetails.state), jobDetails.`status-counts`) + }) + } + ) + .map(_.flatten.toMap) + } + + } - protected def makeSavepoint(deploymentId: ExternalDeploymentId, savepointDir: Option[String]): Future[SavepointResult] + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = + new StateQueryForAllScenariosSupported { + + override def getAllProcessesStates()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = getAllProcessesStatesFromFlink() + + } - protected def stop(deploymentId: ExternalDeploymentId, savepointDir: Option[String]): Future[SavepointResult] + override def schedulingSupport: SchedulingSupport = new SchedulingSupported { - protected def runProgram( + override def createScheduledExecutionPerformer( + modelData: BaseModelData, + dependencies: DeploymentManagerDependencies, + config: Config, + ): ScheduledExecutionPerformer = FlinkScheduledExecutionPerformer.create(modelData, dependencies, config) + + } + + private def getAllProcessesStatesFromFlink()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = { + client + .getJobsOverviews() + .flatMap { result => + statusDeterminer + .statusDetailsFromJobOverviews(result.value) + .map( + WithDataFreshnessStatus(_, cached = result.cached) + ) // TODO: How to do it nicer? + } + } + + private def waitForDuringDeployFinished( + processName: ProcessName, + deploymentId: ExternalDeploymentId + ): Future[Unit] = { + flinkConfig.waitForDuringDeployFinish.toEnabledConfig + .map { config => + retry + .Pause(config.maxChecks, config.delay) + .apply { + implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh + getProcessStates(processName).map { statuses => + statuses.value + .find(details => + details.externalDeploymentId + .contains(deploymentId) && details.status == SimpleStateStatus.DuringDeploy + ) + .map(Left(_)) + .getOrElse(Right(())) + } + } + .map( + _.getOrElse( + throw new IllegalStateException( + "Deploy execution finished, but job is still in during deploy state on Flink" + ) + ) + ) + } + .getOrElse(Future.successful(())) + } + + protected def cancelScenario(command: DMCancelScenarioCommand): Future[Unit] = { + import command._ + implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh + getProcessStates(scenarioName).map(_.value).flatMap { statuses => + cancelEachMatchingJob(scenarioName, None, statuses) + } + } + + private def cancelDeployment(command: DMCancelDeploymentCommand): Future[Unit] = { + import command._ + implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh + getProcessStates(scenarioName).map(_.value).flatMap { statuses => + cancelEachMatchingJob(scenarioName, Some(deploymentId), statuses.filter(_.deploymentId.contains(deploymentId))) + } + } + + private def cancelEachMatchingJob( + processName: ProcessName, + deploymentId: Option[DeploymentId], + statuses: List[StatusDetails] + ) = { + statuses.filterNot(details => SimpleStateStatus.isFinalOrTransitioningToFinalStatus(details.status)) match { + case Nil => + logger.warn( + s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not active on Flink." + ) + Future.successful(()) + case single :: Nil => client.cancel(single.externalDeploymentIdUnsafe) + case moreThanOne @ (_ :: _ :: _) => + logger.warn( + s"Found duplicate jobs of $processName${deploymentId.map(" with id: " + _).getOrElse("")}: $moreThanOne. Cancelling all in non terminal state." + ) + Future.sequence(moreThanOne.map(_.externalDeploymentIdUnsafe).map(client.cancel)).map(_ => ()) + } + } + + private def runProgram( processName: ProcessName, mainClass: String, args: List[String], savepointPath: Option[String], - // TODO: make it mandatory - see TODO in newdeployment.DeploymentService deploymentId: Option[newdeployment.DeploymentId] - ): Future[Option[ExternalDeploymentId]] + ): Future[Option[ExternalDeploymentId]] = { + logger.debug(s"Starting to deploy scenario: $processName with savepoint $savepointPath") + client.runProgram( + modelJarProvider.getJobJar(), + mainClass, + args, + savepointPath, + deploymentId.map(toJobId) + ) + } + + private def toJobId(did: newdeployment.DeploymentId) = { + new JobID(did.value.getLeastSignificantBits, did.value.getMostSignificantBits).toHexString + } + + private def checkRequiredSlotsExceedAvailableSlots( + canonicalProcess: CanonicalProcess, + currentlyDeployedJobsIds: List[ExternalDeploymentId] + ): Future[Unit] = { + if (flinkConfig.shouldCheckAvailableSlots) { + slotsChecker.checkRequiredSlotsExceedAvailableSlots(canonicalProcess, currentlyDeployedJobsIds) + } else + Future.successful(()) + } override def processStateDefinitionManager: ProcessStateDefinitionManager = FlinkProcessStateDefinitionManager @@ -297,6 +451,8 @@ abstract class FlinkDeploymentManager( object FlinkDeploymentManager { + private[management] val MainClassName = "pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain" + def prepareProgramArgs( serializedConfig: String, processVersion: ProcessVersion, diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerProvider.scala similarity index 96% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala rename to engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerProvider.scala index 515c1db54ed..e719cb76604 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerProvider.scala @@ -20,7 +20,7 @@ import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import scala.util.Try -class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider with LazyLogging { +class FlinkDeploymentManagerProvider extends DeploymentManagerProvider with LazyLogging { import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ @@ -39,7 +39,7 @@ class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider import dependencies._ val flinkConfig = deploymentConfig.rootAs[FlinkConfig] FlinkClient.create(flinkConfig, scenarioStateCacheTTL).map { client => - val underlying = new FlinkStreamingRestManager(client, flinkConfig, modelData, dependencies) + val underlying = new FlinkDeploymentManager(modelData, dependencies, flinkConfig, client) CachingProcessStateDeploymentManager.wrapWithCachingIfNeeded(underlying, scenarioStateCacheTTL) } } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala deleted file mode 100644 index 5242ec6725e..00000000000 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ /dev/null @@ -1,318 +0,0 @@ -package pl.touk.nussknacker.engine.management - -import com.typesafe.config.Config -import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.api.common.{JobID, JobStatus} -import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} -import pl.touk.nussknacker.engine.management.FlinkRestManager.ParsedJobConfig -import pl.touk.nussknacker.engine.management.rest.FlinkClient -import pl.touk.nussknacker.engine.management.rest.flinkRestModel.{BaseJobStatusCounts, JobOverview} -import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps -import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} - -import scala.concurrent.Future - -class FlinkRestManager( - client: FlinkClient, - config: FlinkConfig, - modelData: BaseModelData, - dependencies: DeploymentManagerDependencies, - mainClassName: String, -) extends FlinkDeploymentManager( - modelData, - dependencies, - mainClassName, - config - ) - with LazyLogging { - - import dependencies._ - - private val modelJarProvider = new FlinkModelJarProvider(modelData.modelClassLoaderUrls) - - private val slotsChecker = new FlinkSlotsChecker(client) - - override def getProcessStates( - name: ProcessName - )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - getAllProcessesStatesFromFlink().map(_.getOrElse(name, List.empty)) - } - - override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport = - new DeploymentSynchronisationSupported { - - override def getDeploymentStatusesToUpdate( - deploymentIdsToCheck: Set[newdeployment.DeploymentId] - ): Future[Map[newdeployment.DeploymentId, DeploymentStatus]] = { - Future - .sequence( - deploymentIdsToCheck.toSeq - .map { deploymentId => - client - .getJobDetails(toJobId(deploymentId)) - .map(_.map { jobDetails => - deploymentId -> toDeploymentStatus(jobDetails.state, jobDetails.`status-counts`) - }) - } - ) - .map(_.flatten.toMap) - } - - } - - override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = - new StateQueryForAllScenariosSupported { - - override def getAllProcessesStates()( - implicit freshnessPolicy: DataFreshnessPolicy - ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = getAllProcessesStatesFromFlink() - - } - - override def schedulingSupport: SchedulingSupport = new SchedulingSupported { - - override def createScheduledExecutionPerformer( - modelData: BaseModelData, - dependencies: DeploymentManagerDependencies, - config: Config, - ): ScheduledExecutionPerformer = FlinkScheduledExecutionPerformer.create(modelData, dependencies, config) - - override def customSchedulePropertyExtractorFactory: Option[SchedulePropertyExtractorFactory] = None - override def customProcessConfigEnricherFactory: Option[ProcessConfigEnricherFactory] = None - override def customScheduledProcessListenerFactory: Option[ScheduledProcessListenerFactory] = None - override def customAdditionalDeploymentDataProvider: Option[AdditionalDeploymentDataProvider] = None - - } - - private def getAllProcessesStatesFromFlink()( - implicit freshnessPolicy: DataFreshnessPolicy - ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = { - client - .getJobsOverviews() - .flatMap { result => - statusDetailsFromJobOverviews(result.value).map( - WithDataFreshnessStatus(_, cached = result.cached) - ) // TODO: How to do it nicer? - } - } - - private def statusDetailsFromJobOverviews( - jobOverviews: List[JobOverview] - ): Future[Map[ProcessName, List[StatusDetails]]] = Future - .sequence { - jobOverviews - .groupBy(_.name) - .flatMap { case (name, jobs) => - modelData.namingStrategy.decodeName(name).map(decoded => (ProcessName(decoded), jobs)) - } - .map { case (name, jobs) => - val statusDetails = jobs.map { job => - withParsedJobConfig(job.jid, name).map { jobConfig => - // TODO: return error when there's no correct version in process - // currently we're rather lax on this, so that this change is backward-compatible - // we log debug here for now, since it's invoked v. often - if (jobConfig.isEmpty) { - logger.debug(s"No correct job details in deployed scenario: ${job.name}") - } - StatusDetails( - SimpleStateStatus.fromDeploymentStatus(toDeploymentStatus(job.state, job.tasks)), - jobConfig.flatMap(_.deploymentId), - Some(ExternalDeploymentId(job.jid)), - version = jobConfig.map(_.version), - startTime = Some(job.`start-time`), - attributes = Option.empty, - errors = List.empty - ) - } - } - Future.sequence(statusDetails).map((name, _)) - } - - } - .map(_.toMap) - - private def toDeploymentStatus(jobState: String, jobStatusCounts: BaseJobStatusCounts): DeploymentStatus = { - toJobStatus(jobState) match { - case JobStatus.RUNNING if ensureTasksRunning(jobStatusCounts) => DeploymentStatus.Running - case JobStatus.RUNNING | JobStatus.INITIALIZING | JobStatus.CREATED => DeploymentStatus.DuringDeploy - case JobStatus.FINISHED => DeploymentStatus.Finished - case JobStatus.RESTARTING => DeploymentStatus.Restarting - case JobStatus.CANCELED => DeploymentStatus.Canceled - case JobStatus.CANCELLING => DeploymentStatus.DuringCancel - // The job is not technically running, but should be in a moment - case JobStatus.RECONCILING | JobStatus.SUSPENDED => DeploymentStatus.Running - case JobStatus.FAILING | JobStatus.FAILED => - DeploymentStatus.Problem.Failed // redeploy allowed, handle with restartStrategy - } - } - - private def toJobStatus(state: String): JobStatus = { - import org.apache.flink.api.common.JobStatus - JobStatus.valueOf(state) - } - - protected def ensureTasksRunning(jobStatusCount: BaseJobStatusCounts): Boolean = { - // We sum running and finished tasks because for batch jobs some tasks can be already finished but the others are still running. - // We don't handle correctly case when job creates some tasks lazily e.g. in batch case. Without knowledge about what - // kind of job is deployed, we don't know if it is such case or it is just a streaming job which is not fully running yet - jobStatusCount.running + jobStatusCount.finished == jobStatusCount.total - } - - override protected def waitForDuringDeployFinished( - processName: ProcessName, - deploymentId: ExternalDeploymentId - ): Future[Unit] = { - config.waitForDuringDeployFinish.toEnabledConfig - .map { config => - retry - .Pause(config.maxChecks, config.delay) - .apply { - implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(processName).map { statuses => - statuses.value - .find(details => - details.externalDeploymentId - .contains(deploymentId) && details.status == SimpleStateStatus.DuringDeploy - ) - .map(Left(_)) - .getOrElse(Right(())) - } - } - .map( - _.getOrElse( - throw new IllegalStateException( - "Deploy execution finished, but job is still in during deploy state on Flink" - ) - ) - ) - } - .getOrElse(Future.successful(())) - } - - private def withParsedJobConfig(jobId: String, name: ProcessName): Future[Option[ParsedJobConfig]] = { - client.getJobConfig(jobId).map { executionConfig => - val userConfig = executionConfig.`user-config` - for { - version <- userConfig.get("versionId").flatMap(_.asString).map(_.toLong).map(VersionId(_)) - user <- userConfig.get("user").map(_.asString.getOrElse("")) - modelVersion = userConfig.get("modelVersion").flatMap(_.asString).map(_.toInt) - processId = ProcessId(userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L)) - labels = userConfig.get("labels").flatMap(_.asArray).map(_.toList.flatMap(_.asString)).toList.flatten - deploymentId = userConfig.get("deploymentId").flatMap(_.asString).map(DeploymentId(_)) - } yield { - val versionDetails = ProcessVersion(version, name, processId, labels, user, modelVersion) - ParsedJobConfig(versionDetails, deploymentId) - } - } - } - - override protected def cancelScenario(command: DMCancelScenarioCommand): Future[Unit] = { - import command._ - implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(scenarioName).map(_.value).flatMap { statuses => - cancelEachMatchingJob(scenarioName, None, statuses) - } - } - - override protected def cancelDeployment(command: DMCancelDeploymentCommand): Future[Unit] = { - import command._ - implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(scenarioName).map(_.value).flatMap { statuses => - cancelEachMatchingJob(scenarioName, Some(deploymentId), statuses.filter(_.deploymentId.contains(deploymentId))) - } - } - - private def cancelEachMatchingJob( - processName: ProcessName, - deploymentId: Option[DeploymentId], - statuses: List[StatusDetails] - ) = { - statuses.filterNot(details => SimpleStateStatus.isFinalOrTransitioningToFinalStatus(details.status)) match { - case Nil => - logger.warn( - s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not active on Flink." - ) - Future.successful(()) - case single :: Nil => cancelFlinkJob(single) - case moreThanOne @ (_ :: _ :: _) => - logger.warn( - s"Found duplicate jobs of $processName${deploymentId.map(" with id: " + _).getOrElse("")}: $moreThanOne. Cancelling all in non terminal state." - ) - Future.sequence(moreThanOne.map(cancelFlinkJob)).map(_ => ()) - } - } - - private def cancelFlinkJob(details: StatusDetails): Future[Unit] = { - cancelFlinkJob( - details.externalDeploymentId.getOrElse( - throw new IllegalStateException( - "Error during cancelling scenario: returned status details has no external deployment id" - ) - ) - ) - } - - override protected def cancelFlinkJob(deploymentId: ExternalDeploymentId): Future[Unit] = { - client.cancel(deploymentId) - } - - override protected def makeSavepoint( - deploymentId: ExternalDeploymentId, - savepointDir: Option[String] - ): Future[SavepointResult] = { - client.makeSavepoint(deploymentId, savepointDir) - } - - override protected def stop( - deploymentId: ExternalDeploymentId, - savepointDir: Option[String] - ): Future[SavepointResult] = { - client.stop(deploymentId, savepointDir) - } - - override protected def runProgram( - processName: ProcessName, - mainClass: String, - args: List[String], - savepointPath: Option[String], - deploymentId: Option[newdeployment.DeploymentId] - ): Future[Option[ExternalDeploymentId]] = { - logger.debug(s"Starting to deploy scenario: $processName with savepoint $savepointPath") - client.runProgram( - modelJarProvider.getJobJar(), - mainClass, - args, - savepointPath, - deploymentId.map(toJobId) - ) - } - - private def toJobId(did: newdeployment.DeploymentId) = { - new JobID(did.value.getLeastSignificantBits, did.value.getMostSignificantBits).toHexString - } - - override protected def checkRequiredSlotsExceedAvailableSlots( - canonicalProcess: CanonicalProcess, - currentlyDeployedJobsIds: List[ExternalDeploymentId] - ): Future[Unit] = { - if (config.shouldCheckAvailableSlots) { - slotsChecker.checkRequiredSlotsExceedAvailableSlots(canonicalProcess, currentlyDeployedJobsIds) - } else - Future.successful(()) - } - -} - -object FlinkRestManager { - - // TODO: deploymentId is optional to handle situation when on Flink there is old version of runtime and in designer is the new one. - // After fully deploy of new version it should be mandatory - private case class ParsedJobConfig(version: ProcessVersion, deploymentId: Option[DeploymentId]) - -} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala index e46f0ba2c74..b51bdcc084c 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala @@ -105,7 +105,7 @@ class FlinkScheduledExecutionPerformer( ) flinkClient.runProgram( jarFile, - FlinkStreamingRestManager.MainClassName, + FlinkDeploymentManager.MainClassName, args, None, deploymentData.deploymentId.toNewDeploymentIdOpt.map(toJobId) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStatusDetailsDeterminer.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStatusDetailsDeterminer.scala new file mode 100644 index 00000000000..f23268b45cc --- /dev/null +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStatusDetailsDeterminer.scala @@ -0,0 +1,107 @@ +package pl.touk.nussknacker.engine.management + +import com.typesafe.scalalogging.LazyLogging +import org.apache.flink.api.common.JobStatus +import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.deployment.{DeploymentStatus, StatusDetails} +import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} +import pl.touk.nussknacker.engine.management.FlinkStatusDetailsDeterminer.{ParsedJobConfig, toDeploymentStatus} +import pl.touk.nussknacker.engine.management.rest.flinkRestModel +import pl.touk.nussknacker.engine.management.rest.flinkRestModel.{BaseJobStatusCounts, JobOverview} + +import scala.concurrent.{ExecutionContext, Future} + +class FlinkStatusDetailsDeterminer( + namingStrategy: NamingStrategy, + getJobConfig: String => Future[flinkRestModel.ExecutionConfig] +)(implicit ec: ExecutionContext) + extends LazyLogging { + + def statusDetailsFromJobOverviews(jobOverviews: List[JobOverview]): Future[Map[ProcessName, List[StatusDetails]]] = + Future + .sequence { + jobOverviews + .groupBy(_.name) + .flatMap { case (name, jobs) => + namingStrategy.decodeName(name).map(decoded => (ProcessName(decoded), jobs)) + } + .map { case (name, jobs) => + val statusDetails = jobs.map { job => + withParsedJobConfig(job.jid, name).map { jobConfig => + // TODO: return error when there's no correct version in process + // currently we're rather lax on this, so that this change is backward-compatible + // we log debug here for now, since it's invoked v. often + if (jobConfig.isEmpty) { + logger.debug(s"No correct job details in deployed scenario: ${job.name}") + } + StatusDetails( + SimpleStateStatus.fromDeploymentStatus(toDeploymentStatus(JobStatus.valueOf(job.state), job.tasks)), + jobConfig.flatMap(_.deploymentId), + Some(ExternalDeploymentId(job.jid)), + version = jobConfig.map(_.version), + startTime = Some(job.`start-time`), + attributes = Option.empty, + errors = List.empty + ) + } + } + Future.sequence(statusDetails).map((name, _)) + } + + } + .map(_.toMap) + + private def withParsedJobConfig(jobId: String, name: ProcessName): Future[Option[ParsedJobConfig]] = { + getJobConfig(jobId).map { executionConfig => + val userConfig = executionConfig.`user-config` + for { + version <- userConfig.get("versionId").flatMap(_.asString).map(_.toLong).map(VersionId(_)) + user <- userConfig.get("user").map(_.asString.getOrElse("")) + modelVersion = userConfig.get("modelVersion").flatMap(_.asString).map(_.toInt) + processId = ProcessId(userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L)) + labels = userConfig.get("labels").flatMap(_.asArray).map(_.toList.flatMap(_.asString)).toList.flatten + deploymentId = userConfig.get("deploymentId").flatMap(_.asString).map(DeploymentId(_)) + } yield { + val versionDetails = ProcessVersion(version, name, processId, labels, user, modelVersion) + ParsedJobConfig(versionDetails, deploymentId) + } + } + } + +} + +object FlinkStatusDetailsDeterminer { + + // TODO: deploymentId is optional to handle situation when on Flink there is old version of runtime and in designer is the new one. + // After fully deploy of new version it should be mandatory + private case class ParsedJobConfig(version: ProcessVersion, deploymentId: Option[DeploymentId]) + + private[management] def toDeploymentStatus( + jobStatus: JobStatus, + jobStatusCounts: BaseJobStatusCounts + ): DeploymentStatus = { + jobStatus match { + case JobStatus.RUNNING if ensureTasksRunning(jobStatusCounts) => DeploymentStatus.Running + case JobStatus.RUNNING | JobStatus.INITIALIZING | JobStatus.CREATED => DeploymentStatus.DuringDeploy + case JobStatus.FINISHED => DeploymentStatus.Finished + case JobStatus.RESTARTING => DeploymentStatus.Restarting + case JobStatus.CANCELED => DeploymentStatus.Canceled + case JobStatus.CANCELLING => DeploymentStatus.DuringCancel + // The job is not technically running, but should be in a moment + case JobStatus.RECONCILING | JobStatus.SUSPENDED => DeploymentStatus.Running + case JobStatus.FAILING | JobStatus.FAILED => + DeploymentStatus.Problem.Failed // redeploy allowed, handle with restartStrategy + } + } + + private def ensureTasksRunning(jobStatusCount: BaseJobStatusCounts): Boolean = { + // We sum running and finished tasks because for batch jobs some tasks can be already finished but the others are still running. + // We don't handle correctly case when job creates some tasks lazily e.g. in batch case. Without knowledge about what + // kind of job is deployed, we don't know if it is such case or it is just a streaming job which is not fully running yet + jobStatusCount.running + jobStatusCount.finished == jobStatusCount.total + } + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingRestManager.scala deleted file mode 100644 index b53d0d44ff7..00000000000 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingRestManager.scala +++ /dev/null @@ -1,16 +0,0 @@ -package pl.touk.nussknacker.engine.management - -import pl.touk.nussknacker.engine.management.FlinkStreamingRestManager.MainClassName -import pl.touk.nussknacker.engine.management.rest.FlinkClient -import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies} - -class FlinkStreamingRestManager( - client: FlinkClient, - config: FlinkConfig, - modelData: BaseModelData, - dependencies: DeploymentManagerDependencies -) extends FlinkRestManager(client, config, modelData, dependencies, mainClassName = MainClassName) - -object FlinkStreamingRestManager { - val MainClassName = "pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain" -} diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala similarity index 98% rename from engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala rename to engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala index 6d1bdbf3525..1034125e999 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala @@ -40,7 +40,7 @@ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} //TODO move some tests to FlinkHttpClientTest -class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFutures { +class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientScalaFutures { private implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh @@ -78,7 +78,7 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu statusCode: StatusCode = StatusCode.Ok, exceptionOnDeploy: Option[Exception] = None, freeSlots: Int = 1 - ): FlinkRestManager = + ): DeploymentManager = createManagerWithHistory( statuses, acceptSavepoint, @@ -101,7 +101,7 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu statusCode: StatusCode = StatusCode.Ok, exceptionOnDeploy: Option[Exception] = None, freeSlots: Int = 1 - ): (FlinkRestManager, mutable.Buffer[HistoryEntry]) = { + ): (DeploymentManager, mutable.Buffer[HistoryEntry]) = { import scala.jdk.CollectionConverters._ val history: mutable.Buffer[HistoryEntry] = Collections.synchronizedList(new java.util.ArrayList[HistoryEntry]()).asScala @@ -546,7 +546,7 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu private def createDeploymentManager( config: FlinkConfig = defaultConfig, sttpBackend: SttpBackend[Future, Any] = AsyncHttpClientFutureBackend() - ): FlinkRestManager = { + ): DeploymentManager = { val deploymentManagerDependencies = DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(List.empty), new ProcessingTypeActionServiceStub, @@ -556,12 +556,11 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu ActorSystem(getClass.getSimpleName), sttpBackend ) - new FlinkRestManager( - client = HttpFlinkClient.createUnsafe(config)(sttpBackend, ExecutionContext.global), - config = config, - modelData = LocalModelData(ConfigFactory.empty, List.empty), + new FlinkDeploymentManager( + LocalModelData(ConfigFactory.empty, List.empty), deploymentManagerDependencies, - mainClassName = "UNUSED" + config, + HttpFlinkClient.createUnsafe(config)(sttpBackend, ExecutionContext.global), ) } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala index ddb425e15dc..d9bbec4b1b8 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala @@ -91,4 +91,7 @@ case class StatusDetails( startTime: Option[Long] = None, attributes: Option[Json] = None, errors: List[String] = List.empty -) +) { + def externalDeploymentIdUnsafe: ExternalDeploymentId = + externalDeploymentId.getOrElse(throw new IllegalStateException(s"externalDeploymentId is missing")) +}