Skip to content

Commit 90a6236

Browse files
authored
Add option to use the kubernetes scheduler for workflow pods (actions#111)
* Add option to use kube scheduler. This should only be used when ReadWriteMany (RWX) volumes are supported or when using a single-node cluster.
* Add option to set the timeout for prepare job. If the kube scheduler is used to hold jobs until sufficient resources are available, then prepare job needs to wait longer for the workflow pod to be running. This timeout will mostly need to be increased when many jobs are triggered that together exceed the resources available in the cluster; those workflows can then be handled gracefully once sufficient resources become available again.
* Skip the name override warning when the names match or the name is the job extension name.
* Add a guard for positive timeouts, with a warning.
* Write out ReadWriteMany in full.
1 parent 496287d commit 90a6236

File tree

4 files changed

+62
-7
lines changed

4 files changed

+62
-7
lines changed

packages/k8s/src/hooks/prepare-job.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import {
1313
createPod,
1414
isPodContainerAlpine,
1515
prunePods,
16-
waitForPodPhases
16+
waitForPodPhases,
17+
getPrepareJobTimeoutSeconds
1718
} from '../k8s'
1819
import {
1920
containerVolumes,
@@ -91,7 +92,8 @@ export async function prepareJob(
9192
await waitForPodPhases(
9293
createdPod.metadata.name,
9394
new Set([PodPhase.RUNNING]),
94-
new Set([PodPhase.PENDING])
95+
new Set([PodPhase.PENDING]),
96+
getPrepareJobTimeoutSeconds()
9597
)
9698
} catch (err) {
9799
await prunePods()

packages/k8s/src/k8s/index.ts

+37-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import {
1010
getVolumeClaimName,
1111
RunnerInstanceLabel
1212
} from '../hooks/constants'
13-
import { PodPhase, mergePodSpecWithOptions, mergeObjectMeta } from './utils'
13+
import {
14+
PodPhase,
15+
mergePodSpecWithOptions,
16+
mergeObjectMeta,
17+
useKubeScheduler
18+
} from './utils'
1419

1520
const kc = new k8s.KubeConfig()
1621

@@ -20,6 +25,8 @@ const k8sApi = kc.makeApiClient(k8s.CoreV1Api)
2025
const k8sBatchV1Api = kc.makeApiClient(k8s.BatchV1Api)
2126
const k8sAuthorizationV1Api = kc.makeApiClient(k8s.AuthorizationV1Api)
2227

28+
const DEFAULT_WAIT_FOR_POD_TIME_SECONDS = 10 * 60 // 10 min
29+
2330
export const POD_VOLUME_NAME = 'work'
2431

2532
export const requiredPermissions = [
@@ -86,7 +93,11 @@ export async function createPod(
8693
appPod.spec = new k8s.V1PodSpec()
8794
appPod.spec.containers = containers
8895
appPod.spec.restartPolicy = 'Never'
89-
appPod.spec.nodeName = await getCurrentNodeName()
96+
97+
if (!useKubeScheduler()) {
98+
appPod.spec.nodeName = await getCurrentNodeName()
99+
}
100+
90101
const claimName = getVolumeClaimName()
91102
appPod.spec.volumes = [
92103
{
@@ -142,7 +153,10 @@ export async function createJob(
142153
job.spec.template.metadata.annotations = {}
143154
job.spec.template.spec.containers = [container]
144155
job.spec.template.spec.restartPolicy = 'Never'
145-
job.spec.template.spec.nodeName = await getCurrentNodeName()
156+
157+
if (!useKubeScheduler()) {
158+
job.spec.template.spec.nodeName = await getCurrentNodeName()
159+
}
146160

147161
const claimName = getVolumeClaimName()
148162
job.spec.template.spec.volumes = [
@@ -346,7 +360,7 @@ export async function waitForPodPhases(
346360
podName: string,
347361
awaitingPhases: Set<PodPhase>,
348362
backOffPhases: Set<PodPhase>,
349-
maxTimeSeconds = 10 * 60 // 10 min
363+
maxTimeSeconds = DEFAULT_WAIT_FOR_POD_TIME_SECONDS
350364
): Promise<void> {
351365
const backOffManager = new BackOffManager(maxTimeSeconds)
352366
let phase: PodPhase = PodPhase.UNKNOWN
@@ -369,6 +383,25 @@ export async function waitForPodPhases(
369383
}
370384
}
371385

386+
/**
 * Resolves how long, in seconds, the prepare-job hook should wait for the
 * workflow pod before giving up.
 *
 * Reads `ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS`. When the variable is
 * unset or empty, the default wait time is returned silently. When it is set
 * but does not parse to an integer greater than zero, a warning is logged and
 * the default wait time is returned instead.
 *
 * @returns The configured timeout in seconds, or the default wait time.
 */
export function getPrepareJobTimeoutSeconds(): number {
  const envTimeoutSeconds =
    process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS']

  if (!envTimeoutSeconds) {
    return DEFAULT_WAIT_FOR_POD_TIME_SECONDS
  }

  const timeoutSeconds = parseInt(envTimeoutSeconds, 10)
  if (Number.isNaN(timeoutSeconds) || timeoutSeconds <= 0) {
    // Report the raw configured value: the parsed one is NaN for non-numeric
    // input, which would hide what the user actually set.
    core.warning(
      `Prepare job timeout is invalid ("${envTimeoutSeconds}"): use an int > 0`
    )
    return DEFAULT_WAIT_FOR_POD_TIME_SECONDS
  }

  return timeoutSeconds
}
404+
372405
async function getPodPhase(podName: string): Promise<PodPhase> {
373406
const podPhaseLookup = new Set<string>([
374407
PodPhase.PENDING,

packages/k8s/src/k8s/utils.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ import { Mount } from 'hooklib'
66
import * as path from 'path'
77
import { v1 as uuidv4 } from 'uuid'
88
import { POD_VOLUME_NAME } from './index'
9+
import { JOB_CONTAINER_EXTENSION_NAME } from '../hooks/constants'
910

1011
export const DEFAULT_CONTAINER_ENTRY_POINT_ARGS = [`-f`, `/dev/null`]
1112
export const DEFAULT_CONTAINER_ENTRY_POINT = 'tail'
1213

1314
export const ENV_HOOK_TEMPLATE_PATH = 'ACTIONS_RUNNER_CONTAINER_HOOK_TEMPLATE'
15+
export const ENV_USE_KUBE_SCHEDULER = 'ACTIONS_RUNNER_USE_KUBE_SCHEDULER'
1416

1517
export function containerVolumes(
1618
userMountVolumes: Mount[] = [],
@@ -177,7 +179,9 @@ export function mergeContainerWithOptions(
177179
): void {
178180
for (const [key, value] of Object.entries(from)) {
179181
if (key === 'name') {
180-
core.warning("Skipping name override: name can't be overwritten")
182+
if (value !== base.name && value !== JOB_CONTAINER_EXTENSION_NAME) {
183+
core.warning("Skipping name override: name can't be overwritten")
184+
}
181185
continue
182186
} else if (key === 'image') {
183187
core.warning("Skipping image override: image can't be overwritten")
@@ -257,6 +261,10 @@ export function readExtensionFromFile(): k8s.V1PodTemplateSpec | undefined {
257261
return doc as k8s.V1PodTemplateSpec
258262
}
259263

264+
/**
 * Reports whether the kube scheduler option is switched on.
 *
 * Only the exact string `'true'` in the environment variable named by
 * `ENV_USE_KUBE_SCHEDULER` enables it; any other value, or an unset variable,
 * yields `false`.
 */
export function useKubeScheduler(): boolean {
  const flag = process.env[ENV_USE_KUBE_SCHEDULER]
  return flag === 'true'
}
267+
260268
export enum PodPhase {
261269
PENDING = 'Pending',
262270
RUNNING = 'Running',

packages/k8s/tests/prepare-job-test.ts

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { createContainerSpec, prepareJob } from '../src/hooks/prepare-job'
55
import { TestHelper } from './test-setup'
66
import {
77
ENV_HOOK_TEMPLATE_PATH,
8+
ENV_USE_KUBE_SCHEDULER,
89
generateContainerName,
910
readExtensionFromFile
1011
} from '../src/k8s/utils'
@@ -130,6 +131,17 @@ describe('Prepare job', () => {
130131
expect(got.spec?.containers[2].args).toEqual(['-c', 'sleep 60'])
131132
})
132133

134+
it('should not throw exception using kube scheduler', async () => {
  // only for ReadWriteMany volumes or single node cluster
  process.env[ENV_USE_KUBE_SCHEDULER] = 'true'

  try {
    await expect(
      prepareJob(prepareJobData.args, prepareJobOutputFilePath)
    ).resolves.not.toThrow()
  } finally {
    // Always clear the flag, even when the expectation fails, so the
    // scheduler setting cannot leak into the tests that run afterwards.
    delete process.env[ENV_USE_KUBE_SCHEDULER]
  }
})
144+
133145
test.each([undefined, null, []])(
134146
'should not throw exception when portMapping=%p',
135147
async pm => {

0 commit comments

Comments
 (0)