Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 9ffc5a0

Browse files
committed
add test
Signed-off-by: Yuji Oshima <[email protected]>
1 parent a917a68 commit 9ffc5a0

File tree

2 files changed

+126
-9
lines changed

2 files changed

+126
-9
lines changed

Diff for: examples/application/event-repeater/application.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
event_rpc "github.com/docker/infrakit/pkg/rpc/event"
88
"github.com/docker/infrakit/pkg/spi/application"
99
"github.com/docker/infrakit/pkg/spi/event"
10-
"github.com/docker/infrakit/pkg/template"
1110
"github.com/docker/infrakit/pkg/types"
1211
MQTT "github.com/eclipse/paho.mqtt.golang"
1312
"sync"
@@ -143,11 +142,6 @@ func (e eventRepeater) Stop() {
143142
}
144143

145144
func (e eventRepeater) publishToSink(rr *RepeatRule) error {
146-
templateURL := "str://{{.}}"
147-
engine, err := template.NewTemplate(templateURL, template.Options{})
148-
if err != nil {
149-
return err
150-
}
151145
for {
152146
select {
153147
case <-rr.SinkStopCh:
@@ -157,15 +151,23 @@ func (e eventRepeater) publishToSink(rr *RepeatRule) error {
157151
log.Info("Server disconnected", "topic", rr.SourceTopic)
158152
return nil
159153
}
160-
buff, err := engine.Render(s)
154+
buff, err := s.Bytes()
161155
if err != nil {
162156
return err
163157
}
164158
switch e.Protocol {
165159
case "mqtt":
166-
e.sinkEClient.(MQTT.Client).Publish(rr.SinkTopic, 0, false, buff)
160+
if rr.SinkTopic == "." {
161+
e.sinkEClient.(MQTT.Client).Publish(s.Topic.String(), 0, false, buff)
162+
} else {
163+
e.sinkEClient.(MQTT.Client).Publish(rr.SinkTopic, 0, false, buff)
164+
}
167165
case "stderr":
168-
log.Infof("Publish subtopic %s gettopic %v pubtopic %v message %s\n", rr.SourceTopic, s.Topic, rr.SinkTopic, buff)
166+
if rr.SinkTopic == "." {
167+
log.Infof("Publish subtopic %s gettopic %v pubtopic %v message %s\n", rr.SourceTopic, s.Topic, s.Topic, buff)
168+
} else {
169+
log.Infof("Publish subtopic %s gettopic %v pubtopic %v message %s\n", rr.SourceTopic, s.Topic, rr.SinkTopic, buff)
170+
}
169171
}
170172
}
171173
}
+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
event_rpc "github.com/docker/infrakit/pkg/rpc/event"
6+
rpc_server "github.com/docker/infrakit/pkg/rpc/server"
7+
"github.com/docker/infrakit/pkg/spi/event"
8+
testing_event "github.com/docker/infrakit/pkg/testing/event"
9+
"github.com/docker/infrakit/pkg/types"
10+
MQTT "github.com/eclipse/paho.mqtt.golang"
11+
"github.com/stretchr/testify/require"
12+
"io/ioutil"
13+
"path/filepath"
14+
"testing"
15+
"time"
16+
)
17+
18+
var MQTTTESTSERVER string = "tcp://test.mosquitto.org:1883"
19+
20+
func tempSocket() string {
21+
dir, err := ioutil.TempDir("", "infrakit-test-")
22+
if err != nil {
23+
panic(err)
24+
}
25+
return filepath.Join(dir, "app-impl-test")
26+
}
27+
func runEvent(startPub chan struct{}) (*string, rpc_server.Stoppable, error) {
28+
socketPath := tempSocket()
29+
events := 5
30+
publishChan0 := make(chan chan<- *event.Event)
31+
go func() {
32+
publish := <-publishChan0
33+
defer close(publish)
34+
<-startPub
35+
// here we have the channel and ready to go
36+
for i := 0; i < events; i++ {
37+
<-time.After(50 * time.Millisecond)
38+
fmt.Printf("publish event%d\n", i)
39+
publish <- event.Event{
40+
Topic: types.PathFromString("instance/create"),
41+
ID: fmt.Sprintf("host-%d", i),
42+
}.Init().WithDataMust([]int{1, 2}).Now()
43+
}
44+
}()
45+
m := map[string]interface{}{}
46+
types.Put(types.PathFromString("instance/create"), "instance-create", m)
47+
plugin0 := &testing_event.Plugin{
48+
DoList: func(topic types.Path) ([]string, error) {
49+
return types.List(topic, m), nil
50+
},
51+
Publisher: &testing_event.Publisher{
52+
DoPublishOn: func(c chan<- *event.Event) {
53+
publishChan0 <- c
54+
close(publishChan0)
55+
},
56+
},
57+
}
58+
var impl rpc_server.VersionedInterface = event_rpc.PluginServerWithTypes(
59+
map[string]event.Plugin{
60+
"iktest": plugin0,
61+
})
62+
server, err := rpc_server.StartPluginAtPath(socketPath, impl)
63+
if err != nil {
64+
return nil, nil, err
65+
}
66+
return &socketPath, server, nil
67+
}
68+
69+
func runSub(msgch chan MQTT.Message) (MQTT.Client, error) {
70+
opts := MQTT.NewClientOptions().AddBroker(MQTTTESTSERVER)
71+
client := MQTT.NewClient(opts)
72+
if token := client.Connect(); token.Wait() && token.Error() != nil {
73+
return nil, token.Error()
74+
}
75+
subToken := client.Subscribe(
76+
"iktest/instance/create",
77+
0,
78+
func(client MQTT.Client, msg MQTT.Message) {
79+
msgch <- msg
80+
})
81+
fmt.Printf("mqtt substart")
82+
if subToken.Wait() && subToken.Error() != nil {
83+
return nil, subToken.Error()
84+
}
85+
return client, nil
86+
}
87+
88+
func TestIntegration(t *testing.T) {
89+
startPub := make(chan struct{})
90+
socketPath, erpcsrv, err := runEvent(startPub)
91+
defer erpcsrv.Stop()
92+
require.NoError(t, err)
93+
mqsubch := make(chan MQTT.Message)
94+
mqttClient, err := runSub(mqsubch)
95+
require.NoError(t, err)
96+
defer mqttClient.Disconnect(250)
97+
app := NewEventRepeater(*socketPath, MQTTTESTSERVER, "mqtt", true)
98+
defer app.(*eventRepeater).Stop()
99+
close(startPub)
100+
var subEvent int = 0
101+
loop:
102+
for {
103+
select {
104+
case <-time.After(500 * time.Millisecond):
105+
break loop
106+
case sub := <-mqsubch:
107+
subany := types.AnyBytes(sub.Payload())
108+
subevent := event.Event{}
109+
err := subany.Decode(&subevent)
110+
require.NoError(t, err)
111+
require.Equal(t, subevent.ID, fmt.Sprintf("host-%d", subEvent))
112+
subEvent++
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)