@@ -13,6 +13,7 @@ import (
13
13
"sync"
14
14
)
15
15
16
+ // NewEventRepeater creates an event repeater application.
16
17
func NewEventRepeater (eSource string , eSink string , protocol string , allowall bool ) application.Plugin {
17
18
sub , err := event_rpc .NewClient (eSource )
18
19
if err != nil {
@@ -71,6 +72,7 @@ type eventRepeater struct {
71
72
eventListMt * sync.Mutex
72
73
}
73
74
75
+ // RepeatRule for each event
74
76
type RepeatRule struct {
75
77
SourceTopic string
76
78
SinkTopic string
@@ -80,8 +82,8 @@ type RepeatRule struct {
80
82
}
81
83
82
84
type messageData struct {
83
- SourceTopic string `json:"sourcetopic", omitempty"`
84
- SinkTopic string `json:"sinktopic", omitempty"`
85
+ SourceTopic string `json:"sourcetopic, omitempty"`
86
+ SinkTopic string `json:"sinktopic, omitempty"`
85
87
}
86
88
87
89
func (e eventRepeater ) Validate (applicationProperties * types.Any ) error {
@@ -92,7 +94,7 @@ func (e eventRepeater) Healthy(applicationProperties *types.Any) (application.He
92
94
return application .Healthy , nil
93
95
}
94
96
95
- func (e eventRepeater ) AddEvent (sourcesTopic string , sinkTopic string ) error {
97
+ func (e eventRepeater ) addEvent (sourcesTopic string , sinkTopic string ) error {
96
98
if sourcesTopic == "" {
97
99
return fmt .Errorf ("Error: %s" , "You must have a topic of source for add repeat event." )
98
100
}
@@ -120,7 +122,7 @@ func (e eventRepeater) AddEvent(sourcesTopic string, sinkTopic string) error {
120
122
return nil
121
123
}
122
124
123
- func (e eventRepeater ) DelEvent (sourcesTopic string ) error {
125
+ func (e eventRepeater ) delEvent (sourcesTopic string ) error {
124
126
if sourcesTopic == "" {
125
127
return fmt .Errorf ("Error: %s" , "You must have a topic of source for delete repeat event." )
126
128
}
@@ -167,7 +169,6 @@ func (e eventRepeater) publishToSink(rr *RepeatRule) error {
167
169
}
168
170
}
169
171
}
170
- return nil
171
172
}
172
173
173
174
func (e eventRepeater ) Update (message * application.Message ) error {
@@ -183,25 +184,25 @@ func (e eventRepeater) Update(message *application.Message) error {
183
184
case application .ADD :
184
185
for _ , d := range dataStruct {
185
186
log .Debugf ("Add message %v \n " , d )
186
- err := e .AddEvent (d .SourceTopic , d .SinkTopic )
187
+ err := e .addEvent (d .SourceTopic , d .SinkTopic )
187
188
if err != nil {
188
189
return err
189
190
}
190
191
}
191
192
case application .DELETE :
192
193
for _ , d := range dataStruct {
193
- err := e .DelEvent (d .SourceTopic )
194
+ err := e .delEvent (d .SourceTopic )
194
195
if err != nil {
195
196
return err
196
197
}
197
198
}
198
199
case application .UPDATE :
199
200
for _ , d := range dataStruct {
200
- err := e .DelEvent (d .SourceTopic )
201
+ err := e .delEvent (d .SourceTopic )
201
202
if err != nil {
202
203
return err
203
204
}
204
- err = e .AddEvent (d .SourceTopic , d .SinkTopic )
205
+ err = e .addEvent (d .SourceTopic , d .SinkTopic )
205
206
if err != nil {
206
207
return err
207
208
}
@@ -218,7 +219,7 @@ func (e eventRepeater) Update(message *application.Message) error {
218
219
219
220
func (e eventRepeater ) serve () error {
220
221
if e .allowAll {
221
- e .AddEvent ("." , "" )
222
+ e .addEvent ("." , "" )
222
223
}
223
224
for {
224
225
select {
0 commit comments