Skip to content

Commit

Permalink
chore: Version CRD and NifiClusterConfig (#752)
Browse files Browse the repository at this point in the history
* chore: Remove separate CRD crate

* chore: Remove unused constant

* chore: Version NifiCluster

* docs: Fix invalid rustdoc reference

* chore: Version NifiClusterConfig

* chore: Move NifiCluster impl blocks
  • Loading branch information
Techassi authored Feb 12, 2025
1 parent ebbea85 commit 43281b2
Show file tree
Hide file tree
Showing 24 changed files with 1,730 additions and 571 deletions.
314 changes: 272 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

1,340 changes: 1,137 additions & 203 deletions Cargo.nix

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["rust/crd", "rust/operator-binary"]
members = ["rust/operator-binary"]
resolver = "2"

[workspace.package]
Expand All @@ -10,6 +10,10 @@ edition = "2021"
repository = "https://github.com/stackabletech/nifi-operator"

[workspace.dependencies]
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }

anyhow = "1.0"
built = { version = "0.7", features = ["chrono", "git2"] }
clap = "4.5"
Expand All @@ -18,15 +22,13 @@ fnv = "1.0"
futures = { version = "0.3", features = ["compat"] }
indoc = "2.0"
pin-project = "1.1"
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
rand = "0.8"
rstest = "0.24"
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
snafu = "0.8"
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
strum = { version = "0.26", features = ["derive"] }
tokio = { version = "1.40", features = ["full"] }
tracing = "0.1"
Expand Down
3 changes: 3 additions & 0 deletions crate-hashes.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/helm/nifi-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
name: v1alpha1
schema:
openAPIV3Schema:
description: Auto-generated derived type for NifiSpec via `CustomResource`
description: Auto-generated derived type for NifiClusterSpec via `CustomResource`
properties:
spec:
description: A NiFi cluster stacklet. This resource is managed by the Stackable operator for Apache NiFi. Find more information on how to use it and the resources that the operator generates in the [operator documentation](https://docs.stackable.tech/home/nightly/nifi/).
Expand Down
22 changes: 0 additions & 22 deletions rust/crd/Cargo.toml

This file was deleted.

6 changes: 3 additions & 3 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ repository.workspace = true
publish = false

[dependencies]
stackable-nifi-crd = { path = "../crd" }
stackable-versioned.workspace = true
stackable-operator.workspace = true
product-config.workspace = true

anyhow.workspace = true
clap.workspace = true
Expand All @@ -23,8 +25,6 @@ semver.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
stackable-operator.workspace = true
product-config.workspace = true
strum.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions rust/operator-binary/src/config/jvm.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_nifi_crd::{NifiConfig, NifiConfigFragment};
use stackable_operator::{
memory::{BinaryMultiple, MemoryQuantity},
role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role},
};

use super::{JVM_SECURITY_PROPERTIES_FILE, NIFI_CONFIG_DIRECTORY};
use crate::{
config::{JVM_SECURITY_PROPERTIES_FILE, NIFI_CONFIG_DIRECTORY},
crd::{NifiConfig, NifiConfigFragment},
};

// Part of memory resources allocated for Java heap
const JAVA_HEAP_FACTOR: f32 = 0.8;
Expand Down
18 changes: 9 additions & 9 deletions rust/operator-binary/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ use std::{
use jvm::build_merged_jvm_config;
use product_config::{types::PropertyNameKind, ProductConfigManager};
use snafu::{ResultExt, Snafu};
use stackable_nifi_crd::{
NifiCluster, NifiConfig, NifiConfigFragment, NifiRole, NifiSpec, NifiStorageConfig, HTTPS_PORT,
PROTOCOL_PORT,
};
use stackable_operator::{
commons::resources::Resources,
memory::MemoryQuantity,
Expand All @@ -22,6 +18,10 @@ use stackable_operator::{
use strum::{Display, EnumIter};

use crate::{
crd::{
v1alpha1, NifiConfig, NifiConfigFragment, NifiRole, NifiStorageConfig, HTTPS_PORT,
PROTOCOL_PORT,
},
operations::graceful_shutdown::graceful_shutdown_config_properties,
security::{
authentication::{
Expand Down Expand Up @@ -135,7 +135,7 @@ pub fn build_bootstrap_conf(

/// Create the NiFi nifi.properties
pub fn build_nifi_properties(
spec: &NifiSpec,
spec: &v1alpha1::NifiClusterSpec,
resource_config: &Resources<NifiStorageConfig>,
proxy_hosts: &str,
auth_config: &NifiAuthenticationConfig,
Expand Down Expand Up @@ -616,7 +616,7 @@ pub fn build_state_management_xml() -> String {
/// * `product_config` - The product config to validate and complement the user config.
///
pub fn validated_product_config(
resource: &NifiCluster,
resource: &v1alpha1::NifiCluster,
version: &str,
role: &Role<NifiConfigFragment, GenericRoleConfig, JavaCommonConfig>,
product_config: &ProductConfigManager,
Expand Down Expand Up @@ -667,10 +667,9 @@ fn storage_quantity_to_nifi(quantity: MemoryQuantity) -> String {
#[cfg(test)]
mod tests {
use indoc::indoc;
use stackable_nifi_crd::NifiCluster;

use super::*;
use crate::config::build_bootstrap_conf;
use crate::{config::build_bootstrap_conf, crd::v1alpha1};

#[test]
fn test_build_bootstrap_conf_defaults() {
Expand Down Expand Up @@ -792,7 +791,8 @@ mod tests {
}

fn construct_bootstrap_conf(nifi_cluster: &str) -> String {
let nifi: NifiCluster = serde_yaml::from_str(nifi_cluster).expect("illegal test input");
let nifi: v1alpha1::NifiCluster =
serde_yaml::from_str(nifi_cluster).expect("illegal test input");

let nifi_role = NifiRole::Node;
let role = nifi.spec.nodes.as_ref().unwrap();
Expand Down
58 changes: 30 additions & 28 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Ensures that `Pod`s are configured and running for each [`NifiCluster`]
//! Ensures that `Pod`s are configured and running for each [`v1alpha1::NifiCluster`].
use std::{
borrow::Cow,
collections::{BTreeMap, HashMap, HashSet},
Expand All @@ -13,12 +14,6 @@ use product_config::{
ProductConfigManager,
};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_nifi_crd::{
authentication::AuthenticationClassResolved, Container, CurrentlySupportedListenerClasses,
NifiCluster, NifiConfig, NifiConfigFragment, NifiRole, NifiStatus, APP_NAME, BALANCE_PORT,
BALANCE_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, PROTOCOL_PORT,
PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
};
use stackable_operator::{
builder::{
self,
Expand Down Expand Up @@ -84,6 +79,13 @@ use crate::{
validated_product_config, NifiRepository, JVM_SECURITY_PROPERTIES_FILE,
NIFI_BOOTSTRAP_CONF, NIFI_CONFIG_DIRECTORY, NIFI_PROPERTIES, NIFI_STATE_MANAGEMENT_XML,
},
crd::{
authentication::AuthenticationClassResolved, v1alpha1, Container,
CurrentlySupportedListenerClasses, NifiConfig, NifiConfigFragment, NifiRole, NifiStatus,
APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT,
METRICS_PORT_NAME, PROTOCOL_PORT, PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR,
STACKABLE_LOG_DIR,
},
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address},
reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name},
Expand Down Expand Up @@ -159,13 +161,13 @@ pub enum Error {
#[snafu(display("failed to apply Service for {}", rolegroup))]
ApplyRoleGroupService {
source: stackable_operator::cluster_resources::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to build ConfigMap for {}", rolegroup))]
BuildRoleGroupConfig {
source: stackable_operator::builder::configmap::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("object has no nodes defined"))]
Expand All @@ -174,13 +176,13 @@ pub enum Error {
#[snafu(display("failed to apply ConfigMap for {}", rolegroup))]
ApplyRoleGroupConfig {
source: stackable_operator::cluster_resources::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to apply StatefulSet for {}", rolegroup))]
ApplyRoleGroupStatefulSet {
source: stackable_operator::cluster_resources::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to apply create ReportingTask service"))]
Expand Down Expand Up @@ -210,7 +212,7 @@ pub enum Error {
#[snafu(display("Failed to find any nodes in cluster {obj_ref}",))]
MissingNodes {
source: stackable_operator::client::Error,
obj_ref: ObjectRef<NifiCluster>,
obj_ref: ObjectRef<v1alpha1::NifiCluster>,
},

#[snafu(display("Failed to find service {obj_ref}"))]
Expand All @@ -235,7 +237,7 @@ pub enum Error {
BuildProductConfig {
#[snafu(source(from(config::Error, Box::new)))]
source: Box<config::Error>,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("illegal container name: [{container_name}]"))]
Expand All @@ -247,11 +249,11 @@ pub enum Error {
#[snafu(display("failed to validate resources for {rolegroup}"))]
ResourceValidation {
source: fragment::ValidationError,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to resolve and merge config for role and role group"))]
FailedToResolveConfig { source: stackable_nifi_crd::Error },
FailedToResolveConfig { source: crate::crd::Error },

#[snafu(display("failed to resolve the Vector aggregator address"))]
ResolveVectorAggregatorAddress {
Expand Down Expand Up @@ -295,7 +297,7 @@ pub enum Error {

#[snafu(display("Failed to resolve NiFi Authentication Configuration"))]
FailedResolveNifiAuthenticationConfig {
source: stackable_nifi_crd::authentication::Error,
source: crate::crd::authentication::Error,
},

#[snafu(display("failed to create PodDisruptionBudget"))]
Expand Down Expand Up @@ -365,7 +367,7 @@ pub enum VersionChangeState {
}

pub async fn reconcile_nifi(
nifi: Arc<DeserializeGuard<NifiCluster>>,
nifi: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconcile");
Expand Down Expand Up @@ -686,7 +688,7 @@ pub async fn reconcile_nifi(
/// The node-role service is the primary endpoint that should be used by clients that do not
/// perform internal load balancing including targets outside of the cluster.
pub fn build_node_role_service(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
) -> Result<Service> {
let role_name = NifiRole::Node.to_string();
Expand Down Expand Up @@ -732,11 +734,11 @@ pub fn build_node_role_service(
/// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator
#[allow(clippy::too_many_arguments)]
async fn build_node_rolegroup_config_map(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
nifi_auth_config: &NifiAuthenticationConfig,
role: &Role<NifiConfigFragment, GenericRoleConfig, JavaCommonConfig>,
rolegroup: &RoleGroupRef<NifiCluster>,
rolegroup: &RoleGroupRef<v1alpha1::NifiCluster>,
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
merged_config: &NifiConfig,
vector_aggregator_address: Option<&str>,
Expand Down Expand Up @@ -846,9 +848,9 @@ async fn build_node_rolegroup_config_map(
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
fn build_node_rolegroup_service(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<NifiCluster>,
rolegroup: &RoleGroupRef<v1alpha1::NifiCluster>,
) -> Result<Service> {
Ok(Service {
metadata: ObjectMetaBuilder::new()
Expand Down Expand Up @@ -903,10 +905,10 @@ const USERDATA_MOUNTPOINT: &str = "/stackable/userdata";
/// corresponding [`Service`] (from [`build_node_rolegroup_service`]).
#[allow(clippy::too_many_arguments)]
async fn build_node_rolegroup_statefulset(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
cluster_info: &KubernetesClusterInfo,
rolegroup_ref: &RoleGroupRef<NifiCluster>,
rolegroup_ref: &RoleGroupRef<v1alpha1::NifiCluster>,
role: &Role<NifiConfigFragment, GenericRoleConfig, JavaCommonConfig>,
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
merged_config: &NifiConfig,
Expand Down Expand Up @@ -1476,7 +1478,7 @@ fn zookeeper_env_var(name: &str, configmap_name: &str) -> EnvVar {

async fn get_proxy_hosts(
client: &Client,
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
nifi_service: &Service,
) -> Result<String> {
let host_header_check = nifi.spec.cluster_config.host_header_check.clone();
Expand Down Expand Up @@ -1541,7 +1543,7 @@ async fn get_proxy_hosts(
}

pub fn error_policy(
_obj: Arc<DeserializeGuard<NifiCluster>>,
_obj: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
Expand All @@ -1554,11 +1556,11 @@ pub fn error_policy(
}

pub fn build_recommended_labels<'a>(
owner: &'a NifiCluster,
owner: &'a v1alpha1::NifiCluster,
app_version: &'a str,
role: &'a str,
role_group: &'a str,
) -> ObjectLabels<'a, NifiCluster> {
) -> ObjectLabels<'a, v1alpha1::NifiCluster> {
ObjectLabels {
owner,
app_name: APP_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use stackable_operator::{
k8s_openapi::api::core::v1::PodAntiAffinity,
};

use crate::{NifiRole, APP_NAME};
use crate::crd::{NifiRole, APP_NAME};

pub fn get_affinity(cluster_name: &str, role: &NifiRole) -> StackableAffinityFragment {
StackableAffinityFragment {
Expand Down Expand Up @@ -32,7 +32,7 @@ mod tests {
};

use super::*;
use crate::NifiCluster;
use crate::crd::v1alpha1;

#[test]
fn test_affinity_defaults() {
Expand All @@ -57,7 +57,7 @@ mod tests {
replicas: 1
"#;
let deserializer = serde_yaml::Deserializer::from_str(input);
let nifi: NifiCluster =
let nifi: v1alpha1::NifiCluster =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
let merged_config = nifi.merged_config(&NifiRole::Node, "default").unwrap();

Expand Down
Loading

0 comments on commit 43281b2

Please sign in to comment.