Skip to content

Commit 354817a

Browse files
author
Alexander Matyushentsev
authored
fix: sync should apply Namespaces and CRDs before resources that depend on them (argoproj#225)
* fix: sync should apply Namespaces and CRDs before resources that depend on them Signed-off-by: Alexander Matyushentsev <[email protected]>
1 parent c5b7114 commit 354817a

File tree

4 files changed

+136
-52
lines changed

4 files changed

+136
-52
lines changed

pkg/sync/sync_context.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -625,16 +625,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
625625
task.liveObj = sc.liveObj(task.targetObj)
626626
}
627627

628-
// enrich tasks with the result
629-
for _, task := range tasks {
630-
result, ok := sc.syncRes[task.resultKey()]
631-
if ok {
632-
task.syncStatus = result.Status
633-
task.operationState = result.HookPhase
634-
task.message = result.Message
635-
}
636-
}
637-
638628
// check permissions
639629
for _, task := range tasks {
640630
serverRes, err := kube.ServerResourceForGroupVersionKind(sc.disco, task.groupVersionKind())
@@ -683,7 +673,17 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
683673
}
684674
}
685675

686-
sort.Sort(tasks)
676+
tasks.Sort()
677+
678+
// finally enrich tasks with the result
679+
for _, task := range tasks {
680+
result, ok := sc.syncRes[task.resultKey()]
681+
if ok {
682+
task.syncStatus = result.Status
683+
task.operationState = result.HookPhase
684+
task.message = result.Message
685+
}
686+
}
687687

688688
return tasks, successful
689689
}

pkg/sync/sync_task.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,7 @@ type syncTask struct {
2323
syncStatus common.ResultCode
2424
operationState common.OperationPhase
2525
message string
26-
}
27-
28-
// isDependsOn returns true if given task depends on current task and should be executed after
29-
func (t *syncTask) isDependsOn(other *syncTask) bool {
30-
otherObj := other.obj()
31-
thisGVK := t.obj().GroupVersionKind()
32-
otherGVK := otherObj.GroupVersionKind()
33-
34-
if isCRDOfGroupKind(thisGVK.Group, thisGVK.Kind, otherObj) {
35-
return true
36-
}
37-
38-
if otherGVK.Group == "" && otherGVK.Kind == kube.NamespaceKind && otherObj.GetName() == t.obj().GetNamespace() {
39-
return true
40-
}
41-
return false
26+
waveOverride *int
4227
}
4328

4429
func ternary(val bool, a, b string) string {
@@ -73,6 +58,9 @@ func (t *syncTask) obj() *unstructured.Unstructured {
7358
}
7459

7560
func (t *syncTask) wave() int {
61+
if t.waveOverride != nil {
62+
return *t.waveOverride
63+
}
7664
return syncwaves.Wave(t.obj())
7765
}
7866

pkg/sync/sync_tasks.go

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package sync
22

33
import (
4+
"fmt"
5+
"sort"
46
"strings"
57

8+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9+
610
"github.com/argoproj/gitops-engine/pkg/sync/common"
11+
"github.com/argoproj/gitops-engine/pkg/utils/kube"
712
)
813

914
// kindOrder represents the correct order of Kubernetes resources within a manifest
@@ -74,12 +79,6 @@ func (s syncTasks) Less(i, j int) bool {
7479
tA := s[i]
7580
tB := s[j]
7681

77-
if tA.isDependsOn(tB) {
78-
return false
79-
} else if tB.isDependsOn(tA) {
80-
return true
81-
}
82-
8382
d := syncPhaseOrder[tA.phase] - syncPhaseOrder[tB.phase]
8483
if d != 0 {
8584
return d < 0
@@ -103,6 +102,73 @@ func (s syncTasks) Less(i, j int) bool {
103102
return a.GetName() < b.GetName()
104103
}
105104

105+
func (s syncTasks) Sort() {
106+
sort.Sort(s)
107+
// make sure namespaces are created before resources referencing namespaces
108+
s.adjustDeps(func(obj *unstructured.Unstructured) (string, bool) {
109+
return obj.GetName(), obj.GetKind() == kube.NamespaceKind && obj.GroupVersionKind().Group == ""
110+
}, func(obj *unstructured.Unstructured) (string, bool) {
111+
return obj.GetNamespace(), obj.GetNamespace() != ""
112+
})
113+
// make sure CRDs are created before CRs
114+
s.adjustDeps(func(obj *unstructured.Unstructured) (string, bool) {
115+
if kube.IsCRD(obj) {
116+
crdGroup, ok, err := unstructured.NestedString(obj.Object, "spec", "group")
117+
if err != nil || !ok {
118+
return "", false
119+
}
120+
crdKind, ok, err := unstructured.NestedString(obj.Object, "spec", "names", "kind")
121+
if err != nil || !ok {
122+
return "", false
123+
}
124+
return fmt.Sprintf("%s/%s", crdGroup, crdKind), true
125+
}
126+
return "", false
127+
}, func(obj *unstructured.Unstructured) (string, bool) {
128+
gk := obj.GroupVersionKind()
129+
return fmt.Sprintf("%s/%s", gk.Group, gk.Kind), true
130+
})
131+
}
132+
133+
// adjust order of tasks and bubble up tasks which are dependencies of other tasks
134+
// (e.g. namespace sync should happen before resources that resides in that namespace)
135+
func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) {
136+
// store dependency key and first occurrence of resource referencing the dependency
137+
firstIndexByDepKey := map[string]int{}
138+
139+
for i, t := range s {
140+
if t.targetObj == nil {
141+
continue
142+
}
143+
144+
if depKey, ok := isDep(t.targetObj); ok {
145+
// if tasks is a dependency then insert if before first task that reference it
146+
if index, ok := firstIndexByDepKey[depKey]; ok {
147+
// wave and sync phase of dependency resource must be same as wave and phase of resource that depend on it
148+
wave := s[index].wave()
149+
t.waveOverride = &wave
150+
t.phase = s[index].phase
151+
152+
for j := i; j > index; j-- {
153+
s[j] = s[j-1]
154+
}
155+
s[index] = t
156+
// increase previously collected indexes by 1
157+
for ns, firstIndex := range firstIndexByDepKey {
158+
if firstIndex >= index {
159+
firstIndexByDepKey[ns] = firstIndex + 1
160+
}
161+
}
162+
}
163+
} else if depKey, ok := doesRefDep(t.targetObj); ok {
164+
// if task is referencing the dependency then store first index of it
165+
if _, ok := firstIndexByDepKey[depKey]; !ok {
166+
firstIndexByDepKey[depKey] = i
167+
}
168+
}
169+
}
170+
}
171+
106172
func (s syncTasks) Filter(predicate func(task *syncTask) bool) (tasks syncTasks) {
107173
for _, task := range s {
108174
if predicate(task) {

pkg/sync/sync_tasks_test.go

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ func TestAnySyncTasks(t *testing.T) {
3333
return task.name() == "does-not-exist"
3434
})
3535
assert.False(t, res)
36-
3736
}
3837

3938
func TestAllSyncTasks(t *testing.T) {
@@ -369,37 +368,76 @@ func TestSyncNamespaceAgainstCRD(t *testing.T) {
369368
}
370369

371370
func TestSyncTasksSort_NamespaceAndObjectInNamespace(t *testing.T) {
372-
deployment := &syncTask{
371+
hook1 := &syncTask{
373372
phase: common.SyncPhasePreSync,
374373
targetObj: &unstructured.Unstructured{
375374
Object: map[string]interface{}{
376375
"kind": "Job",
377376
"metadata": map[string]interface{}{
378-
"namespace": "myNamespace",
379-
"name": "mySyncHookJob",
377+
"namespace": "myNamespace1",
378+
"name": "mySyncHookJob1",
380379
},
381380
},
382381
}}
383-
namespace := &syncTask{
382+
hook2 := &syncTask{
383+
phase: common.SyncPhasePreSync,
384+
targetObj: &unstructured.Unstructured{
385+
Object: map[string]interface{}{
386+
"kind": "Job",
387+
"metadata": map[string]interface{}{
388+
"namespace": "myNamespace2",
389+
"name": "mySyncHookJob2",
390+
},
391+
},
392+
}}
393+
namespace1 := &syncTask{
384394
targetObj: &unstructured.Unstructured{
385395
Object: map[string]interface{}{
386396
"kind": "Namespace",
387397
"metadata": map[string]interface{}{
388-
"name": "myNamespace",
398+
"name": "myNamespace1",
399+
"annotations": map[string]string{
400+
"argocd.argoproj.io/sync-wave": "1",
401+
},
402+
},
403+
},
404+
},
405+
}
406+
namespace2 := &syncTask{
407+
targetObj: &unstructured.Unstructured{
408+
Object: map[string]interface{}{
409+
"kind": "Namespace",
410+
"metadata": map[string]interface{}{
411+
"name": "myNamespace2",
412+
"annotations": map[string]string{
413+
"argocd.argoproj.io/sync-wave": "2",
414+
},
389415
},
390416
},
391417
},
392418
}
393419

394-
unsorted := syncTasks{deployment, namespace}
395-
sort.Sort(unsorted)
420+
unsorted := syncTasks{hook1, hook2, namespace1, namespace2}
421+
unsorted.Sort()
396422

397-
assert.Equal(t, syncTasks{namespace, deployment}, unsorted)
423+
assert.Equal(t, syncTasks{namespace1, hook1, namespace2, hook2}, unsorted)
424+
assert.Equal(t, 0, namespace1.wave())
425+
assert.Equal(t, common.SyncPhase(common.SyncPhasePreSync), namespace1.phase)
426+
assert.Equal(t, 0, namespace2.wave())
427+
assert.Equal(t, common.SyncPhase(common.SyncPhasePreSync), namespace2.phase)
398428
}
399429

400430
func TestSyncTasksSort_CRDAndCR(t *testing.T) {
401-
crd := &syncTask{
431+
cr := &syncTask{
402432
phase: common.SyncPhasePreSync,
433+
targetObj: &unstructured.Unstructured{
434+
Object: map[string]interface{}{
435+
"kind": "Workflow",
436+
"apiVersion": "argoproj.io/v1",
437+
},
438+
},
439+
}
440+
crd := &syncTask{
403441
targetObj: &unstructured.Unstructured{
404442
Object: map[string]interface{}{
405443
"apiVersion": "apiextensions.k8s.io/v1",
@@ -412,17 +450,9 @@ func TestSyncTasksSort_CRDAndCR(t *testing.T) {
412450
},
413451
},
414452
}}
415-
cr := &syncTask{
416-
targetObj: &unstructured.Unstructured{
417-
Object: map[string]interface{}{
418-
"kind": "Workflow",
419-
"apiVersion": "argoproj.io/v1",
420-
},
421-
},
422-
}
423453

424454
unsorted := syncTasks{cr, crd}
425-
sort.Sort(unsorted)
455+
unsorted.Sort()
426456

427457
assert.Equal(t, syncTasks{crd, cr}, unsorted)
428458
}

0 commit comments

Comments
 (0)