Skip to content

Commit

Permalink
chore: Version KafkaCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Techassi committed Feb 7, 2025
1 parent 3a43ee6 commit 28e17da
Show file tree
Hide file tree
Showing 14 changed files with 397 additions and 131 deletions.
298 changes: 272 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ edition = "2021"
repository = "https://github.com/stackabletech/kafka-operator"

[workspace.dependencies]
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }

anyhow = "1.0"
built = { version = "0.7", features = ["chrono", "git2"] }
clap = "4.5"
const_format = "0.2"
futures = { version = "0.3" }
futures = "0.3"
indoc = "2.0"
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
rstest = "0.24"
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
snafu = "0.8"
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
strum = { version = "0.26", features = ["derive"] }
tokio = { version = "1.40", features = ["full"] }
tracing = "0.1"
Expand Down
6 changes: 4 additions & 2 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ repository.workspace = true
publish = false

[dependencies]
stackable-versioned.workspace = true
stackable-operator.workspace = true
product-config.workspace = true

indoc.workspace = true
anyhow.workspace = true
clap.workspace = true
const_format.workspace = true
futures.workspace = true
product-config.workspace = true
serde_json.workspace = true
serde.workspace = true
snafu.workspace = true
stackable-operator.workspace = true
strum.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions rust/operator-binary/src/config/jvm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn is_heap_jvm_argument(jvm_argument: &str) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::crd::{KafkaCluster, KafkaRole};
use crate::crd::{v1alpha1, KafkaRole};

#[test]
fn test_construct_jvm_arguments_defaults() {
Expand Down Expand Up @@ -188,7 +188,8 @@ mod tests {
Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
String,
) {
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(kafka_cluster).expect("illegal test input");

let kafka_role = KafkaRole::Broker;
let rolegroup_ref = kafka.broker_rolegroup_ref("default");
Expand Down
5 changes: 3 additions & 2 deletions rust/operator-binary/src/crd/affinity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod tests {
};

use super::*;
use crate::KafkaCluster;
use crate::crd::v1alpha1;

#[rstest]
#[case(KafkaRole::Broker)]
Expand All @@ -54,7 +54,8 @@ mod tests {
replicas: 1
"#;

let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
let merged_config = kafka
.merged_config(&role, &role.rolegroup_ref(&kafka, "default"))
.unwrap();
Expand Down
12 changes: 7 additions & 5 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use snafu::{OptionExt, Snafu};
use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo};
use strum::{EnumDiscriminants, EnumString};

use crate::crd::{security::KafkaTlsSecurity, KafkaCluster, STACKABLE_LISTENER_BROKER_DIR};
use crate::crd::{security::KafkaTlsSecurity, v1alpha1, STACKABLE_LISTENER_BROKER_DIR};

const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0";

Expand Down Expand Up @@ -96,7 +96,7 @@ impl Display for KafkaListener {
}

pub fn get_kafka_listener_config(
kafka: &KafkaCluster,
kafka: &v1alpha1::KafkaCluster,
kafka_security: &KafkaTlsSecurity,
object_name: &str,
cluster_info: &KubernetesClusterInfo,
Expand Down Expand Up @@ -246,7 +246,7 @@ fn node_port_cmd(directory: &str, port_name: &str) -> String {
}

pub fn pod_fqdn(
kafka: &KafkaCluster,
kafka: &v1alpha1::KafkaCluster,
object_name: &str,
cluster_info: &KubernetesClusterInfo,
) -> Result<String, KafkaListenerError> {
Expand Down Expand Up @@ -303,7 +303,8 @@ mod tests {
serverSecretClass: tls
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(kafka_cluster).expect("illegal test input");
let kafka_security = KafkaTlsSecurity::new(
ResolvedAuthenticationClasses::new(vec![AuthenticationClass {
metadata: ObjectMetaBuilder::new().name("auth-class").build(),
Expand Down Expand Up @@ -479,7 +480,8 @@ mod tests {
serverSecretClass: tls
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(kafka_cluster).expect("illegal test input");
let kafka_security = KafkaTlsSecurity::new(
ResolvedAuthenticationClasses::new(vec![AuthenticationClass {
metadata: ObjectMetaBuilder::new().name("auth-class").build(),
Expand Down
113 changes: 60 additions & 53 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use stackable_operator::{
time::Duration,
utils::cluster_info::KubernetesClusterInfo,
};
use stackable_versioned::versioned;
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};

use crate::crd::{authorization::KafkaAuthorization, tls::KafkaTls};
Expand Down Expand Up @@ -76,7 +77,7 @@ pub enum Error {

#[snafu(display("failed to validate config of rolegroup {rolegroup}"))]
RoleGroupValidation {
rolegroup: RoleGroupRef<KafkaCluster>,
rolegroup: RoleGroupRef<v1alpha1::KafkaCluster>,
source: ValidationError,
},

Expand All @@ -103,39 +104,41 @@ pub enum Error {
FragmentValidationFailure { source: ValidationError },
}

/// A Kafka cluster stacklet. This resource is managed by the Stackable operator for Apache Kafka.
/// Find more information on how to use it and the resources that the operator generates in the
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/kafka/).
#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
#[kube(
group = "kafka.stackable.tech",
version = "v1alpha1",
kind = "KafkaCluster",
plural = "kafkaclusters",
status = "KafkaClusterStatus",
shortname = "kafka",
namespaced,
crates(
kube_core = "stackable_operator::kube::core",
k8s_openapi = "stackable_operator::k8s_openapi",
schemars = "stackable_operator::schemars"
)
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaClusterSpec {
// no doc - docs in ProductImage struct.
pub image: ProductImage,

// no doc - docs in Role struct.
pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,

/// Kafka settings that affect all roles and role groups.
/// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
pub cluster_config: KafkaClusterConfig,

// no doc - docs in ClusterOperation struct.
#[serde(default)]
pub cluster_operation: ClusterOperation,
// The `#[versioned]` macro (from `stackable_versioned`, imported above) expands this
// module into one sub-module per declared version — here only `v1alpha1` — which is
// why the rest of this file refers to the CRD type as `v1alpha1::KafkaCluster`.
#[versioned(version(name = "v1alpha1"))]
pub mod versioned {
    /// A Kafka cluster stacklet. This resource is managed by the Stackable operator for Apache Kafka.
    /// Find more information on how to use it and the resources that the operator generates in the
    /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/kafka/).
    // NOTE(review): the nested `#[versioned(k8s(...))]` carries what used to live in
    // `#[kube(...)]`; the CRD `version` is now derived from the module version above
    // rather than being listed here — presumably the macro forwards these arguments to
    // the `CustomResource` derive; confirm against the stackable-versioned docs.
    #[versioned(k8s(
        group = "kafka.stackable.tech",
        kind = "KafkaCluster",
        plural = "kafkaclusters",
        status = "KafkaClusterStatus",
        shortname = "kafka",
        namespaced,
        crates(
            kube_core = "stackable_operator::kube::core",
            k8s_openapi = "stackable_operator::k8s_openapi",
            schemars = "stackable_operator::schemars"
        )
    ))]
    #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct KafkaClusterSpec {
        // no doc - docs in ProductImage struct.
        pub image: ProductImage,

        // no doc - docs in Role struct.
        pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,

        /// Kafka settings that affect all roles and role groups.
        /// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
        pub cluster_config: KafkaClusterConfig,

        // no doc - docs in ClusterOperation struct.
        #[serde(default)]
        pub cluster_operation: ClusterOperation,
    }
}

#[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)]
Expand Down Expand Up @@ -170,18 +173,15 @@ pub struct KafkaClusterConfig {
pub zookeeper_config_map_name: String,
}

impl KafkaCluster {
impl v1alpha1::KafkaCluster {
/// The name of the load-balanced Kubernetes Service providing the bootstrap address. Kafka clients will use this
/// to get a list of broker addresses and will use those to transmit data to the correct broker.
pub fn bootstrap_service_name(&self, rolegroup: &RoleGroupRef<Self>) -> String {
format!("{}-bootstrap", rolegroup.object_name())
}

/// Metadata about a broker rolegroup
pub fn broker_rolegroup_ref(
&self,
group_name: impl Into<String>,
) -> RoleGroupRef<KafkaCluster> {
pub fn broker_rolegroup_ref(&self, group_name: impl Into<String>) -> RoleGroupRef<Self> {
RoleGroupRef {
cluster: ObjectRef::from_obj(self),
role: KafkaRole::Broker.to_string(),
Expand All @@ -203,7 +203,7 @@ impl KafkaCluster {

pub fn rolegroup(
&self,
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
rolegroup_ref: &RoleGroupRef<Self>,
) -> Result<&RoleGroup<KafkaConfigFragment, JavaCommonConfig>, Error> {
let role_variant =
KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu {
Expand Down Expand Up @@ -324,9 +324,9 @@ impl KafkaRole {
/// Metadata about a rolegroup
pub fn rolegroup_ref(
&self,
kafka: &KafkaCluster,
kafka: &v1alpha1::KafkaCluster,
group_name: impl Into<String>,
) -> RoleGroupRef<KafkaCluster> {
) -> RoleGroupRef<v1alpha1::KafkaCluster> {
RoleGroupRef {
cluster: ObjectRef::from_obj(kafka),
role: self.to_string(),
Expand Down Expand Up @@ -474,7 +474,7 @@ impl KafkaConfig {
}

impl Configuration for KafkaConfigFragment {
type Configurable = KafkaCluster;
type Configurable = v1alpha1::KafkaCluster;

fn compute_env(
&self,
Expand Down Expand Up @@ -528,7 +528,7 @@ pub struct KafkaClusterStatus {
pub conditions: Vec<ClusterCondition>,
}

impl HasStatusCondition for KafkaCluster {
impl HasStatusCondition for v1alpha1::KafkaCluster {
fn conditions(&self) -> Vec<ClusterCondition> {
match &self.status {
Some(status) => status.conditions.clone(),
Expand All @@ -541,7 +541,7 @@ impl HasStatusCondition for KafkaCluster {
mod tests {
use super::*;

fn get_server_secret_class(kafka: &KafkaCluster) -> Option<String> {
fn get_server_secret_class(kafka: &v1alpha1::KafkaCluster) -> Option<String> {
kafka
.spec
.cluster_config
Expand All @@ -550,7 +550,7 @@ mod tests {
.and_then(|tls| tls.server_secret_class.clone())
}

fn get_internal_secret_class(kafka: &KafkaCluster) -> String {
fn get_internal_secret_class(kafka: &v1alpha1::KafkaCluster) -> String {
kafka
.spec
.cluster_config
Expand All @@ -574,7 +574,8 @@ mod tests {
clusterConfig:
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
assert_eq!(
get_internal_secret_class(&kafka),
Expand All @@ -595,7 +596,8 @@ mod tests {
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(
get_server_secret_class(&kafka).unwrap(),
"simple-kafka-server-tls".to_string()
Expand All @@ -618,7 +620,8 @@ mod tests {
serverSecretClass: null
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(get_server_secret_class(&kafka), None);
assert_eq!(
get_internal_secret_class(&kafka),
Expand All @@ -639,7 +642,8 @@ mod tests {
internalSecretClass: simple-kafka-internal-tls
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
assert_eq!(
get_internal_secret_class(&kafka),
Expand All @@ -660,7 +664,8 @@ mod tests {
clusterConfig:
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
assert_eq!(
get_internal_secret_class(&kafka),
Expand All @@ -680,7 +685,8 @@ mod tests {
internalSecretClass: simple-kafka-internal-tls
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
assert_eq!(
get_internal_secret_class(&kafka),
Expand All @@ -700,7 +706,8 @@ mod tests {
serverSecretClass: simple-kafka-server-tls
zookeeperConfigMapName: xyz
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka: v1alpha1::KafkaCluster =
serde_yaml::from_str(input).expect("illegal test input");
assert_eq!(
get_server_secret_class(&kafka),
Some("simple-kafka-server-tls".to_string())
Expand Down
4 changes: 2 additions & 2 deletions rust/operator-binary/src/crd/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use stackable_operator::{
use crate::crd::{
authentication::{self, ResolvedAuthenticationClasses},
listener::{self, node_address_cmd, KafkaListenerConfig},
tls, KafkaCluster, KafkaRole, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME,
tls, v1alpha1, KafkaRole, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME,
SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH,
STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, STACKABLE_LOG_DIR,
};
Expand Down Expand Up @@ -155,7 +155,7 @@ impl KafkaTlsSecurity {
/// all provided `AuthenticationClass` references.
pub async fn new_from_kafka_cluster(
client: &Client,
kafka: &KafkaCluster,
kafka: &v1alpha1::KafkaCluster,
) -> Result<Self, Error> {
Ok(KafkaTlsSecurity {
resolved_authentication_classes: ResolvedAuthenticationClasses::from_references(
Expand Down
Loading

0 comments on commit 28e17da

Please sign in to comment.