chore: Version CRDs and SparkHistoryServerClusterConfig (#525)
* chore: Remove separate CRD crate

* chore: Remove unused items

* chore: Version SparkApplication

* chore: Version SparkHistoryServer

* docs: Fix invalid rustdoc reference

* chore: Version SparkHistoryServerClusterConfig

* chore: Remove redundant kind argument
Techassi authored Feb 12, 2025
1 parent cefde43 commit 23e1120
Showing 18 changed files with 495 additions and 291 deletions.
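
The heart of the change: each CRD struct now lives in a module annotated with the `versioned` macro from stackable-versioned, which generates one submodule per declared version (here `v1alpha1`) that call sites reference explicitly. A condensed sketch of the pattern, assuming the macro semantics shown in the history.rs diff below; the spec field is illustrative only, not the real SparkHistoryServerSpec:

use serde::{Deserialize, Serialize};
use stackable_operator::{
    kube::CustomResource,
    schemars::{self, JsonSchema},
};
use stackable_versioned::versioned;

#[versioned(version(name = "v1alpha1"))]
pub mod versioned {
    // The CRD version comes from the module annotation, and the kind is
    // derived from the struct name, which is why the commit can drop the
    // previously redundant `kind` argument.
    #[versioned(k8s(
        group = "spark.stackable.tech",
        shortname = "sparkhist",
        namespaced,
        crates(
            kube_core = "stackable_operator::kube::core",
            k8s_openapi = "stackable_operator::k8s_openapi",
            schemars = "stackable_operator::schemars"
        )
    ))]
    #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct SparkHistoryServerSpec {
        /// Illustrative field only.
        pub replicas: u16,
    }
}

// Call sites then name the generated type through the version module:
// fn reconcile(shs: v1alpha1::SparkHistoryServer) { /* ... */ }
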
328 changes: 279 additions & 49 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
@@ -1,5 +1,5 @@
 [workspace]
-members = ["rust/crd", "rust/operator-binary"]
+members = ["rust/operator-binary"]
 resolver = "2"
 
 [workspace.package]
@@ -10,19 +10,21 @@ edition = "2021"
 repository = "https://github.com/stackabletech/spark-k8s-operator"
 
 [workspace.dependencies]
+stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
+stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
+product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
+
 anyhow = "1.0"
 built = { version = "0.7", features = ["chrono", "git2"] }
 clap = "4.5"
 const_format = "0.2"
 futures = { version = "0.3", features = ["compat"] }
-product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
 rstest = "0.24"
 semver = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_yaml = "0.9"
 snafu = "0.8"
-stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
 strum = { version = "0.26", features = ["derive"] }
 tokio = { version = "1.39", features = ["full"] }
 tracing = "0.1"

4 changes: 2 additions & 2 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -12,7 +12,7 @@ spec:
     kind: SparkApplication
     plural: sparkapplications
     shortNames:
-      - sc
+      - sparkapp
     singular: sparkapplication
   scope: Namespaced
   versions:
@@ -998,7 +998,7 @@ spec:
     kind: SparkHistoryServer
     plural: sparkhistoryservers
     shortNames:
-      - shs
+      - sparkhist
     singular: sparkhistoryserver
   scope: Namespaced
   versions:
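
The new short names come from the `shortname` attributes on the Rust types (see the history.rs diff below), so `kubectl get sparkapp` and `kubectl get sparkhist` now resolve where `kubectl get sc` and `kubectl get shs` used to. A minimal sketch of regenerating this file from the versioned type, assuming kube's `CustomResourceExt` trait as re-exported through stackable-operator:

use stackable_operator::kube::CustomResourceExt;

// `v1alpha1` is the module generated in rust/operator-binary/src/crd/history.rs.
// Serializes the CustomResourceDefinition generated for the history server,
// including the `shortNames: [sparkhist]` entry seen in this file.
fn main() -> Result<(), serde_yaml::Error> {
    let crd = v1alpha1::SparkHistoryServer::crd();
    println!("{}", serde_yaml::to_string(&crd)?);
    Ok(())
}
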
25 changes: 0 additions & 25 deletions rust/crd/Cargo.toml

This file was deleted.

11 changes: 8 additions & 3 deletions rust/operator-binary/Cargo.toml
@@ -9,22 +9,27 @@ repository.workspace = true
 publish = false
 
 [dependencies]
-stackable-spark-k8s-crd = { path = "../crd" }
+stackable-versioned.workspace = true
+stackable-operator.workspace = true
+product-config.workspace = true
+
 anyhow.workspace = true
-product-config.workspace = true
 const_format.workspace = true
 semver.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 serde_yaml.workspace = true
 snafu.workspace = true
-stackable-operator.workspace = true
 strum.workspace = true
 tracing.workspace = true
 tracing-futures.workspace = true
 clap.workspace = true
 futures.workspace = true
 tokio.workspace = true
 
 [dev-dependencies]
 indoc.workspace = true
 rstest.workspace = true
 
 [build-dependencies]
 built.workspace = true

6 changes: 3 additions & 3 deletions rust/crd/src/affinity.rs → rust/operator-binary/src/crd/affinity.rs
@@ -3,7 +3,7 @@ use stackable_operator::{
     k8s_openapi::api::core::v1::PodAntiAffinity,
 };
 
-use crate::constants::{APP_NAME, HISTORY_ROLE_NAME};
+use crate::crd::constants::{APP_NAME, HISTORY_ROLE_NAME};
 
 pub fn history_affinity(cluster_name: &str) -> StackableAffinityFragment {
     let affinity_between_role_pods =
@@ -36,7 +36,7 @@ mod test {
         role_utils::RoleGroupRef,
     };
 
-    use crate::{constants::HISTORY_ROLE_NAME, history::SparkHistoryServer};
+    use crate::crd::{constants::HISTORY_ROLE_NAME, history::v1alpha1};
 
     #[test]
     pub fn test_history_affinity_defaults() {
@@ -62,7 +62,7 @@
     "#;
 
         let deserializer = serde_yaml::Deserializer::from_str(input);
-        let history: SparkHistoryServer =
+        let history: v1alpha1::SparkHistoryServer =
             serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
         let expected: StackableAffinity = StackableAffinity {
             node_affinity: None,

1 change: 0 additions & 1 deletion rust/crd/src/constants.rs → rust/operator-binary/src/crd/constants.rs
@@ -43,7 +43,6 @@ pub const STACKABLE_TRUST_STORE_NAME: &str = "stackable-truststore";
 pub const STACKABLE_TLS_STORE_PASSWORD: &str = "changeit";
 pub const SYSTEM_TRUST_STORE_PASSWORD: &str = "changeit";
 pub const STACKABLE_MOUNT_PATH_TLS: &str = "/stackable/mount_server_tls";
-pub const STACKABLE_MOUNT_NAME_TLS: &str = "servertls";
 
 pub const MIN_MEMORY_OVERHEAD: u32 = 384;
 pub const JVM_OVERHEAD_FACTOR: f32 = 0.1;

128 changes: 65 additions & 63 deletions rust/crd/src/history.rs → rust/operator-binary/src/crd/history.rs
@@ -28,9 +28,10 @@ use stackable_operator::{
     schemars::{self, JsonSchema},
     time::Duration,
 };
+use stackable_versioned::versioned;
 use strum::{Display, EnumIter};
 
-use crate::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir};
+use crate::crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir};
 
 #[derive(Snafu, Debug)]
 pub enum Error {
@@ -48,62 +49,63 @@ pub enum Error {
     CannotRetrieveRoleGroup { role_group: String },
 }
 
-/// A Spark cluster history server component. This resource is managed by the Stackable operator
-/// for Apache Spark. Find more information on how to use it in the
-/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server).
-#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
-#[kube(
-    group = "spark.stackable.tech",
-    version = "v1alpha1",
-    kind = "SparkHistoryServer",
-    shortname = "shs",
-    namespaced,
-    crates(
-        kube_core = "stackable_operator::kube::core",
-        k8s_openapi = "stackable_operator::k8s_openapi",
-        schemars = "stackable_operator::schemars"
-    )
-)]
-#[serde(rename_all = "camelCase")]
-pub struct SparkHistoryServerSpec {
-    pub image: ProductImage,
-
-    /// Global Spark history server configuration that applies to all roles and role groups.
-    #[serde(default)]
-    pub cluster_config: SparkHistoryServerClusterConfig,
-
-    /// Name of the Vector aggregator discovery ConfigMap.
-    /// It must contain the key `ADDRESS` with the address of the Vector aggregator.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub vector_aggregator_config_map_name: Option<String>,
-
-    /// The log file directory definition used by the Spark history server.
-    pub log_file_directory: LogFileDirectorySpec,
-
-    /// A map of key/value strings that will be passed directly to Spark when deploying the history server.
-    #[serde(default)]
-    pub spark_conf: BTreeMap<String, String>,
-
-    /// A history server node role definition.
-    pub nodes: Role<HistoryConfigFragment>,
-}
+#[versioned(version(name = "v1alpha1"))]
+pub mod versioned {
+    /// A Spark cluster history server component. This resource is managed by the Stackable operator
+    /// for Apache Spark. Find more information on how to use it in the
+    /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server).
+    #[versioned(k8s(
+        group = "spark.stackable.tech",
+        shortname = "sparkhist",
+        namespaced,
+        crates(
+            kube_core = "stackable_operator::kube::core",
+            k8s_openapi = "stackable_operator::k8s_openapi",
+            schemars = "stackable_operator::schemars"
+        )
+    ))]
+    #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
+    #[serde(rename_all = "camelCase")]
+    pub struct SparkHistoryServerSpec {
+        pub image: ProductImage,
+
+        /// Global Spark history server configuration that applies to all roles and role groups.
+        #[serde(default)]
+        pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig,
+
+        /// Name of the Vector aggregator discovery ConfigMap.
+        /// It must contain the key `ADDRESS` with the address of the Vector aggregator.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub vector_aggregator_config_map_name: Option<String>,
+
+        /// The log file directory definition used by the Spark history server.
+        pub log_file_directory: LogFileDirectorySpec,
+
+        /// A map of key/value strings that will be passed directly to Spark when deploying the history server.
+        #[serde(default)]
+        pub spark_conf: BTreeMap<String, String>,
+
+        /// A history server node role definition.
+        pub nodes: Role<HistoryConfigFragment>,
+    }
 
-#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SparkHistoryServerClusterConfig {
-    /// This field controls which type of Service the Operator creates for this HistoryServer:
-    ///
-    /// * cluster-internal: Use a ClusterIP service
-    ///
-    /// * external-unstable: Use a NodePort service
-    ///
-    /// * external-stable: Use a LoadBalancer service
-    ///
-    /// This is a temporary solution with the goal to keep yaml manifests forward compatible.
-    /// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
-    /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-    #[serde(default)]
-    pub listener_class: CurrentlySupportedListenerClasses,
-}
+    #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
+    #[serde(rename_all = "camelCase")]
+    pub struct SparkHistoryServerClusterConfig {
+        /// This field controls which type of Service the Operator creates for this HistoryServer:
+        ///
+        /// * cluster-internal: Use a ClusterIP service
+        ///
+        /// * external-unstable: Use a NodePort service
+        ///
+        /// * external-stable: Use a LoadBalancer service
+        ///
+        /// This is a temporary solution with the goal to keep yaml manifests forward compatible.
+        /// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
+        /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
+        #[serde(default)]
+        pub listener_class: CurrentlySupportedListenerClasses,
+    }
+}
 
 // TODO: Temporary solution until listener-operator is finished
@@ -129,7 +131,7 @@ impl CurrentlySupportedListenerClasses {
     }
 }
 
-impl SparkHistoryServer {
+impl v1alpha1::SparkHistoryServer {
     /// Returns a reference to the role. Raises an error if the role is not defined.
     pub fn role(&self) -> &Role<HistoryConfigFragment> {
         &self.spec.nodes
@@ -138,7 +140,7 @@
     /// Returns a reference to the role group. Raises an error if the role or role group are not defined.
     pub fn rolegroup(
         &self,
-        rolegroup_ref: &RoleGroupRef<SparkHistoryServer>,
+        rolegroup_ref: &RoleGroupRef<Self>,
     ) -> Result<RoleGroup<HistoryConfigFragment, GenericProductSpecificCommonConfig>, Error> {
         self.spec
             .nodes
@@ -152,7 +154,7 @@
 
     pub fn merged_config(
         &self,
-        rolegroup_ref: &RoleGroupRef<SparkHistoryServer>,
+        rolegroup_ref: &RoleGroupRef<Self>,
     ) -> Result<HistoryConfig, Error> {
         // Initialize the result with all default values as baseline
         let conf_defaults = HistoryConfig::default_config(&self.name_any());
@@ -184,7 +186,7 @@
         .map(i32::from)
     }
 
-    pub fn cleaner_rolegroups(&self) -> Vec<RoleGroupRef<SparkHistoryServer>> {
+    pub fn cleaner_rolegroups(&self) -> Vec<RoleGroupRef<Self>> {
         let mut rgs = vec![];
         for (rg_name, rg_config) in &self.spec.nodes.role_groups {
             if let Some(true) = rg_config.config.config.cleaner {
@@ -444,7 +446,7 @@ impl HistoryConfig {
 }
 
 impl Configuration for HistoryConfigFragment {
-    type Configurable = SparkHistoryServer;
+    type Configurable = v1alpha1::SparkHistoryServer;
 
     fn compute_env(
         &self,
@@ -484,7 +486,7 @@ mod test {
     };
 
     use super::*;
-    use crate::logdir::S3LogDir;
+    use crate::crd::logdir::S3LogDir;
 
     #[test]
     pub fn test_env_overrides() {
@@ -515,7 +517,7 @@
         "#};
 
         let deserializer = serde_yaml::Deserializer::from_str(input);
-        let history: SparkHistoryServer =
+        let history: v1alpha1::SparkHistoryServer =
            serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
 
         let log_dir = ResolvedLogDir::S3(S3LogDir {
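
As the updated tests above show, downstream code now names the CRD type through the generated version module. A self-contained sketch of the new call-site shape; the manifest below is hypothetical and its field values are illustrative, not taken from this commit:

#[test]
fn versioned_history_server_deserializes() {
    use stackable_operator::kube::ResourceExt;

    // `v1alpha1` is the module generated in rust/operator-binary/src/crd/history.rs.
    // Hypothetical manifest, mirroring the structure of the test inputs above.
    let input = r#"
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: my-history-server
spec:
  image:
    productVersion: "3.5.2"
  clusterConfig:
    listenerClass: cluster-internal
  logFileDirectory:
    s3:
      prefix: eventlogs/
      bucket:
        reference: my-log-bucket
  nodes:
    roleGroups:
      default:
        replicas: 1
"#;

    let deserializer = serde_yaml::Deserializer::from_str(input);
    // Formerly just `SparkHistoryServer`; the version is now explicit.
    let history: v1alpha1::SparkHistoryServer =
        serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
    assert_eq!(history.name_any(), "my-history-server");
}
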
9 changes: 1 addition & 8 deletions rust/crd/src/logdir.rs → rust/operator-binary/src/crd/logdir.rs
@@ -15,7 +15,7 @@ use stackable_operator::{
 };
 use strum::{EnumDiscriminants, IntoStaticStr};
 
-use crate::{
+use crate::crd::{
     constants::*,
     history::{
         LogFileDirectorySpec::{self, S3},
@@ -133,13 +133,6 @@ impl ResolvedLogDir {
         }
     }
 
-    pub fn credentials(&self) -> Option<SecretClassVolume> {
-        match self {
-            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials(),
-            ResolvedLogDir::Custom(_) => None,
-        }
-    }
-
     pub fn credentials_mount_path(&self) -> Option<String> {
         match self {
             ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_mount_path(),