Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Commit 6f60d76

Browse files
committed
agent: replace taskChain with sorted []task
Agents need to execute all unit file load operations before attempting to start anything. The taskChain approach did not provide this safety. An ordered list of tasks gives us what we need and greatly simplifies the codebase. This is a backport of 7b44072.
1 parent 2d5c6bb commit 6f60d76

4 files changed

Lines changed: 280 additions & 344 deletions

File tree

agent/reconcile.go

Lines changed: 74 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package agent
1818

1919
import (
2020
"fmt"
21+
"sort"
2122
"time"
2223

2324
"github.com/coreos/fleet/job"
@@ -81,9 +82,8 @@ func (ar *AgentReconciler) Reconcile(a *Agent) {
8182
return
8283
}
8384

84-
for tc := range ar.calculateTaskChainsForUnits(dAgentState, cAgentState) {
85-
ar.launchTaskChain(tc, a)
86-
}
85+
tasks := ar.calculateTasksForUnits(dAgentState, cAgentState)
86+
ar.launchTasks(tasks, a)
8787
}
8888

8989
// Purge attempts to unload all Units that have been loaded locally
@@ -98,18 +98,18 @@ func (ar *AgentReconciler) Purge(a *Agent) {
9898
return
9999
}
100100

101+
var tasks []task
101102
for name, _ := range cAgentState {
102-
t := task{
103+
tasks = append(tasks, task{
103104
typ: taskTypeUnloadUnit,
104105
reason: taskReasonPurgingAgent,
105-
}
106-
u := &job.Unit{
107-
Name: name,
108-
}
109-
tc := newTaskChain(u, t)
110-
ar.launchTaskChain(tc, a)
106+
unit: &job.Unit{
107+
Name: name,
108+
},
109+
})
111110
}
112111

112+
ar.launchTasks(tasks, a)
113113
time.Sleep(time.Second)
114114
}
115115
}
@@ -160,36 +160,29 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
160160
return &as, nil
161161
}
162162

163-
// calculateTaskChainsForUnits compares the desired and current state of an Agent.
164-
// The generated taskChains represent what should be done to make the desired
165-
// state match the current state.
166-
func (ar *AgentReconciler) calculateTaskChainsForUnits(dState *AgentState, cState unitStates) <-chan taskChain {
167-
tcChan := make(chan taskChain)
168-
go func() {
169-
jobs := pkg.NewUnsafeSet()
170-
for cName := range cState {
171-
jobs.Add(cName)
172-
}
173-
174-
for dName := range dState.Units {
175-
jobs.Add(dName)
176-
}
163+
// calculateTasksForUnits compares the desired and current state of an Agent.
164+
// The generated tasks represent what, in order, should be done to make the
165+
// current state match the desired state.
166+
func (ar *AgentReconciler) calculateTasksForUnits(dState *AgentState, cState unitStates) []task {
167+
jobs := pkg.NewUnsafeSet()
168+
for cName := range cState {
169+
jobs.Add(cName)
170+
}
177171

178-
for _, name := range jobs.Values() {
179-
tc := ar.calculateTaskChainForUnit(dState, cState, name)
180-
if tc == nil {
181-
continue
182-
}
183-
tcChan <- *tc
184-
}
172+
for dName := range dState.Units {
173+
jobs.Add(dName)
174+
}
185175

186-
close(tcChan)
187-
}()
176+
var tasks []task
177+
for _, name := range jobs.Values() {
178+
tasks = append(tasks, ar.calculateTasksForUnit(dState, cState, name)...)
179+
}
188180

189-
return tcChan
181+
sort.Sort(sortableTasks(tasks))
182+
return tasks
190183
}
191184

192-
func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState unitStates, jName string) *taskChain {
185+
func (ar *AgentReconciler) calculateTasksForUnit(dState *AgentState, cState unitStates, jName string) (tasks []task) {
193186
var dJob *job.Unit
194187
var dJHash string
195188
if dState != nil {
@@ -209,119 +202,115 @@ func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState
209202
return nil
210203
}
211204

205+
u := &job.Unit{
206+
Name: jName,
207+
}
208+
212209
if dJob == nil || dJob.TargetState == job.JobStateInactive {
213210
if cJState == nil {
214211
return nil
215212
}
216-
t := task{
213+
tasks = append(tasks, task{
217214
typ: taskTypeUnloadUnit,
218215
reason: taskReasonLoadedButNotScheduled,
219-
}
220-
u := &job.Unit{
221-
Name: jName,
222-
}
223-
tc := newTaskChain(u, t)
224-
return &tc
216+
unit: u,
217+
})
218+
return
225219
}
226220

227-
u := &job.Unit{
228-
Name: jName,
229-
Unit: dJob.Unit,
230-
}
221+
u.Unit = dJob.Unit
231222

232223
if cJState == nil {
233-
tc := newTaskChain(u)
234-
tc.Add(task{
224+
tasks = append(tasks, task{
235225
typ: taskTypeLoadUnit,
236226
reason: taskReasonScheduledButUnloaded,
227+
unit: u,
237228
})
238229

239230
// as an optimization, queue the unit for launching immediately after loading
240231
if dJob.TargetState == job.JobStateLaunched {
241-
tc.Add(task{
232+
tasks = append(tasks, task{
242233
typ: taskTypeStartUnit,
243234
reason: taskReasonLoadedDesiredStateLaunched,
235+
unit: u,
244236
})
245237
}
246238

247-
return &tc
239+
return
248240
}
249241

250242
if cJHash != dJHash {
251243
log.V(1).Infof("Desired hash %q differs to current hash %s of Job(%s) - unloading", dJHash, cJHash, jName)
252-
tc := newTaskChain(u)
253-
tc.Add(task{
254-
typ: taskTypeUnloadUnit,
255-
reason: taskReasonLoadedButHashDiffers,
256-
})
257-
258244
// queue the correct unit for loading immediately after unloading the old one
259-
tc.Add(task{
260-
typ: taskTypeLoadUnit,
261-
reason: taskReasonScheduledButUnloaded,
262-
})
245+
tasks = append(tasks,
246+
task{
247+
typ: taskTypeUnloadUnit,
248+
reason: taskReasonLoadedButHashDiffers,
249+
unit: u,
250+
},
251+
task{
252+
typ: taskTypeLoadUnit,
253+
reason: taskReasonScheduledButUnloaded,
254+
unit: u,
255+
},
256+
)
263257

264258
// as an optimization, queue the unit for launching immediately after loading
265259
if dJob.TargetState == job.JobStateLaunched {
266-
tc.Add(task{
260+
tasks = append(tasks, task{
267261
typ: taskTypeStartUnit,
268262
reason: taskReasonLoadedDesiredStateLaunched,
263+
unit: u,
269264
})
270265
}
271266

272-
return &tc
267+
return
273268
}
274269

275270
if *cJState == dJob.TargetState {
276271
log.V(1).Infof("Desired state %q matches current state of Job(%s), nothing to do", *cJState, jName)
277272
return nil
278273
}
279274

280-
tc := newTaskChain(u)
281275
if *cJState == job.JobStateInactive {
282-
tc.Add(task{
276+
tasks = append(tasks, task{
283277
typ: taskTypeLoadUnit,
284278
reason: taskReasonScheduledButUnloaded,
279+
unit: u,
285280
})
286281
}
287282

288283
if (*cJState == job.JobStateInactive || *cJState == job.JobStateLoaded) && dJob.TargetState == job.JobStateLaunched {
289-
tc.Add(task{
284+
tasks = append(tasks, task{
290285
typ: taskTypeStartUnit,
291286
reason: taskReasonLoadedDesiredStateLaunched,
287+
unit: u,
292288
})
293289
}
294290

295291
if *cJState == job.JobStateLaunched && dJob.TargetState == job.JobStateLoaded {
296-
tc.Add(task{
292+
tasks = append(tasks, task{
297293
typ: taskTypeStopUnit,
298294
reason: taskReasonLaunchedDesiredStateLoaded,
295+
unit: u,
299296
})
300297
}
301298

302-
if len(tc.tasks) == 0 {
299+
if len(tasks) == 0 {
303300
log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#v", jName, dJob, cJState)
304-
return nil
305301
}
306302

307-
return &tc
303+
return
308304
}
309305

310-
func (ar *AgentReconciler) launchTaskChain(tc taskChain, a *Agent) {
311-
log.V(1).Infof("AgentReconciler attempting task chain %s", tc)
312-
reschan, err := ar.tManager.Do(tc, a)
313-
if err != nil {
314-
log.Infof("AgentReconciler task chain failed: chain=%s err=%v", tc, err)
315-
return
316-
}
317-
318-
go func() {
319-
for res := range reschan {
320-
if res.err == nil {
321-
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, tc.unit.Name, res.task.reason)
322-
} else {
323-
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, tc.unit.Name, res.task.reason, res.err)
324-
}
306+
func (ar *AgentReconciler) launchTasks(tasks []task, a *Agent) {
307+
log.V(1).Infof("AgentReconciler attempting tasks %s", tasks)
308+
results := ar.tManager.Do(tasks, a)
309+
for _, res := range results {
310+
if res.err == nil {
311+
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, res.task.unit.Name, res.task.reason)
312+
} else {
313+
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, res.task.unit.Name, res.task.reason, res.err)
325314
}
326-
}()
315+
}
327316
}

0 commit comments

Comments
 (0)