Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1979] DeploymentManager not reloaded during processingtype reload #7588

Draft
wants to merge 28 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f68b964
[NU-1979] Snenario status resolving moved from DM to core
arkadius Feb 14, 2025
dea343a
test fix
arkadius Feb 14, 2025
064fa83
separate presentation from business logic
arkadius Feb 17, 2025
a6fc258
separate presentation from business logic - part 2
arkadius Feb 17, 2025
62aa694
build fix + removed redundant fields
arkadius Feb 17, 2025
79834de
removed even more fields, added ScenarioStatusDto.statusName
arkadius Feb 17, 2025
ce37f72
InconsistentStateDetector moved to core
arkadius Feb 18, 2025
391d820
package moved
arkadius Feb 18, 2025
620da81
deployment statuses fetching encapsulated
arkadius Feb 18, 2025
db151cb
resolving simpler
arkadius Feb 18, 2025
4d49ac4
DeploymentsStatusProvider
arkadius Feb 18, 2025
acfa370
renames
arkadius Feb 18, 2025
5a90c77
Removed errors field
arkadius Feb 18, 2025
acf8e43
StatusDetails to DeploymentStatusDetails rename
arkadius Feb 18, 2025
aab7bb3
DeploymentStatusDetails: removed externalDeploymentId and startTime
arkadius Feb 18, 2025
2d7233f
DeploymentStatusDetails: ProcessVersion replaced by VersionId
arkadius Feb 19, 2025
ac015aa
Setting jobID also for mini cluster
arkadius Feb 19, 2025
80efd7b
TestDeploymentServiceFactory, removed ActionService -> DeploymentMana…
arkadius Feb 19, 2025
401dec8
PeriodicDeploymentManagerTest uses DeploymentService - skip Inconsist…
arkadius Feb 19, 2025
10dc416
Removed unused miniCluster.streamExecutionEnvConfig configuration option
arkadius Feb 19, 2025
b7c3ec3
BulkQueriedDeploymentStatuses + TODOses
arkadius Feb 20, 2025
95a3da2
Rollback to actions use only during scenario status resolving
arkadius Feb 21, 2025
c7a228d
reconciler
arkadius Feb 21, 2025
bedb14f
Merge remote-tracking branch 'origin/staging' into scenario-status-re…
arkadius Feb 21, 2025
144fbf6
typo fix
arkadius Feb 21, 2025
55ce30d
[NU-1979] ProcessingTypeDataProvider simpler
arkadius Feb 24, 2025
4b24a51
[NU-1979] ProcessingTypeDataProvider: tests for reload
arkadius Feb 24, 2025
c49867a
[NU-1979] ProcessingTypeDataProvider: reload using Ref and Mutex
arkadius Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions designer/client/src/components/Process/ProcessErrors.tsx

This file was deleted.

2 changes: 0 additions & 2 deletions designer/client/src/components/Process/ProcessStateIcon.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { ProcessStateType, Scenario } from "./types";
import ProcessStateUtils from "./ProcessStateUtils";
import UrlIcon from "../UrlIcon";
import { Box, Divider, Popover, styled, Typography } from "@mui/material";
import { Errors } from "./ProcessErrors";

const StyledUrlIcon = styled(UrlIcon)(({ theme }) => ({
width: theme.spacing(2.5),
Expand Down Expand Up @@ -44,7 +43,6 @@ function ProcessStateIcon({ scenario, processState }: Props) {
<Typography variant="body2" style={{ whiteSpace: "pre-wrap" }}>
{tooltip}
</Typography>
<Errors state={processState} />
</Box>
</Popover>
</>
Expand Down
7 changes: 1 addition & 6 deletions designer/client/src/components/Process/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable i18next/no-literal-string */
import { UnknownRecord, Instant } from "../../types/common";
import { Instant } from "../../types/common";
import { ScenarioGraph, ValidationResult } from "../../types";
import { ProcessingMode } from "../../http/HttpService";

Expand Down Expand Up @@ -48,17 +48,12 @@ export type ProcessName = Scenario["name"];

export type ProcessStateType = {
status: StatusType;
externalDeploymentId?: string;
visibleActions: Array<ActionName>;
allowedActions: Array<ActionName>;
actionTooltips: Record<ActionName, string>;
icon: string;
tooltip: string;
description: string;
startTime?: Date;
attributes?: UnknownRecord;
errors?: Array<string>;
version?: number | null;
};

export type StatusType = {
Expand Down
5 changes: 0 additions & 5 deletions designer/client/src/reducers/graph/utils.fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,15 @@ export const state: GraphState = {
],
},
state: {
externalDeploymentId: null,
status: {
name: "NOT_DEPLOYED",
},
version: null,
visibleActions: ["DEPLOY", "ARCHIVE", "RENAME"],
allowedActions: ["DEPLOY", "ARCHIVE", "RENAME"],
actionTooltips: {},
icon: "/assets/states/not-deployed.svg",
tooltip: "The scenario is not deployed.",
description: "The scenario is not deployed.",
startTime: null,
attributes: null,
errors: [],
},
validationResult: {
errors: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,101 +1,31 @@
package pl.touk.nussknacker.engine.api.deployment

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future

trait DeploymentManagerInconsistentStateHandlerMixIn {
self: DeploymentManager =>

final override def resolve(
idWithName: ProcessIdWithName,
statusDetails: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState] = {
val engineStateResolvedWithLastAction = flattenStatus(lastStateAction, statusDetails)
Future.successful(
processStateDefinitionManager.processState(
engineStateResolvedWithLastAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId
)
)
}

// This method is protected to make possible to override it with own logic handling different edge cases like
// other state on engine than based on lastStateAction
protected def flattenStatus(
lastStateAction: Option[ProcessAction],
statusDetails: List[StatusDetails]
): StatusDetails = {
InconsistentStateDetector.resolve(statusDetails, lastStateAction)
}

}

trait DeploymentManager extends AutoCloseable {

def deploymentSynchronisationSupport: DeploymentSynchronisationSupport

def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport
def deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport

def schedulingSupport: SchedulingSupport

def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result]

final def getProcessState(
idWithName: ProcessIdWithName,
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
)(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[ProcessState]] = {
for {
statusDetailsWithFreshness <- getProcessStates(idWithName.name)
stateWithFreshness <- resolve(
idWithName,
statusDetailsWithFreshness.value,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
).map(statusDetailsWithFreshness.withValue)
} yield stateWithFreshness
}

/**
* We provide a special wrapper called WithDataFreshnessStatus to ensure that fetched data is restored
* from the cache or not. If you use any kind of cache in your DM implementation please wrap result data
* with WithDataFreshnessStatus.cached(data) in opposite situation use WithDataFreshnessStatus.fresh(data)
*/
def getProcessStates(name: ProcessName)(
def getScenarioDeploymentsStatuses(scenarioName: ProcessName)(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[List[StatusDetails]]]

/**
* Resolves possible inconsistency with lastAction and formats status using `ProcessStateDefinitionManager`
*/
def resolve(
idWithName: ProcessIdWithName,
statusDetails: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState]
): Future[WithDataFreshnessStatus[List[DeploymentStatusDetails]]]

def processStateDefinitionManager: ProcessStateDefinitionManager

Expand All @@ -113,17 +43,17 @@ trait ManagerSpecificScenarioActivitiesStoredByManager { self: DeploymentManager

}

sealed trait StateQueryForAllScenariosSupport
sealed trait DeploymentsStatusesQueryForAllScenariosSupport

trait StateQueryForAllScenariosSupported extends StateQueryForAllScenariosSupport {
trait DeploymentsStatusesQueryForAllScenariosSupported extends DeploymentsStatusesQueryForAllScenariosSupport {

def getAllProcessesStates()(
def getAllScenariosDeploymentsStatuses()(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]]
): Future[WithDataFreshnessStatus[Map[ProcessName, List[DeploymentStatusDetails]]]]

}

case object NoStateQueryForAllScenariosSupport extends StateQueryForAllScenariosSupport
case object NoDeploymentsStatusesQueryForAllScenariosSupport extends DeploymentsStatusesQueryForAllScenariosSupport

sealed trait DeploymentSynchronisationSupport

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pl.touk.nussknacker.engine.api.deployment

import pl.touk.nussknacker.engine.api.process.VersionId
import pl.touk.nussknacker.engine.deployment.DeploymentId

// TODO replace by simple tuple DeploymentId -> DeploymentStatus after fixing TODOs
case class DeploymentStatusDetails(
status: StateStatus,
// deploymentId is optional because some deployment managers (k8s) don't support it
deploymentId: Option[DeploymentId],
// version might be unavailable in some failing cases. It is used during checking if deployed version is the same
// as expected by user - see InconsistentStateDetector.
// TODO it should be an attribute of "following deploy" StateStatuses: DuringDeploy, Running and Finished
version: Option[VersionId],
) {

def deploymentIdUnsafe: DeploymentId =
deploymentId.getOrElse(throw new IllegalStateException(s"deploymentId is missing"))

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.touk.nussknacker.engine.api.deployment

import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus
import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext
import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName

import java.net.URI
Expand All @@ -21,32 +21,42 @@ import java.net.URI
*/
class OverridingProcessStateDefinitionManager(
delegate: ProcessStateDefinitionManager,
statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = PartialFunction.empty,
statusActionsPF: PartialFunction[ScenarioStatusWithScenarioContext, Set[ScenarioActionName]] =
PartialFunction.empty,
statusIconsPF: PartialFunction[StateStatus, URI] = PartialFunction.empty,
statusTooltipsPF: PartialFunction[StateStatus, String] = PartialFunction.empty,
statusDescriptionsPF: PartialFunction[StateStatus, String] = PartialFunction.empty,
customStateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty,
customVisibleActions: Option[List[ScenarioActionName]] = None,
customActionTooltips: Option[ProcessStatus => Map[ScenarioActionName, String]] = None,
customActionTooltips: Option[ScenarioStatusWithScenarioContext => Map[ScenarioActionName, String]] = None,
) extends ProcessStateDefinitionManager {

override def visibleActions: List[ScenarioActionName] =
customVisibleActions.getOrElse(delegate.visibleActions)
override def visibleActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] =
customVisibleActions.getOrElse(delegate.visibleActions(input))

override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] =
statusActionsPF.applyOrElse(processStatus, delegate.statusActions)
override def statusActions(input: ScenarioStatusWithScenarioContext): Set[ScenarioActionName] =
statusActionsPF.applyOrElse(input, delegate.statusActions)

override def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, String] =
customActionTooltips.map(_(processStatus)).getOrElse(delegate.actionTooltips(processStatus))
override def actionTooltips(input: ScenarioStatusWithScenarioContext): Map[ScenarioActionName, String] =
customActionTooltips.map(_(input)).getOrElse(delegate.actionTooltips(input))

override def statusIcon(stateStatus: StateStatus): URI =
statusIconsPF.orElse(stateDefinitionsPF(_.icon)).applyOrElse(stateStatus, delegate.statusIcon)
override def statusIcon(input: ScenarioStatusWithScenarioContext): URI =
statusIconsPF
.orElse(stateDefinitionsPF(_.icon))
.lift(input.scenarioStatus)
.getOrElse(delegate.statusIcon(input))

override def statusTooltip(stateStatus: StateStatus): String =
statusTooltipsPF.orElse(stateDefinitionsPF(_.tooltip)).applyOrElse(stateStatus, delegate.statusTooltip)
override def statusTooltip(input: ScenarioStatusWithScenarioContext): String =
statusTooltipsPF
.orElse(stateDefinitionsPF(_.tooltip))
.lift(input.scenarioStatus)
.getOrElse(delegate.statusTooltip(input))

override def statusDescription(stateStatus: StateStatus): String =
statusDescriptionsPF.orElse(stateDefinitionsPF(_.description)).applyOrElse(stateStatus, delegate.statusDescription)
override def statusDescription(input: ScenarioStatusWithScenarioContext): String =
statusDescriptionsPF
.orElse(stateDefinitionsPF(_.description))
.lift(input.scenarioStatus)
.getOrElse(delegate.statusDescription(input))

override def stateDefinitions: Map[StatusName, StateDefinitionDetails] =
delegate.stateDefinitions ++ customStateDefinitions
Expand Down
Loading
Loading