Skip to content

Commit 796cf79

Browse files
committed
fixed generic callback function for Subscribe
1 parent cfb85d8 commit 796cf79

File tree

2 files changed

+39
-19
lines changed

2 files changed

+39
-19
lines changed

rpc/plugins/push/prosumer.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,24 @@ func NewProsumer(client *core.Client, id ...string) *Prosumer {
5656
return p
5757
}
5858

59+
func (p *Prosumer) onError(err error) {
60+
if p.OnError != nil {
61+
p.OnError(err)
62+
}
63+
}
64+
65+
func (p *Prosumer) onSubscribe(topic string) {
66+
if p.OnSubscribe != nil {
67+
p.OnSubscribe(topic)
68+
}
69+
}
70+
71+
func (p *Prosumer) onUnsubscribe(topic string) {
72+
if p.OnUnsubscribe != nil {
73+
p.OnUnsubscribe(topic)
74+
}
75+
}
76+
5977
func (p *Prosumer) Client() *core.Client {
6078
return p.client
6179
}
@@ -99,14 +117,19 @@ func (p *Prosumer) call(callback Callback, message Message) {
99117
default:
100118
v := reflect.ValueOf(callback)
101119
t := v.Type()
102-
switch t.NumIn() {
103-
case 1:
104-
if data, err := io.Convert(message.Data, t.In(0)); err != nil {
105-
v.Call([]reflect.Value{reflect.ValueOf(data)})
120+
if n := t.NumIn(); n >= 1 {
121+
data, err := io.Convert(message.Data, t.In(0))
122+
if err != nil {
123+
p.onError(err)
124+
return
106125
}
107-
case 2:
108-
if data, err := io.Convert(message.Data, t.In(0)); err != nil {
126+
switch n {
127+
case 1:
128+
v.Call([]reflect.Value{reflect.ValueOf(data)})
129+
case 2:
109130
v.Call([]reflect.Value{reflect.ValueOf(data), reflect.ValueOf(message.From)})
131+
default:
132+
panic("invalid callback: " + t.String())
110133
}
111134
}
112135
}
@@ -120,9 +143,7 @@ func (p *Prosumer) message() {
120143
if p.RetryInterval != 0 {
121144
<-time.After(p.RetryInterval)
122145
}
123-
if p.OnError != nil {
124-
p.OnError(err)
125-
}
146+
p.onError(err)
126147
}
127148
continue
128149
}
@@ -138,9 +159,7 @@ func (p *Prosumer) Subscribe(topic string, callback Callback) (result bool, err
138159
p.callbacks.Store(topic, callback)
139160
result, err = p.proxy.subscribe(topic)
140161
go p.message()
141-
if p.OnSubscribe != nil {
142-
p.OnSubscribe(topic)
143-
}
162+
p.onSubscribe(topic)
144163
}
145164
return
146165
}
@@ -149,9 +168,7 @@ func (p *Prosumer) Unsubscribe(topic string) (result bool, err error) {
149168
if p.ID() != "" {
150169
result, err = p.proxy.unsubscribe(topic)
151170
p.callbacks.Delete(topic)
152-
if p.OnUnsubscribe != nil {
153-
p.OnUnsubscribe(topic)
154-
}
171+
p.onUnsubscribe(topic)
155172
}
156173
return
157174
}

rpc/rpc_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -779,19 +779,22 @@ func TestPush(t *testing.T) {
779779
time.Sleep(time.Millisecond * 5)
780780

781781
client1 := rpc.NewClient("tcp://127.0.0.1/")
782-
client1.Use(log.Plugin.IOHandler)
782+
//client1.Use(log.Plugin.IOHandler)
783783
prosumer1 := push.NewProsumer(client1, "1")
784+
prosumer1.OnError = func(e error) {
785+
fmt.Println(e.Error())
786+
}
784787
prosumer1.OnSubscribe = func(topic string) {
785788
fmt.Println(topic, "is subscribed.")
786789
}
787790
prosumer1.OnUnsubscribe = func(topic string) {
788791
fmt.Println(topic, "is unsubscribed.")
789792
}
790793
client2 := rpc.NewClient("tcp://127.0.0.1/")
791-
client2.Use(log.Plugin.IOHandler)
794+
//client2.Use(log.Plugin.IOHandler)
792795
prosumer2 := push.NewProsumer(client2, "2")
793-
prosumer1.Subscribe("test", func(data string) {
794-
fmt.Println(data)
796+
prosumer1.Subscribe("test", func(data int, from string) {
797+
fmt.Printf("%v from %v\n", data, from)
795798
})
796799
prosumer1.Subscribe("test2", func(message push.Message) {
797800
fmt.Println(message)

0 commit comments

Comments
 (0)