Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Commit 45797af

Browse files
committed
Merge pull request #742 from jonboulle/742
fleet engine constantly running
2 parents fe8c809 + 8164760 commit 45797af

6 files changed

Lines changed: 92 additions & 49 deletions

File tree

event/bus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ func (eb *EventBus) AddListener(eType Event, lFunc listenerFunc) {
2626
}
2727

2828
// Dispatch calls all listeners registered to the given Event
29-
func (eb *EventBus) Dispatch(ev *Event) {
29+
func (eb *EventBus) Dispatch(ev Event) {
3030
wg := sync.WaitGroup{}
31-
for _, lFunc := range eb.lFuncMap[*ev] {
31+
for _, lFunc := range eb.lFuncMap[ev] {
3232
wg.Add(1)
3333
go func() {
3434
lFunc()

event/bus_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestEventBus(t *testing.T) {
1818
ev := Event("TypeOne")
1919
bus.AddListener(ev, tl.HandleEvent)
2020

21-
bus.Dispatch(&ev)
21+
bus.Dispatch(ev)
2222

2323
select {
2424
case <-tl.evchan:
@@ -34,8 +34,8 @@ func TestEventBusNoDispatch(t *testing.T) {
3434
ev2 := Event("TypeTwo")
3535
bus.AddListener(ev1, tl.HandleEvent)
3636

37-
bus.Dispatch(&ev2)
38-
bus.Dispatch(&ev1)
37+
bus.Dispatch(ev2)
38+
bus.Dispatch(ev1)
3939

4040
close(tl.evchan)
4141

event/event.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package event
22

33
var (
4-
GlobalEvent = Event("GlobalEvent")
5-
JobEvent = Event("JobEvent")
4+
// Occurs when any Job's target is touched
5+
JobTargetChangeEvent = Event("JobTargetChangeEvent")
6+
// Occurs when any Job's target state is touched
7+
JobTargetStateChangeEvent = Event("JobTargetStateChangeEvent")
68
)
79

810
type Event string

registry/event.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,34 @@ func NewEventStream(client etcd.Client, registry Registry) (*EventStream, error)
2626
return &EventStream{client, reg}, nil
2727
}
2828

29-
func (es *EventStream) Stream(idx uint64, sendFunc func(*event.Event), stop chan bool) {
29+
func (es *EventStream) Stream(idx uint64, sendFunc func(event.Event), stop chan bool) {
3030
etcdchan := make(chan *etcd.Result)
3131
go watch(es.etcd, idx, etcdchan, es.registry.keyPrefix, stop)
3232
go filter(etcdchan, es.registry.keyPrefix, sendFunc, stop)
3333
}
3434

35-
func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(*event.Event), stop chan bool) {
36-
parse := func(res *etcd.Result) *event.Event {
35+
func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(event.Event), stop chan bool) {
36+
parse := func(res *etcd.Result) (ev event.Event, ok bool) {
3737
if res == nil || res.Node == nil {
38-
return nil
38+
return
3939
}
4040

41-
if !strings.HasPrefix(res.Node.Key, prefix) {
42-
return nil
41+
// ignore everything but the job namespace
42+
if !strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) {
43+
return
4344
}
4445

45-
var ev event.Event
46-
if strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) {
47-
ev = event.JobEvent
48-
} else {
49-
ev = event.GlobalEvent
46+
_, baseName := path.Split(res.Node.Key)
47+
switch baseName {
48+
case "target-state":
49+
ev = event.JobTargetStateChangeEvent
50+
ok = true
51+
case "target":
52+
ev = event.JobTargetChangeEvent
53+
ok = true
54+
default:
5055
}
51-
52-
return &ev
56+
return
5357
}
5458

5559
for {
@@ -58,14 +62,10 @@ func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(*event.Even
5862
return
5963
case res := <-etcdchan:
6064
log.V(1).Infof("Received %v from etcd watch", res)
61-
62-
ev := parse(res)
63-
if ev == nil {
64-
continue
65+
if ev, ok := parse(res); ok {
66+
log.V(1).Infof("Translated %v to Event(Type=%s)", res, ev)
67+
sendFunc(ev)
6568
}
66-
67-
log.V(1).Infof("Translated %v to Event(Type=%s)", res, ev)
68-
sendFunc(ev)
6969
}
7070
}
7171
}

registry/event_test.go

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,41 +10,81 @@ import (
1010

1111
func TestFilterEtcdEvents(t *testing.T) {
1212
tests := []struct {
13-
in *etcd.Result
14-
ev *event.Event
13+
in string
14+
ev []event.Event
1515
}{
1616
{
17-
in: nil,
18-
ev: nil,
17+
in: "",
18+
ev: []event.Event{},
1919
},
2020
{
21-
in: &etcd.Result{Node: &etcd.Node{Key: "/"}},
22-
ev: nil,
21+
in: "/",
22+
ev: []event.Event{},
2323
},
2424
{
25-
in: &etcd.Result{Node: &etcd.Node{Key: "/fleet"}},
26-
ev: &event.GlobalEvent,
25+
in: "/fleet",
26+
ev: []event.Event{},
2727
},
2828
{
29-
in: &etcd.Result{Node: &etcd.Node{Key: "/fleet/job"}},
30-
ev: &event.JobEvent,
29+
in: "/fleet/job",
30+
ev: []event.Event{},
31+
},
32+
{
33+
in: "/fleet/job/foo/object",
34+
ev: []event.Event{},
35+
},
36+
{
37+
in: "/fleet/machine/asdf",
38+
ev: []event.Event{},
39+
},
40+
{
41+
in: "/fleet/state/asdf",
42+
ev: []event.Event{},
43+
},
44+
{
45+
in: "/fleet/job/asdf/target-state",
46+
ev: []event.Event{event.JobTargetStateChangeEvent},
47+
},
48+
{
49+
in: "/fleet/job/foobarbaz/target-state",
50+
ev: []event.Event{event.JobTargetStateChangeEvent},
51+
},
52+
{
53+
in: "/fleet/job/asdf/target",
54+
ev: []event.Event{event.JobTargetChangeEvent},
3155
},
3256
}
3357

3458
for i, tt := range tests {
35-
etcdchan := make(chan *etcd.Result)
36-
stopchan := make(chan bool)
37-
prefix := "/fleet"
59+
for _, action := range []string{"set", "update", "create", "delete"} {
60+
etcdchan := make(chan *etcd.Result)
61+
stopchan := make(chan bool)
62+
prefix := "/fleet"
3863

39-
send := func(ev *event.Event) {
40-
if !reflect.DeepEqual(tt.ev, ev) {
41-
t.Errorf("case %d: received incorrect event\nexpected %#v\ngot %#v", i, tt.ev, ev)
64+
got := make([]event.Event, 0)
65+
send := func(ev event.Event) {
66+
got = append(got, ev)
4267
}
43-
}
4468

45-
go filter(etcdchan, prefix, send, stopchan)
69+
go filter(etcdchan, prefix, send, stopchan)
4670

47-
etcdchan <- tt.in
48-
close(stopchan)
71+
var res *etcd.Result
72+
if tt.in != "" {
73+
res = &etcd.Result{
74+
Node: &etcd.Node{
75+
Key: tt.in,
76+
},
77+
Action: action,
78+
}
79+
}
80+
etcdchan <- res
81+
82+
if !reflect.DeepEqual(tt.ev, got) {
83+
t.Errorf("case %d: received incorrect event\nexpected %#v\ngot %#v", i, tt.ev, got)
84+
t.Logf("action: %v", action)
85+
}
86+
87+
close(stopchan)
88+
}
4989
}
5090
}

server/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ func New(cfg config.Config) (*Server, error) {
9595
}
9696

9797
eBus := event.NewEventBus()
98-
eBus.AddListener(event.JobEvent, ar.Trigger)
99-
eBus.AddListener(event.GlobalEvent, e.Trigger)
98+
eBus.AddListener(event.JobTargetChangeEvent, ar.Trigger)
99+
eBus.AddListener(event.JobTargetStateChangeEvent, ar.Trigger)
100+
eBus.AddListener(event.JobTargetStateChangeEvent, e.Trigger)
100101

101102
listeners, err := activation.Listeners(false)
102103
if err != nil {

0 commit comments

Comments
 (0)