@@ -0,0 +1,241 @@
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
name: declarative-configuration-sparkapplication
spec:
target:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
customizations:
healthInterpretation:
luaScript: >
function InterpretHealth(observedObj)
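          -- Treat a missing or not-yet-reported application state as unhealthy.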
if not observedObj or
not observedObj.status or
not observedObj.status.applicationState or
not observedObj.status.applicationState.state then
return false
end

-- Only the 'FAILED' state is considered unhealthy. All other states are treated
-- as healthy or recoverable.
local state = observedObj.status.applicationState.state
if state == 'FAILED' then
return false
end
return true
end
componentResource:
luaScript: |
local kube = require("kube")

local function isempty(s)
return s == nil or s == ''
end

-- Safe fetch of deeply nested table fields.
local function get(obj, path)
local cur = obj
for i = 1, #path do
if cur == nil then
return nil
end
cur = cur[path[i]]
end
return cur
end

-- Normalize possibly-string numbers with a default.
local function to_num(v, default)
if v == nil or v == '' then
return default
end
local n = tonumber(v)
if n ~= nil then
return n
end
return default
end
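
        -- Spark memory strings use JVM-style suffixes ('512m' means 512 MiB), but a
        -- Kubernetes quantity parses a trailing 'm' as milli-units. Translate the JVM
        -- suffix to its binary Kubernetes counterpart before handing the value to the
        -- quantity parser; anything that does not look like a JVM memory string is
        -- passed through unchanged. (Assumes the Lua sandbox loads the string library.)
        local suffix_to_binary = { k = "Ki", m = "Mi", g = "Gi", t = "Ti" }
        local function memory_quantity(mem)
          if isempty(mem) then
            return nil
          end
          local num, unit = string.match(tostring(mem), "^(%d+)([kKmMgGtT]?)[bB]?$")
          if num == nil then
            return kube.getResourceQuantity(mem)
          end
          return kube.getResourceQuantity(num .. (suffix_to_binary[string.lower(unit)] or ""))
        end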

        -- JSON-safe deep clone of strings/numbers/booleans/tables; breaks shared table
        -- references (and drops cycles) so the result is safe to serialize.
local function clone_plain(val, seen)
local tv = type(val)
if tv ~= "table" then
if tv == "string" or tv == "number" or tv == "boolean" or tv == "nil" then
return val
end
return nil
end
seen = seen or {}
if seen[val] then return nil end
seen[val] = true
local out = {}
for k, v in pairs(val) do
local tk = type(k)
if tk == "string" or tk == "number" then
local cv = clone_plain(v, seen)
if cv ~= nil then out[k] = cv end
end
end
seen[val] = nil
return out
end

local function apply_pod_template(pt_spec, requires)
if pt_spec == nil then
return
end

local nodeSelector = clone_plain(pt_spec.nodeSelector)
local tolerations = clone_plain(pt_spec.tolerations)
local priority = pt_spec.priorityClassName
local hardNodeAffinity = nil
if pt_spec.affinity and pt_spec.affinity.nodeAffinity then
hardNodeAffinity = clone_plain(pt_spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution)
end

-- Only create nodeClaim if there is content
if nodeSelector ~= nil or tolerations ~= nil or hardNodeAffinity ~= nil then
requires.nodeClaim = requires.nodeClaim or {}
requires.nodeClaim.nodeSelector = nodeSelector
requires.nodeClaim.tolerations = tolerations
requires.nodeClaim.hardNodeAffinity = hardNodeAffinity
end

if not isempty(priority) then
requires.priorityClassName = priority
end
end

function GetComponents(observedObj)
local components = {}

-- Driver
local drv_replicas = 1 -- Spark Driver always has 1 instance
local drv_requires = {
resourceRequest = {}
}

local drv_cpu = get(observedObj, {"spec","driver","cores"})
local drv_memory = get(observedObj, {"spec","driver","memory"})
drv_requires.resourceRequest.cpu = drv_cpu
          drv_requires.resourceRequest.memory = memory_quantity(drv_memory)

local drv_gpu = get(observedObj, {"spec","driver","gpu"})
if drv_gpu ~= nil then
local gpu_name = drv_gpu.name
local gpu_qty = to_num(drv_gpu.quantity, 0)
if not isempty(gpu_name) and gpu_qty > 0 then
drv_requires.resourceRequest[gpu_name] = gpu_qty
end
end

apply_pod_template(get(observedObj, {"spec","driver"}), drv_requires)

local driverComponent = {
name = "driver",
replicas = drv_replicas,
replicaRequirements = drv_requires
}
table.insert(components, driverComponent)

-- Executor
local exe_replicas = to_num(get(observedObj, {"spec","executor","instances"}), 1)
local exe_requires = {
resourceRequest = {}
}

local exe_cpu = get(observedObj, {"spec","executor","cores"})
local exe_memory = get(observedObj, {"spec","executor","memory"})
exe_requires.resourceRequest.cpu = exe_cpu
          exe_requires.resourceRequest.memory = memory_quantity(exe_memory)

local exe_gpu = get(observedObj, {"spec","executor","gpu"})
if exe_gpu ~= nil then
local gpu_name = exe_gpu.name
local gpu_qty = to_num(exe_gpu.quantity, 0)
if not isempty(gpu_name) and gpu_qty > 0 then
exe_requires.resourceRequest[gpu_name] = gpu_qty
end
end

apply_pod_template(get(observedObj, {"spec","executor"}), exe_requires)

local executorComponent = {
name = "executor",
replicas = exe_replicas,
replicaRequirements = exe_requires
}
table.insert(components, executorComponent)

return components
end
statusAggregation:
luaScript: >
function AggregateStatus(desiredObj, statusItems)
if statusItems == nil then
return desiredObj
end
if desiredObj.status == nil then
desiredObj.status = {}
end

local sparkApplicationId = ""
local applicationState = {}
local completionTime = ""
local driverInfo = {}
local executionAttempts = 0
local executorState = {}
local submissionAttempts = 0
local lastSubmissionAttemptTime = ""
local submissionID = ""
local terminationTime = ""

          -- A SparkApplication runs in a single member cluster, so the last cluster to
          -- report a field wins; fields a member did not report keep the defaults above.
          for i = 1, #statusItems do
            local currentStatus = statusItems[i].status
            if currentStatus ~= nil then
              sparkApplicationId = currentStatus.sparkApplicationId or sparkApplicationId
              applicationState = currentStatus.applicationState or applicationState
              completionTime = currentStatus.completionTime or completionTime
              driverInfo = currentStatus.driverInfo or driverInfo
              executionAttempts = currentStatus.executionAttempts or executionAttempts
              executorState = currentStatus.executorState or executorState
              submissionAttempts = currentStatus.submissionAttempts or submissionAttempts
              lastSubmissionAttemptTime = currentStatus.lastSubmissionAttemptTime or lastSubmissionAttemptTime
              submissionID = currentStatus.submissionID or submissionID
              terminationTime = currentStatus.terminationTime or terminationTime
            end
          end

desiredObj.status.sparkApplicationId = sparkApplicationId
desiredObj.status.applicationState = applicationState
desiredObj.status.completionTime = completionTime
desiredObj.status.driverInfo = driverInfo
desiredObj.status.executionAttempts = executionAttempts
desiredObj.status.executorState = executorState
desiredObj.status.submissionAttempts = submissionAttempts
desiredObj.status.lastSubmissionAttemptTime = lastSubmissionAttemptTime
desiredObj.status.submissionID = submissionID
desiredObj.status.terminationTime = terminationTime
return desiredObj
end
statusReflection:
luaScript: >
function ReflectStatus(observedObj)
local status = {}
if observedObj == nil or observedObj.status == nil then
return status
end

status.applicationState = observedObj.status.applicationState
status.driverInfo = observedObj.status.driverInfo
status.executorState = observedObj.status.executorState
status.sparkApplicationId = observedObj.status.sparkApplicationId
status.completionTime = observedObj.status.completionTime
status.lastSubmissionAttemptTime = observedObj.status.lastSubmissionAttemptTime
status.submissionAttempts = observedObj.status.submissionAttempts
status.executionAttempts = observedObj.status.executionAttempts
status.submissionID = observedObj.status.submissionID
status.terminationTime = observedObj.status.terminationTime
return status
end
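
A quick way to sanity-check the health hook outside Karmada is to run its logic under plain Lua. The snippet below is a minimal stand-alone sketch, not part of the change itself: the function body is condensed from the script above and no Karmada-specific libraries are involved.

-- Stand-alone check of the health logic (plain Lua 5.x).
local function InterpretHealth(observedObj)
  if not observedObj or
      not observedObj.status or
      not observedObj.status.applicationState or
      not observedObj.status.applicationState.state then
    return false
  end
  return observedObj.status.applicationState.state ~= 'FAILED'
end

print(InterpretHealth(nil))                                                          -- false: nothing observed yet
print(InterpretHealth({ status = { applicationState = { state = 'FAILED' } } }))     -- false: unhealthy
print(InterpretHealth({ status = { applicationState = { state = 'RUNNING' } } }))    -- true
print(InterpretHealth({ status = { applicationState = { state = 'COMPLETED' } } }))  -- true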
@@ -0,0 +1,10 @@
tests:
- desiredInputPath: testdata/desired-sparkapplication.yaml
statusInputPath: testdata/status-file.yaml
operation: AggregateStatus
- desiredInputPath: testdata/desired-sparkapplication.yaml
operation: InterpretComponent
- observedInputPath: testdata/observed-sparkapplication.yaml
operation: InterpretHealth
- observedInputPath: testdata/observed-sparkapplication.yaml
operation: InterpretStatus
@@ -0,0 +1,41 @@
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Scala
mode: cluster
image: "spark:3.5.0"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
sparkVersion: "3.5.0"
sparkUIOptions:
serviceLabels:
test-label/v1: 'true'
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 2
memory: "512m"
labels:
version: 3.5.0
serviceAccount: spark-operator-spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 2
memory: "512m"
labels:
version: 3.5.0
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
@@ -0,0 +1,69 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"sparkoperator.k8s.io/v1beta2","kind":"SparkApplication","metadata":{"annotations":{},"name":"spark-pi","namespace":"default"},"spec":{"driver":{"cores":2,"labels":{"version":"3.5.0"},"memory":"512m","serviceAccount":"spark-operator-spark","volumeMounts":[{"mountPath":"/tmp","name":"test-volume"}]},"executor":{"cores":1,"instances":2,"labels":{"version":"3.5.0"},"memory":"512m","volumeMounts":[{"mountPath":"/tmp","name":"test-volume"}]},"image":"spark:3.5.0","imagePullPolicy":"Always","mainApplicationFile":"local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar","mainClass":"org.apache.spark.examples.SparkPi","mode":"cluster","restartPolicy":{"type":"Never"},"sparkUIOptions":{"serviceLabels":{"test-label/v1":"true"}},"sparkVersion":"3.5.0","type":"Scala","volumes":[{"hostPath":{"path":"/tmp","type":"Directory"},"name":"test-volume"}]}}
propagationpolicy.karmada.io/name: spark-pi-pp
propagationpolicy.karmada.io/namespace: default
creationTimestamp: "2025-10-12T13:57:17Z"
generation: 1
labels:
propagationpolicy.karmada.io/permanent-id: 5c1aa82e-d727-4ec4-8d58-5d3cff7335e1
name: spark-pi
namespace: default
resourceVersion: "1152"
uid: 30dde6fb-dc13-40fd-b131-4839fdf7fddd
spec:
driver:
cores: 2
labels:
version: 3.5.0
memory: 512m
serviceAccount: spark-operator-spark
volumeMounts:
- mountPath: /tmp
name: test-volume
executor:
cores: 1
instances: 2
labels:
version: 3.5.0
memory: 512m
volumeMounts:
- mountPath: /tmp
name: test-volume
image: spark:3.5.0
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar
mainClass: org.apache.spark.examples.SparkPi
mode: cluster
restartPolicy:
type: Never
sparkUIOptions:
serviceLabels:
test-label/v1: "true"
sparkVersion: 3.5.0
type: Scala
volumes:
- hostPath:
path: /tmp
type: Directory
name: test-volume
status:
applicationState:
state: COMPLETED
driverInfo:
podName: spark-pi-driver
webUIAddress: 10.11.254.226:4040
webUIPort: 4040
webUIServiceName: spark-pi-ui-svc
executionAttempts: 1
executorState:
spark-pi-b5777a99d8b732a7-exec-1: COMPLETED
spark-pi-b5777a99d8b732a7-exec-2: COMPLETED
lastSubmissionAttemptTime: "2025-10-12T13:57:17Z"
sparkApplicationId: spark-ff27607fd312454b92455e2feabdd343
submissionAttempts: 1
submissionID: 0df4a04b-620b-425e-997a-e4404010e26a
terminationTime: "2025-10-12T13:58:39Z"
@@ -0,0 +1,20 @@
applied: true
clusterName: member1
health: Healthy
status:
applicationState:
state: COMPLETED
driverInfo:
podName: spark-pi-driver
webUIAddress: 10.11.254.226:4040
webUIPort: 4040
webUIServiceName: spark-pi-ui-svc
executionAttempts: 1
executorState:
spark-pi-b5777a99d8b732a7-exec-1: COMPLETED
spark-pi-b5777a99d8b732a7-exec-2: COMPLETED
lastSubmissionAttemptTime: "2025-10-12T13:57:17Z"
sparkApplicationId: spark-ff27607fd312454b92455e2feabdd343
submissionAttempts: 1
submissionID: 0df4a04b-620b-425e-997a-e4404010e26a
terminationTime: "2025-10-12T13:58:39Z"
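
Fed through AggregateStatus, this single status item should land field-for-field on the federated object's status. Below is a condensed stand-alone sketch of that merge (plain Lua, fields abbreviated; the real script copies a fixed field list rather than iterating with pairs):

-- One member status item, abbreviated from the testdata above.
local desiredObj = { status = {} }
local statusItems = {
  { clusterName = "member1",
    status = {
      sparkApplicationId = "spark-ff27607fd312454b92455e2feabdd343",
      applicationState = { state = "COMPLETED" },
      submissionAttempts = 1,
      executionAttempts = 1,
    } },
}

for i = 1, #statusItems do
  local s = statusItems[i].status
  if s ~= nil then
    for k, v in pairs(s) do
      desiredObj.status[k] = v -- last cluster to report a field wins
    end
  end
end

print(desiredObj.status.sparkApplicationId, desiredObj.status.applicationState.state)
-- spark-ff27607fd312454b92455e2feabdd343	COMPLETED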