Skip to content

Commit 34e4270

Browse files
Merge pull request #249 from lpiwowar/18.0-fr1
[18.0-fr1][OSPRH-10386] Update logic for workflows
2 parents 41bb402 + 1f04563 commit 34e4270

File tree

5 files changed

+378
-300
lines changed

5 files changed

+378
-300
lines changed

controllers/ansibletest_controller.go

Lines changed: 58 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package controllers
1818

1919
import (
2020
"context"
21+
"errors"
22+
"fmt"
2123
"strconv"
2224
"time"
2325

@@ -38,7 +40,6 @@ import (
3840
rbacv1 "k8s.io/api/rbac/v1"
3941
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
4042
ctrl "sigs.k8s.io/controller-runtime"
41-
"sigs.k8s.io/controller-runtime/pkg/client"
4243
"sigs.k8s.io/controller-runtime/pkg/log"
4344
)
4445

@@ -73,9 +74,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
7374
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
7475
Log := r.GetLogger(ctx)
7576

76-
// How much time should we wait before calling Reconcile loop when there is a failure
77-
requeueAfter := time.Second * 60
78-
7977
// Fetch the ansible instance
8078
instance := &testv1beta1.AnsibleTest{}
8179
err := r.Client.Get(ctx, req.NamespacedName, instance)
@@ -86,11 +84,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8684
return ctrl.Result{}, err
8785
}
8886

89-
workflowActive := false
90-
if len(instance.Spec.Workflow) > 0 {
91-
workflowActive = true
92-
}
93-
9487
// Create a helper
9588
helper, err := helper.NewHelper(
9689
instance,
@@ -147,33 +140,55 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
147140

148141
}
149142

150-
// Ensure that there is an external counter and read its value
151-
// We use the external counter to keep track of the workflow steps
152-
r.WorkflowStepCounterCreate(ctx, instance, helper)
153-
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
154-
if externalWorkflowCounter == -1 {
155-
return ctrl.Result{RequeueAfter: requeueAfter}, nil
156-
}
143+
workflowLength := len(instance.Spec.Workflow)
144+
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)
157145

158-
// Each job that is being executed by the test operator has
159-
currentWorkflowStep := 0
160-
runningAnsibleJob := &batchv1.Job{}
161-
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
162-
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
163-
if err != nil && !k8s_errors.IsNotFound(err) {
146+
switch nextAction {
147+
case Failure:
164148
return ctrl.Result{}, err
165-
} else if err == nil {
166-
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
167-
}
168149

169-
if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
170-
// The job created by the instance was completed. Release the lock
171-
// so that other instances can spawn a job.
172-
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
173-
Log.Info("Job completed")
150+
case Wait:
151+
Log.Info(InfoWaitingOnJob)
152+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil
153+
154+
case EndTesting:
155+
// All jobs created by the instance were completed. Release the lock
156+
// so that other instances can spawn their jobs.
174157
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
175-
return ctrl.Result{}, err
158+
Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName))
159+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
176160
}
161+
162+
instance.Status.Conditions.MarkTrue(
163+
condition.DeploymentReadyCondition,
164+
condition.DeploymentReadyMessage)
165+
166+
Log.Info(InfoTestingCompleted)
167+
return ctrl.Result{}, nil
168+
169+
case CreateFirstJob:
170+
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
171+
if !lockAcquired {
172+
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
173+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
174+
}
175+
176+
Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))
177+
178+
case CreateNextJob:
179+
// Confirm that we still hold the lock. This is useful to check if for
180+
// example somebody / something deleted the lock and it got claimed by
181+
// another instance. This is considered to be an error state.
182+
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
183+
if !lockAcquired {
184+
Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName)
185+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
186+
}
187+
188+
Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))
189+
190+
default:
191+
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
177192
}
178193

179194
// Service account, role, binding
@@ -206,9 +221,9 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
206221

207222
serviceLabels := map[string]string{
208223
common.AppSelector: ansibletest.ServiceName,
209-
"workflowStep": strconv.Itoa(externalWorkflowCounter),
210-
"instanceName": instance.Name,
211-
"operator": "test-operator",
224+
workflowStepLabel: strconv.Itoa(nextWorkflowStep),
225+
instanceNameLabel: instance.Name,
226+
operatorNameLabel: "test-operator",
212227
}
213228

214229
// Create PersistentVolumeClaim
@@ -227,47 +242,26 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
227242
}
228243
// Create PersistentVolumeClaim - end
229244

230-
// If the current job is executing the last workflow step -> do not create another job
231-
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
232-
return ctrl.Result{}, nil
233-
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
234-
return ctrl.Result{}, nil
235-
}
236-
237-
// We are about to start job that spawns the pod with tests.
238-
// This lock ensures that there is always only one pod running.
239-
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
240-
if !lockAcquired {
241-
Log.Info("Can not acquire lock")
242-
requeueAfter := time.Second * 60
243-
return ctrl.Result{RequeueAfter: requeueAfter}, err
244-
}
245-
Log.Info("Lock acquired")
246-
247-
if workflowActive {
248-
r.WorkflowStepCounterIncrease(ctx, instance, helper)
249-
}
250-
251245
instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)
252246

253247
// Create a new job
254248
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
255-
jobName := r.GetJobName(instance, externalWorkflowCounter)
256-
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
249+
jobName := r.GetJobName(instance, nextWorkflowStep)
250+
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
257251
logsPVCName := r.GetPVCLogsName(instance, 0)
258252
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
259-
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
253+
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
260254
if err != nil {
261255
return ctrl.Result{}, err
262256
}
263257

264-
if externalWorkflowCounter < len(instance.Spec.Workflow) {
265-
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
266-
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
258+
if nextWorkflowStep < len(instance.Spec.Workflow) {
259+
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
260+
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
267261
}
268262

269-
if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
270-
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
263+
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
264+
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
271265
}
272266
}
273267

@@ -279,7 +273,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
279273
mountCerts,
280274
envVars,
281275
workflowOverrideParams,
282-
externalWorkflowCounter,
276+
nextWorkflowStep,
283277
containerImage,
284278
privileged,
285279
)
@@ -316,7 +310,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
316310
return ctrlResult, nil
317311
}
318312
// Create a new job - end
319-
320313
Log.Info("Reconciled Service successfully")
321314
return ctrl.Result{}, nil
322315
}

0 commit comments

Comments
 (0)