From 23e1120eae4e8a6ed07d13efe84e002e1ba1c9de Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 12 Feb 2025 14:10:12 +0100 Subject: [PATCH] chore: Version CRDs and SparkHistoryServerClusterConfig (#525) * chore: Remove separate CRD crate * chore: Remove unused items * chore: Version SparkApplication * chore: Version SparkHistoryServer * docs: Fix invalid rustdoc reference * chore: Version SparkHistoryServerConfigCluster * chore: Remove redundant kind argument --- Cargo.lock | 328 +++++++++++++++--- Cargo.toml | 8 +- deploy/helm/spark-k8s-operator/crds/crds.yaml | 4 +- rust/crd/Cargo.toml | 25 -- rust/operator-binary/Cargo.toml | 11 +- .../src/crd}/affinity.rs | 6 +- .../src/crd}/constants.rs | 1 - .../src/crd}/history.rs | 128 +++---- .../src => operator-binary/src/crd}/logdir.rs | 9 +- .../lib.rs => operator-binary/src/crd/mod.rs} | 66 ++-- .../src => operator-binary/src/crd}/roles.rs | 10 +- .../src/crd}/tlscerts.rs | 2 +- .../src/history/history_controller.rs | 69 ++-- .../src/history/operations/pdb.rs | 7 +- rust/operator-binary/src/main.rs | 40 ++- .../src/pod_driver_controller.rs | 13 +- rust/operator-binary/src/product_logging.rs | 7 +- .../src/spark_k8s_controller.rs | 52 ++- 18 files changed, 495 insertions(+), 291 deletions(-) delete mode 100644 rust/crd/Cargo.toml rename rust/{crd/src => operator-binary/src/crd}/affinity.rs (95%) rename rust/{crd/src => operator-binary/src/crd}/constants.rs (98%) rename rust/{crd/src => operator-binary/src/crd}/history.rs (83%) rename rust/{crd/src => operator-binary/src/crd}/logdir.rs (97%) rename rust/{crd/src/lib.rs => operator-binary/src/crd/mod.rs} (97%) rename rust/{crd/src => operator-binary/src/crd}/roles.rs (96%) rename rust/{crd/src => operator-binary/src/crd}/tlscerts.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index b8b7b62b..4e3afcbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,6 +220,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.8.0" @@ -368,6 +374,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -624,6 +639,15 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "fluent-uri" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -780,7 +804,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags", + "bitflags 2.8.0", "libc", "libgit2-sys", "log", @@ -793,6 +817,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.2" @@ -1171,7 +1205,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1201,6 +1235,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -1237,18 +1280,45 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-patch" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" +dependencies = [ + "jsonptr 0.4.7", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json-patch" version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "863726d7afb6bc2590eeff7135d923545e5e964f004c2ccf8716c25e70a86f08" dependencies = [ - "jsonptr", + "jsonptr 0.6.3", "serde", "serde_json", "thiserror 1.0.69", ] +[[package]] +name = "jsonpath-rust" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d8fe85bd70ff715f31ce8c739194b423d79811a19602115d611a3ec85d6200" +dependencies = [ + "lazy_static", + "once_cell", + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "jsonpath-rust" version = "0.7.5" @@ -1262,6 +1332,17 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "jsonptr" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +dependencies = [ + "fluent-uri", + "serde", + "serde_json", +] + [[package]] name = "jsonptr" version = "0.6.3" @@ -1272,6 +1353,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "k8s-openapi" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8847402328d8301354c94d605481f25a6bdc1ed65471fd96af8eca71141b13" +dependencies = [ + "base64 0.22.1", + "chrono", + "schemars", + "serde", + "serde-value", + "serde_json", +] + [[package]] name = "k8s-openapi" version = "0.24.0" @@ -1286,17 +1381,78 @@ dependencies = [ "serde_json", ] +[[package]] +name = "k8s-version" +version = "0.1.2" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.5.0#048c7d8befddc2f2c6414444006871c95412d67c" +dependencies = [ + "darling", + "regex", + "snafu 0.8.5", +] + +[[package]] +name = "kube" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efffeb3df0bd4ef3e5d65044573499c0e4889b988070b08c50b25b1329289a1f" +dependencies = [ + "k8s-openapi 0.23.0", + "kube-client 0.96.0", + "kube-core 0.96.0", + "kube-derive 0.96.0", + "kube-runtime 0.96.0", +] + [[package]] name = "kube" version = "0.98.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32053dc495efad4d188c7b33cc7c02ef4a6e43038115348348876efd39a53cba" dependencies = [ - "k8s-openapi", - "kube-client", - "kube-core", - "kube-derive", - "kube-runtime", + "k8s-openapi 0.24.0", + "kube-client 0.98.0", + "kube-core 0.98.0", + "kube-derive 0.98.0", + "kube-runtime 0.98.0", +] + +[[package]] +name = "kube-client" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bf471ece8ff8d24735ce78dac4d091e9fcb8d74811aeb6b75de4d1c3f5de0f1" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures 0.3.31", + "home", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-http-proxy", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust 0.5.1", + "k8s-openapi 0.23.0", + "kube-core 0.96.0", + "pem", + "rustls", + "rustls-pemfile", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", ] [[package]] @@ -1319,9 +1475,9 @@ dependencies = [ "hyper-rustls", "hyper-timeout", "hyper-util", - "jsonpath-rust", - "k8s-openapi", - "kube-core", + "jsonpath-rust 0.7.5", + "k8s-openapi 0.24.0", + "kube-core 0.98.0", "pem", "rustls", "rustls-pemfile", @@ -1337,6 +1493,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "kube-core" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f42346d30bb34d1d7adc5c549b691bce7aa3a1e60254e68fab7e2d7b26fe3d77" +dependencies = [ + "chrono", + "form_urlencoded", + "http", + "json-patch 2.0.0", + "k8s-openapi 0.23.0", + "schemars", + "serde", + "serde-value", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "kube-core" version = "0.98.0" @@ -1346,8 +1520,8 @@ dependencies = [ "chrono", "form_urlencoded", "http", - "json-patch", - "k8s-openapi", + "json-patch 3.0.1", + "k8s-openapi 0.24.0", "schemars", "serde", "serde-value", @@ -1355,6 +1529,19 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "kube-derive" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9364e04cc5e0482136c6ee8b7fb7551812da25802249f35b3def7aaa31e82ad" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.96", +] + [[package]] name = "kube-derive" version = "0.98.0" @@ -1368,6 +1555,34 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "kube-runtime" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fbf1f6ffa98e65f1d2a9a69338bb60605d46be7edf00237784b89e62c9bd44" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "async-trait", + "backoff", + "educe", + "futures 0.3.31", + "hashbrown 0.14.5", + "json-patch 2.0.0", + "jsonptr 0.4.7", + "k8s-openapi 0.23.0", + "kube-client 0.96.0", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "kube-runtime" version = "0.98.0" @@ -1381,12 +1596,12 @@ dependencies = [ "backoff", "educe", "futures 0.3.31", - "hashbrown", + "hashbrown 0.15.2", "hostname", - "json-patch", - "jsonptr", - "k8s-openapi", - "kube-client", + "json-patch 3.0.1", + "jsonptr 0.6.3", + "k8s-openapi 0.24.0", + "kube-client 0.98.0", "parking_lot", "pin-project", "serde", @@ -1856,7 +2071,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags", + "bitflags 2.8.0", ] [[package]] @@ -2102,7 +2317,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -2115,7 +2330,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -2343,9 +2558,9 @@ dependencies = [ "either", "futures 0.3.31", "indexmap", - "json-patch", - "k8s-openapi", - "kube", + "json-patch 3.0.1", + "k8s-openapi 0.24.0", + "kube 0.98.0", "opentelemetry-jaeger", "opentelemetry_sdk", "product-config", @@ -2383,31 +2598,13 @@ name = "stackable-shared" version = "0.0.1" source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.85.0#59506c6202778889a27b6ae8153457e60a49c68d" dependencies = [ - "kube", + "kube 0.98.0", "semver", "serde", "serde_yaml", "snafu 0.8.5", ] -[[package]] -name = "stackable-spark-k8s-crd" -version = "0.0.0-dev" -dependencies = [ - "const_format", - "indoc", - "product-config", - "rstest", - "semver", - "serde", - "serde_json", - "serde_yaml", - "snafu 0.8.5", - "stackable-operator", - "strum", - "tracing", -] - [[package]] name = "stackable-spark-k8s-operator" version = "0.0.0-dev" @@ -2415,21 +2612,48 @@ dependencies = [ "anyhow", "built", "clap", + "const_format", "futures 0.3.31", + "indoc", "product-config", + "rstest", "semver", "serde", "serde_json", "serde_yaml", "snafu 0.8.5", "stackable-operator", - "stackable-spark-k8s-crd", + "stackable-versioned", "strum", "tokio", "tracing", "tracing-futures", ] +[[package]] +name = "stackable-versioned" +version = "0.5.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.5.0#048c7d8befddc2f2c6414444006871c95412d67c" +dependencies = [ + "stackable-versioned-macros", +] + +[[package]] +name = "stackable-versioned-macros" +version = "0.5.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.5.0#048c7d8befddc2f2c6414444006871c95412d67c" +dependencies = [ + "convert_case", + "darling", + "itertools", + "k8s-openapi 0.23.0", + "k8s-version", + "kube 0.96.0", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2688,9 +2912,9 @@ checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" [[package]] name = "toml_edit" -version = "0.22.22" +version = "0.22.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +checksum = "02a8b472d1a3d7c18e2d61a489aee3453fd9031c33e4f55bd533f4a7adca1bee" dependencies = [ "indexmap", "toml_datetime", @@ -2721,7 +2945,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ "base64 0.22.1", - "bitflags", + "bitflags 2.8.0", "bytes", "http", "http-body", @@ -2885,6 +3109,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -3159,9 +3389,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.6.24" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8d71a593cc5c42ad7876e2c1fda56f314f3754c084128833e64f1345ff8a03a" +checksum = "86e376c75f4f43f44db463cf729e0d3acbf954d13e22c51e26e4c264b4ab545f" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index e377e329..00d1ed01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["rust/crd", "rust/operator-binary"] +members = ["rust/operator-binary"] resolver = "2" [workspace.package] @@ -10,19 +10,21 @@ edition = "2021" repository = "https://github.com/stackabletech/spark-k8s-operator" [workspace.dependencies] +stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" } +product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } + anyhow = "1.0" built = { version = "0.7", features = ["chrono", "git2"] } clap = "4.5" const_format = "0.2" futures = { version = "0.3", features = ["compat"] } -product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } rstest = "0.24" semver = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" snafu = "0.8" -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" } strum = { version = "0.26", features = ["derive"] } tokio = { version = "1.39", features = ["full"] } tracing = "0.1" diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 096f6b47..eac4ad07 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -12,7 +12,7 @@ spec: kind: SparkApplication plural: sparkapplications shortNames: - - sc + - sparkapp singular: sparkapplication scope: Namespaced versions: @@ -998,7 +998,7 @@ spec: kind: SparkHistoryServer plural: sparkhistoryservers shortNames: - - shs + - sparkhist singular: sparkhistoryserver scope: Namespaced versions: diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml deleted file mode 100644 index 0661eb65..00000000 --- a/rust/crd/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "stackable-spark-k8s-crd" -description = "Contains the Apache Spark CRD structs and utilities" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true -repository.workspace = true -publish = false - -[dependencies] -const_format.workspace = true -product-config.workspace = true -semver.workspace = true -serde.workspace = true -serde_json.workspace = true -snafu.workspace = true -stackable-operator.workspace = true -strum.workspace = true -tracing.workspace = true - -[dev-dependencies] -rstest.workspace = true -serde_yaml.workspace = true -indoc.workspace = true diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 1815ae58..b955c11e 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -9,16 +9,17 @@ repository.workspace = true publish = false [dependencies] -stackable-spark-k8s-crd = { path = "../crd" } +stackable-versioned.workspace = true +stackable-operator.workspace = true +product-config.workspace = true anyhow.workspace = true -product-config.workspace = true +const_format.workspace = true semver.workspace = true serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true snafu.workspace = true -stackable-operator.workspace = true strum.workspace = true tracing.workspace = true tracing-futures.workspace = true @@ -26,5 +27,9 @@ clap.workspace = true futures.workspace = true tokio.workspace = true +[dev-dependencies] +indoc.workspace = true +rstest.workspace = true + [build-dependencies] built.workspace = true diff --git a/rust/crd/src/affinity.rs b/rust/operator-binary/src/crd/affinity.rs similarity index 95% rename from rust/crd/src/affinity.rs rename to rust/operator-binary/src/crd/affinity.rs index ac476c04..4023fbc0 100644 --- a/rust/crd/src/affinity.rs +++ b/rust/operator-binary/src/crd/affinity.rs @@ -3,7 +3,7 @@ use stackable_operator::{ k8s_openapi::api::core::v1::PodAntiAffinity, }; -use crate::constants::{APP_NAME, HISTORY_ROLE_NAME}; +use crate::crd::constants::{APP_NAME, HISTORY_ROLE_NAME}; pub fn history_affinity(cluster_name: &str) -> StackableAffinityFragment { let affinity_between_role_pods = @@ -36,7 +36,7 @@ mod test { role_utils::RoleGroupRef, }; - use crate::{constants::HISTORY_ROLE_NAME, history::SparkHistoryServer}; + use crate::crd::{constants::HISTORY_ROLE_NAME, history::v1alpha1}; #[test] pub fn test_history_affinity_defaults() { @@ -62,7 +62,7 @@ mod test { "#; let deserializer = serde_yaml::Deserializer::from_str(input); - let history: SparkHistoryServer = + let history: v1alpha1::SparkHistoryServer = serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); let expected: StackableAffinity = StackableAffinity { node_affinity: None, diff --git a/rust/crd/src/constants.rs b/rust/operator-binary/src/crd/constants.rs similarity index 98% rename from rust/crd/src/constants.rs rename to rust/operator-binary/src/crd/constants.rs index c37ed8e2..9b7f3d63 100644 --- a/rust/crd/src/constants.rs +++ b/rust/operator-binary/src/crd/constants.rs @@ -43,7 +43,6 @@ pub const STACKABLE_TRUST_STORE_NAME: &str = "stackable-truststore"; pub const STACKABLE_TLS_STORE_PASSWORD: &str = "changeit"; pub const SYSTEM_TRUST_STORE_PASSWORD: &str = "changeit"; pub const STACKABLE_MOUNT_PATH_TLS: &str = "/stackable/mount_server_tls"; -pub const STACKABLE_MOUNT_NAME_TLS: &str = "servertls"; pub const MIN_MEMORY_OVERHEAD: u32 = 384; pub const JVM_OVERHEAD_FACTOR: f32 = 0.1; diff --git a/rust/crd/src/history.rs b/rust/operator-binary/src/crd/history.rs similarity index 83% rename from rust/crd/src/history.rs rename to rust/operator-binary/src/crd/history.rs index af837c13..0bb1cb6a 100644 --- a/rust/crd/src/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -28,9 +28,10 @@ use stackable_operator::{ schemars::{self, JsonSchema}, time::Duration, }; +use stackable_versioned::versioned; use strum::{Display, EnumIter}; -use crate::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}; +use crate::crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}; #[derive(Snafu, Debug)] pub enum Error { @@ -48,62 +49,63 @@ pub enum Error { CannotRetrieveRoleGroup { role_group: String }, } -/// A Spark cluster history server component. This resource is managed by the Stackable operator -/// for Apache Spark. Find more information on how to use it in the -/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server). -#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] -#[kube( - group = "spark.stackable.tech", - version = "v1alpha1", - kind = "SparkHistoryServer", - shortname = "shs", - namespaced, - crates( - kube_core = "stackable_operator::kube::core", - k8s_openapi = "stackable_operator::k8s_openapi", - schemars = "stackable_operator::schemars" - ) -)] -#[serde(rename_all = "camelCase")] -pub struct SparkHistoryServerSpec { - pub image: ProductImage, - - /// Global Spark history server configuration that applies to all roles and role groups. - #[serde(default)] - pub cluster_config: SparkHistoryServerClusterConfig, - - /// Name of the Vector aggregator discovery ConfigMap. - /// It must contain the key `ADDRESS` with the address of the Vector aggregator. - #[serde(skip_serializing_if = "Option::is_none")] - pub vector_aggregator_config_map_name: Option, - - /// The log file directory definition used by the Spark history server. - pub log_file_directory: LogFileDirectorySpec, - - /// A map of key/value strings that will be passed directly to Spark when deploying the history server. - #[serde(default)] - pub spark_conf: BTreeMap, - - /// A history server node role definition. - pub nodes: Role, -} +#[versioned(version(name = "v1alpha1"))] +pub mod versioned { + /// A Spark cluster history server component. This resource is managed by the Stackable operator + /// for Apache Spark. Find more information on how to use it in the + /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server). + #[versioned(k8s( + group = "spark.stackable.tech", + shortname = "sparkhist", + namespaced, + crates( + kube_core = "stackable_operator::kube::core", + k8s_openapi = "stackable_operator::k8s_openapi", + schemars = "stackable_operator::schemars" + ) + ))] + #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct SparkHistoryServerSpec { + pub image: ProductImage, + + /// Global Spark history server configuration that applies to all roles and role groups. + #[serde(default)] + pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig, + + /// Name of the Vector aggregator discovery ConfigMap. + /// It must contain the key `ADDRESS` with the address of the Vector aggregator. + #[serde(skip_serializing_if = "Option::is_none")] + pub vector_aggregator_config_map_name: Option, + + /// The log file directory definition used by the Spark history server. + pub log_file_directory: LogFileDirectorySpec, + + /// A map of key/value strings that will be passed directly to Spark when deploying the history server. + #[serde(default)] + pub spark_conf: BTreeMap, + + /// A history server node role definition. + pub nodes: Role, + } -#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct SparkHistoryServerClusterConfig { - /// This field controls which type of Service the Operator creates for this HistoryServer: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// * external-stable: Use a LoadBalancer service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. - /// In the future, this setting will control which ListenerClass - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - #[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, + #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct SparkHistoryServerClusterConfig { + /// This field controls which type of Service the Operator creates for this HistoryServer: + /// + /// * cluster-internal: Use a ClusterIP service + /// + /// * external-unstable: Use a NodePort service + /// + /// * external-stable: Use a LoadBalancer service + /// + /// This is a temporary solution with the goal to keep yaml manifests forward compatible. + /// In the future, this setting will control which ListenerClass + /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. + #[serde(default)] + pub listener_class: CurrentlySupportedListenerClasses, + } } // TODO: Temporary solution until listener-operator is finished @@ -129,7 +131,7 @@ impl CurrentlySupportedListenerClasses { } } -impl SparkHistoryServer { +impl v1alpha1::SparkHistoryServer { /// Returns a reference to the role. Raises an error if the role is not defined. pub fn role(&self) -> &Role { &self.spec.nodes @@ -138,7 +140,7 @@ impl SparkHistoryServer { /// Returns a reference to the role group. Raises an error if the role or role group are not defined. pub fn rolegroup( &self, - rolegroup_ref: &RoleGroupRef, + rolegroup_ref: &RoleGroupRef, ) -> Result, Error> { self.spec .nodes @@ -152,7 +154,7 @@ impl SparkHistoryServer { pub fn merged_config( &self, - rolegroup_ref: &RoleGroupRef, + rolegroup_ref: &RoleGroupRef, ) -> Result { // Initialize the result with all default values as baseline let conf_defaults = HistoryConfig::default_config(&self.name_any()); @@ -184,7 +186,7 @@ impl SparkHistoryServer { .map(i32::from) } - pub fn cleaner_rolegroups(&self) -> Vec> { + pub fn cleaner_rolegroups(&self) -> Vec> { let mut rgs = vec![]; for (rg_name, rg_config) in &self.spec.nodes.role_groups { if let Some(true) = rg_config.config.config.cleaner { @@ -444,7 +446,7 @@ impl HistoryConfig { } impl Configuration for HistoryConfigFragment { - type Configurable = SparkHistoryServer; + type Configurable = v1alpha1::SparkHistoryServer; fn compute_env( &self, @@ -484,7 +486,7 @@ mod test { }; use super::*; - use crate::logdir::S3LogDir; + use crate::crd::logdir::S3LogDir; #[test] pub fn test_env_overrides() { @@ -515,7 +517,7 @@ mod test { "#}; let deserializer = serde_yaml::Deserializer::from_str(input); - let history: SparkHistoryServer = + let history: v1alpha1::SparkHistoryServer = serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); let log_dir = ResolvedLogDir::S3(S3LogDir { diff --git a/rust/crd/src/logdir.rs b/rust/operator-binary/src/crd/logdir.rs similarity index 97% rename from rust/crd/src/logdir.rs rename to rust/operator-binary/src/crd/logdir.rs index 63c00c7e..bf8e2c81 100644 --- a/rust/crd/src/logdir.rs +++ b/rust/operator-binary/src/crd/logdir.rs @@ -15,7 +15,7 @@ use stackable_operator::{ }; use strum::{EnumDiscriminants, IntoStaticStr}; -use crate::{ +use crate::crd::{ constants::*, history::{ LogFileDirectorySpec::{self, S3}, @@ -133,13 +133,6 @@ impl ResolvedLogDir { } } - pub fn credentials(&self) -> Option { - match self { - ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials(), - ResolvedLogDir::Custom(_) => None, - } - } - pub fn credentials_mount_path(&self) -> Option { match self { ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_mount_path(), diff --git a/rust/crd/src/lib.rs b/rust/operator-binary/src/crd/mod.rs similarity index 97% rename from rust/crd/src/lib.rs rename to rust/operator-binary/src/crd/mod.rs index ee669e21..ec7a2eb7 100644 --- a/rust/crd/src/lib.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -1,12 +1,5 @@ //! This module provides all required CRD definitions and additional helper methods. -pub mod affinity; -pub mod constants; -pub mod history; -pub mod logdir; -pub mod roles; -pub mod tlscerts; - use std::{ cmp::max, collections::{BTreeMap, HashMap}, @@ -51,8 +44,19 @@ use stackable_operator::{ time::Duration, utils::crds::raw_object_list_schema, }; +use stackable_versioned::versioned; -pub use crate::roles::*; +use crate::crd::roles::{ + RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode, SubmitConfig, + SubmitConfigFragment, VolumeMounts, +}; + +pub mod affinity; +pub mod constants; +pub mod history; +pub mod logdir; +pub mod roles; +pub mod tlscerts; #[derive(Snafu, Debug)] pub enum Error { @@ -123,20 +127,21 @@ pub struct SparkApplicationStatus { /// /// The SparkApplication CRD looks a little different than the CRDs of the other products on the /// Stackable Data Platform. -#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] -#[kube( - group = "spark.stackable.tech", - version = "v1alpha1", - kind = "SparkApplication", - shortname = "sc", - status = "SparkApplicationStatus", - namespaced, - crates( - kube_core = "stackable_operator::kube::core", - k8s_openapi = "stackable_operator::k8s_openapi", - schemars = "stackable_operator::schemars" +#[versioned( + version(name = "v1alpha1"), + k8s( + group = "spark.stackable.tech", + shortname = "sparkapp", + status = "SparkApplicationStatus", + namespaced, + crates( + kube_core = "stackable_operator::kube::core", + k8s_openapi = "stackable_operator::k8s_openapi", + schemars = "stackable_operator::schemars" + ) ) )] +#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] #[serde(rename_all = "camelCase")] pub struct SparkApplicationSpec { /// Mode: cluster or client. Currently only cluster is supported. @@ -238,7 +243,7 @@ pub struct JobDependencies { pub exclude_packages: Vec, } -impl SparkApplication { +impl v1alpha1::SparkApplication { /// Returns if this [`SparkApplication`] has already created a Kubernetes Job doing the actual `spark-submit`. /// /// This is needed because Kubernetes will remove the succeeded Job after some time. When the spark-k8s-operator is @@ -262,10 +267,6 @@ impl SparkApplication { format!("{app_name}-{role}-pod-template", app_name = self.name_any()) } - pub fn image(&self) -> Option<&str> { - self.spec.image.as_deref() - } - pub fn application_artifact(&self) -> &str { self.spec.main_application_file.as_ref() } @@ -510,7 +511,7 @@ impl SparkApplication { &'a self, app_version: &'a str, role: &'a str, - ) -> ObjectLabels { + ) -> ObjectLabels { ObjectLabels { owner: self, app_name: APP_NAME, @@ -1101,14 +1102,11 @@ mod tests { }; use super::*; - use crate::{ - cores_from_quantity, resources_to_driver_props, resources_to_executor_props, Quantity, - RoleConfig, SparkApplication, SparkStorageConfig, - }; + use crate::crd::roles::SparkStorageConfig; #[test] fn test_default_resource_limits() { - let spark_application = serde_yaml::from_str::(indoc! {" + let spark_application = serde_yaml::from_str::(indoc! {" --- apiVersion: spark.stackable.tech/v1alpha1 kind: SparkApplication @@ -1137,7 +1135,7 @@ mod tests { #[test] fn test_merged_resource_limits() { - let spark_application = serde_yaml::from_str::(indoc! {r#" + let spark_application = serde_yaml::from_str::(indoc! {r#" --- apiVersion: spark.stackable.tech/v1alpha1 kind: SparkApplication @@ -1314,7 +1312,7 @@ mod tests { #[test] fn test_validated_config() { - let spark_application = serde_yaml::from_str::(indoc! {r#" + let spark_application = serde_yaml::from_str::(indoc! {r#" --- apiVersion: spark.stackable.tech/v1alpha1 kind: SparkApplication @@ -1379,7 +1377,7 @@ mod tests { #[test] fn test_job_volume_mounts() { - let spark_application = serde_yaml::from_str::(indoc! {r#" + let spark_application = serde_yaml::from_str::(indoc! {r#" --- apiVersion: spark.stackable.tech/v1alpha1 kind: SparkApplication diff --git a/rust/crd/src/roles.rs b/rust/operator-binary/src/crd/roles.rs similarity index 96% rename from rust/crd/src/roles.rs rename to rust/operator-binary/src/crd/roles.rs index 94da26f2..f903a5bf 100644 --- a/rust/crd/src/roles.rs +++ b/rust/operator-binary/src/crd/roles.rs @@ -7,7 +7,7 @@ //! this responsibility to the Submit job. //! //! The submit job only supports one group per role. For this reason, the -//! [`SparkApplication`] spec doesn't declare Role objects directly. Instead it +//! [`v1alpha1::SparkApplication`] spec doesn't declare Role objects directly. Instead it //! only declares [`stackable_operator::role_utils::CommonConfiguration`] objects for job, //! driver and executor and constructs the Roles dynamically when needed. The only group under //! each role is named "default". These roles are transparent to the user. @@ -38,7 +38,7 @@ use stackable_operator::{ }; use strum::{Display, EnumIter}; -use crate::{ResolvedLogDir, SparkApplication}; +use crate::crd::{v1alpha1, ResolvedLogDir}; #[derive(Clone, Debug, Deserialize, Display, Eq, PartialEq, Serialize, JsonSchema)] #[strum(serialize_all = "kebab-case")] @@ -155,7 +155,7 @@ impl RoleConfig { } pub fn volume_mounts( &self, - spark_application: &SparkApplication, + spark_application: &v1alpha1::SparkApplication, s3conn: &Option, logdir: &Option, ) -> Vec { @@ -165,7 +165,7 @@ impl RoleConfig { } impl Configuration for RoleConfigFragment { - type Configurable = SparkApplication; + type Configurable = v1alpha1::SparkApplication; fn compute_env( &self, @@ -246,7 +246,7 @@ impl SubmitConfig { } impl Configuration for SubmitConfigFragment { - type Configurable = SparkApplication; + type Configurable = v1alpha1::SparkApplication; fn compute_env( &self, diff --git a/rust/crd/src/tlscerts.rs b/rust/operator-binary/src/crd/tlscerts.rs similarity index 99% rename from rust/crd/src/tlscerts.rs rename to rust/operator-binary/src/crd/tlscerts.rs index 77ac2e8e..6600823d 100644 --- a/rust/crd/src/tlscerts.rs +++ b/rust/operator-binary/src/crd/tlscerts.rs @@ -3,7 +3,7 @@ use stackable_operator::commons::{ tls_verification::{CaCert, Tls, TlsClientDetails, TlsServerVerification, TlsVerification}, }; -use crate::{ +use crate::crd::{ constants::{ STACKABLE_MOUNT_PATH_TLS, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, SYSTEM_TRUST_STORE, SYSTEM_TRUST_STORE_PASSWORD, diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index aeec5742..96535e2f 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -45,23 +45,22 @@ use stackable_operator::{ role_utils::RoleGroupRef, time::Duration, }; -use stackable_spark_k8s_crd::{ - constants::{ - ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, - JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, - SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, - SPARK_IMAGE_BASE_NAME, SPARK_UID, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, - VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, - VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, - }, - history, - history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer}, - logdir::ResolvedLogDir, - tlscerts, to_spark_env_sh_string, -}; use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ + crd::{ + constants::{ + ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, + JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, + SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, + SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID, STACKABLE_TRUST_STORE, + VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, + VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + }, + history::{self, v1alpha1, HistoryConfig, SparkHistoryServerContainer}, + logdir::ResolvedLogDir, + tlscerts, to_spark_env_sh_string, + }, history::operations::pdb::add_pdbs, product_logging::{self, resolve_vector_aggregator_address}, Ctx, @@ -119,14 +118,10 @@ pub enum Error { }, #[snafu(display("product config validation failed"))] - ProductConfigValidation { - source: stackable_spark_k8s_crd::history::Error, - }, + ProductConfigValidation { source: crate::crd::history::Error }, #[snafu(display("failed to resolve and merge config for role and role group"))] - FailedToResolveConfig { - source: stackable_spark_k8s_crd::history::Error, - }, + FailedToResolveConfig { source: crate::crd::history::Error }, #[snafu(display("number of cleaner rolegroups exceeds 1"))] TooManyCleanerRoleGroups, @@ -135,9 +130,7 @@ pub enum Error { TooManyCleanerReplicas, #[snafu(display("failed to resolve the log dir configuration"))] - LogDir { - source: stackable_spark_k8s_crd::logdir::Error, - }, + LogDir { source: crate::crd::logdir::Error }, #[snafu(display("failed to create cluster resources"))] CreateClusterResources { @@ -195,9 +188,7 @@ pub enum Error { }, #[snafu(display("failed to create the log dir volumes specification"))] - CreateLogDirVolumesSpec { - source: stackable_spark_k8s_crd::logdir::Error, - }, + CreateLogDirVolumesSpec { source: crate::crd::logdir::Error }, #[snafu(display("failed to add needed volume"))] AddVolume { source: builder::pod::Error }, @@ -222,7 +213,7 @@ impl ReconcilerError for Error { } /// Updates the status of the SparkApplication that started the pod. pub async fn reconcile( - shs: Arc>, + shs: Arc>, ctx: Arc, ) -> Result { tracing::info!("Starting reconcile history server"); @@ -365,7 +356,7 @@ pub async fn reconcile( } pub fn error_policy( - _obj: Arc>, + _obj: Arc>, error: &Error, _ctx: Arc, ) -> Action { @@ -377,11 +368,11 @@ pub fn error_policy( #[allow(clippy::result_large_err)] fn build_config_map( - shs: &SparkHistoryServer, + shs: &v1alpha1::SparkHistoryServer, config: &HashMap>, merged_config: &HistoryConfig, app_version_label: &str, - rolegroupref: &RoleGroupRef, + rolegroupref: &RoleGroupRef, log_dir: &ResolvedLogDir, vector_aggregator_address: Option<&str>, ) -> Result { @@ -449,9 +440,9 @@ fn build_config_map( #[allow(clippy::result_large_err)] fn build_stateful_set( - shs: &SparkHistoryServer, + shs: &v1alpha1::SparkHistoryServer, resolved_product_image: &ResolvedProductImage, - rolegroupref: &RoleGroupRef, + rolegroupref: &RoleGroupRef, log_dir: &ResolvedLogDir, merged_config: &HistoryConfig, serviceaccount: &ServiceAccount, @@ -617,10 +608,10 @@ fn build_stateful_set( #[allow(clippy::result_large_err)] fn build_service( - shs: &SparkHistoryServer, + shs: &v1alpha1::SparkHistoryServer, app_version_label: &str, role: &str, - group: Option<&RoleGroupRef>, + group: Option<&RoleGroupRef>, ) -> Result { let group_name = match group { Some(rgr) => rgr.role_group.clone(), @@ -685,7 +676,7 @@ fn build_service( // See: https://github.com/stackabletech/spark-k8s-operator/issues/499 #[allow(clippy::result_large_err)] fn build_history_role_serviceaccount( - shs: &SparkHistoryServer, + shs: &v1alpha1::SparkHistoryServer, app_version_label: &str, ) -> Result<(ServiceAccount, RoleBinding)> { let sa = ServiceAccount { @@ -726,9 +717,9 @@ fn build_history_role_serviceaccount( #[allow(clippy::result_large_err)] fn spark_defaults( - shs: &SparkHistoryServer, + shs: &v1alpha1::SparkHistoryServer, log_dir: &ResolvedLogDir, - rolegroupref: &RoleGroupRef, + rolegroupref: &RoleGroupRef, ) -> Result { let mut log_dir_settings = log_dir.history_server_spark_config().context(LogDirSnafu)?; @@ -790,8 +781,8 @@ fn labels<'a, T>( /// group should have a replica count of 0 or 1. #[allow(clippy::result_large_err)] fn cleaner_config( - shs: &SparkHistoryServer, - rolegroup_ref: &RoleGroupRef, + shs: &v1alpha1::SparkHistoryServer, + rolegroup_ref: &RoleGroupRef, ) -> Result, Error> { let mut result = BTreeMap::new(); diff --git a/rust/operator-binary/src/history/operations/pdb.rs b/rust/operator-binary/src/history/operations/pdb.rs index ca27f840..f89b4978 100644 --- a/rust/operator-binary/src/history/operations/pdb.rs +++ b/rust/operator-binary/src/history/operations/pdb.rs @@ -3,9 +3,10 @@ use stackable_operator::{ builder::pdb::PodDisruptionBudgetBuilder, client::Client, cluster_resources::ClusterResources, commons::pdb::PdbConfig, kube::ResourceExt, }; -use stackable_spark_k8s_crd::{ + +use crate::crd::{ constants::{APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, OPERATOR_NAME}, - history::SparkHistoryServer, + history::v1alpha1, }; #[derive(Snafu, Debug)] @@ -24,7 +25,7 @@ pub enum Error { pub async fn add_pdbs( pdb: &PdbConfig, - history: &SparkHistoryServer, + history: &v1alpha1::SparkHistoryServer, client: &Client, cluster_resources: &mut ClusterResources, ) -> Result<(), Error> { diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 109b3379..a3dc11a3 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,8 +1,3 @@ -mod history; -mod pod_driver_controller; -mod product_logging; -mod spark_k8s_controller; - use std::sync::Arc; use clap::{crate_description, crate_version, Parser}; @@ -23,9 +18,13 @@ use stackable_operator::{ }, }, logging::controller::report_controller_reconciled, - CustomResourceExt, + shared::yaml::SerializeOptions, + YamlSchema, }; -use stackable_spark_k8s_crd::{ +use tracing::info_span; +use tracing_futures::Instrument; + +use crate::crd::{ constants::{ HISTORY_FULL_CONTROLLER_NAME, OPERATOR_NAME, POD_DRIVER_FULL_CONTROLLER_NAME, SPARK_CONTROLLER_NAME, SPARK_FULL_CONTROLLER_NAME, @@ -33,8 +32,12 @@ use stackable_spark_k8s_crd::{ history::SparkHistoryServer, SparkApplication, }; -use tracing::info_span; -use tracing_futures::Instrument; + +mod crd; +mod history; +mod pod_driver_controller; +mod product_logging; +mod spark_k8s_controller; mod built_info { include!(concat!(env!("OUT_DIR"), "/built.rs")); @@ -61,8 +64,10 @@ async fn main() -> anyhow::Result<()> { let opts = Opts::parse(); match opts.cmd { Command::Crd => { - SparkApplication::print_yaml_schema(built_info::PKG_VERSION)?; - SparkHistoryServer::print_yaml_schema(built_info::PKG_VERSION)?; + SparkApplication::merged_crd(SparkApplication::V1Alpha1)? + .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; + SparkHistoryServer::merged_crd(SparkHistoryServer::V1Alpha1)? + .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; } Command::Run(ProductOperatorRun { product_config, @@ -102,7 +107,8 @@ async fn main() -> anyhow::Result<()> { }, )); let app_controller = Controller::new( - watch_namespace.get_api::>(&client), + watch_namespace + .get_api::>(&client), watcher::Config::default(), ) .owns( @@ -188,11 +194,17 @@ async fn main() -> anyhow::Result<()> { }, )); let history_controller = Controller::new( - watch_namespace.get_api::>(&client), + watch_namespace + .get_api::>( + &client, + ), watcher::Config::default(), ) .owns( - watch_namespace.get_api::>(&client), + watch_namespace + .get_api::>( + &client, + ), watcher::Config::default(), ) .owns( diff --git a/rust/operator-binary/src/pod_driver_controller.rs b/rust/operator-binary/src/pod_driver_controller.rs index a2c93d1c..39684620 100644 --- a/rust/operator-binary/src/pod_driver_controller.rs +++ b/rust/operator-binary/src/pod_driver_controller.rs @@ -11,11 +11,10 @@ use stackable_operator::{ logging::controller::ReconcilerError, time::Duration, }; -use stackable_spark_k8s_crd::{ - constants::POD_DRIVER_CONTROLLER_NAME, SparkApplication, SparkApplicationStatus, -}; use strum::{EnumDiscriminants, IntoStaticStr}; +use crate::crd::{constants::POD_DRIVER_CONTROLLER_NAME, v1alpha1, SparkApplicationStatus}; + const LABEL_NAME_INSTANCE: &str = "app.kubernetes.io/instance"; #[derive(Snafu, Debug, EnumDiscriminants)] @@ -24,22 +23,28 @@ const LABEL_NAME_INSTANCE: &str = "app.kubernetes.io/instance"; pub enum Error { #[snafu(display("Label [{LABEL_NAME_INSTANCE}] not found for pod name [{pod_name}]"))] LabelInstanceNotFound { pod_name: String }, + #[snafu(display("Failed to update status for application [{name}]"))] ApplySparkApplicationStatus { source: stackable_operator::client::Error, name: String, }, + #[snafu(display("Pod name not found"))] PodNameNotFound, + #[snafu(display("Namespace not found"))] NamespaceNotFound, + #[snafu(display("Status phase not found for pod [{pod_name}]"))] PodStatusPhaseNotFound { pod_name: String }, + #[snafu(display("Spark application [{name}] not found"))] SparkApplicationNotFound { source: stackable_operator::client::Error, name: String, }, + #[snafu(display("Pod object is invalid"))] InvalidPod { source: error_boundary::InvalidObject, @@ -79,7 +84,7 @@ pub async fn reconcile(pod: Arc>, client: Arc) -> )?; let app = client - .get::( + .get::( app_name.as_ref(), pod.metadata .namespace diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 3f9116ec..852dee11 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -13,9 +13,8 @@ use stackable_operator::{ }, role_utils::RoleGroupRef, }; -use stackable_spark_k8s_crd::constants::{ - LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, VOLUME_MOUNT_PATH_LOG, -}; + +use crate::crd::constants::{LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, VOLUME_MOUNT_PATH_LOG}; #[derive(Snafu, Debug)] pub enum Error { @@ -24,11 +23,13 @@ pub enum Error { source: stackable_operator::client::Error, cm_name: String, }, + #[snafu(display("failed to retrieve the entry {entry} for ConfigMap {cm_name}"))] MissingConfigMapEntry { entry: &'static str, cm_name: String, }, + #[snafu(display("vectorAggregatorConfigMapName must be set"))] MissingVectorAggregatorAddress, } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index dc4dbf3e..74f45b9d 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -7,8 +7,8 @@ use std::{ use product_config::{types::PropertyNameKind, writer::to_java_properties_string}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ - builder, builder::{ + self, configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, pod::{ @@ -52,13 +52,15 @@ use stackable_operator::{ role_utils::RoleGroupRef, time::Duration, }; -use stackable_spark_k8s_crd::{ - constants::*, logdir::ResolvedLogDir, tlscerts, to_spark_env_sh_string, RoleConfig, - SparkApplication, SparkApplicationRole, SparkApplicationStatus, SparkContainer, SubmitConfig, -}; use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ + crd::{ + constants::*, + logdir::ResolvedLogDir, + roles::{RoleConfig, SparkApplicationRole, SparkContainer, SubmitConfig}, + tlscerts, to_spark_env_sh_string, v1alpha1, SparkApplicationStatus, + }, product_logging::{self, resolve_vector_aggregator_address}, Ctx, }; @@ -94,9 +96,7 @@ pub enum Error { }, #[snafu(display("failed to build stark-submit command"))] - BuildCommand { - source: stackable_spark_k8s_crd::Error, - }, + BuildCommand { source: crate::crd::Error }, #[snafu(display("failed to build the pod template config map"))] PodTemplateConfigMap { @@ -116,9 +116,7 @@ pub enum Error { S3TlsCaVerificationNotSupported, #[snafu(display("failed to resolve and merge config"))] - FailedToResolveConfig { - source: stackable_spark_k8s_crd::Error, - }, + FailedToResolveConfig { source: crate::crd::Error }, #[snafu(display("failed to recognise the container name"))] UnrecognisedContainerName, @@ -129,9 +127,7 @@ pub enum Error { }, #[snafu(display("failed to resolve the log dir configuration"))] - LogDir { - source: stackable_spark_k8s_crd::logdir::Error, - }, + LogDir { source: crate::crd::logdir::Error }, #[snafu(display("failed to resolve the Vector aggregator address"))] ResolveVectorAggregatorAddress { source: product_logging::Error }, @@ -157,14 +153,10 @@ pub enum Error { }, #[snafu(display("invalid product config"))] - InvalidProductConfig { - source: stackable_spark_k8s_crd::Error, - }, + InvalidProductConfig { source: crate::crd::Error }, #[snafu(display("invalid submit config"))] - SubmitConfig { - source: stackable_spark_k8s_crd::Error, - }, + SubmitConfig { source: crate::crd::Error }, #[snafu(display("failed to build Labels"))] LabelBuild { @@ -183,9 +175,7 @@ pub enum Error { }, #[snafu(display("failed to create Volumes for SparkApplication"))] - CreateVolumes { - source: stackable_spark_k8s_crd::Error, - }, + CreateVolumes { source: crate::crd::Error }, #[snafu(display("Failed to update status for application {name:?}"))] ApplySparkApplicationStatus { @@ -216,7 +206,7 @@ impl ReconcilerError for Error { } pub async fn reconcile( - spark_application: Arc>, + spark_application: Arc>, ctx: Arc, ) -> Result { tracing::info!("Starting reconcile"); @@ -440,7 +430,7 @@ pub async fn reconcile( } fn init_containers( - spark_application: &SparkApplication, + spark_application: &v1alpha1::SparkApplication, logging: &Logging, s3conn: &Option, logdir: &Option, @@ -595,7 +585,7 @@ fn init_containers( #[allow(clippy::too_many_arguments)] fn pod_template( - spark_application: &SparkApplication, + spark_application: &v1alpha1::SparkApplication, role: SparkApplicationRole, config: &RoleConfig, volumes: &[Volume], @@ -688,7 +678,7 @@ fn pod_template( #[allow(clippy::too_many_arguments)] fn pod_template_config_map( - spark_application: &SparkApplication, + spark_application: &v1alpha1::SparkApplication, role: SparkApplicationRole, merged_config: &RoleConfig, product_config: Option<&HashMap>>, @@ -807,7 +797,7 @@ fn pod_template_config_map( } fn submit_job_config_map( - spark_application: &SparkApplication, + spark_application: &v1alpha1::SparkApplication, product_config: Option<&HashMap>>, spark_image: &ResolvedProductImage, ) -> Result { @@ -866,7 +856,7 @@ fn submit_job_config_map( #[allow(clippy::too_many_arguments)] fn spark_job( - spark_application: &SparkApplication, + spark_application: &v1alpha1::SparkApplication, spark_image: &ResolvedProductImage, serviceaccount: &ServiceAccount, env: &[EnvVar], @@ -984,7 +974,7 @@ fn spark_job( /// Both objects have an owner reference to the SparkApplication, as well as the same name as the app. /// They are deleted when the job is deleted. fn build_spark_role_serviceaccount( - spark_app: &SparkApplication, + spark_app: &v1alpha1::SparkApplication, spark_image: &ResolvedProductImage, ) -> Result<(ServiceAccount, RoleBinding)> { let sa_name = spark_app.metadata.name.as_ref().unwrap().to_string(); @@ -1039,7 +1029,7 @@ fn security_context() -> PodSecurityContext { } pub fn error_policy( - _obj: Arc>, + _obj: Arc>, error: &Error, _ctx: Arc, ) -> Action {