Skip to content

Commit 86facd9

Browse files
committed
Fixed some bugs of Prosumer.
1 parent 9f9a404 commit 86facd9

File tree

2 files changed

+41
-13
lines changed

2 files changed

+41
-13
lines changed

rpc/plugins/push/prosumer.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ type prosumer struct {
4747

4848
func NewProsumer(client *core.Client, id ...string) *Prosumer {
4949
p := &Prosumer{
50-
client: client,
50+
client: client,
51+
RetryInterval: time.Second,
5152
}
5253
if len(id) > 0 && id[0] != "" {
5354
p.SetID(id[0])
@@ -139,12 +140,16 @@ func (p *Prosumer) message() {
139140
for {
140141
topics, err := p.proxy.message()
141142
if err != nil {
142-
if err != core.ErrTimeout {
143+
if !core.IsTimeoutError(err) {
143144
if p.RetryInterval != 0 {
144145
<-time.After(p.RetryInterval)
145146
}
146147
p.onError(err)
147148
}
149+
p.callbacks.Range(func(key, value interface{}) bool {
150+
p.proxy.subscribe(key.(string))
151+
return true
152+
})
148153
continue
149154
}
150155
if topics == nil {

rpc/rpc_test.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ func TestPush(t *testing.T) {
779779
time.Sleep(time.Millisecond * 5)
780780

781781
client1 := rpc.NewClient("tcp://127.0.0.1/")
782-
//client1.Use(log.Plugin.IOHandler)
782+
client1.Use(log.Plugin.IOHandler)
783783
prosumer1 := push.NewProsumer(client1, "1")
784784
prosumer1.OnError = func(e error) {
785785
fmt.Println(e.Error())
@@ -801,17 +801,40 @@ func TestPush(t *testing.T) {
801801
})
802802
time.Sleep(time.Millisecond * 100)
803803
client1.Invoke("hello", []interface{}{"world"})
804-
var wg sync.WaitGroup
805-
n := 1000
806-
wg.Add(n)
807-
for i := 0; i < n; i++ {
808-
go func(i int) {
809-
prosumer2.Push(i, "test", "1")
810-
wg.Done()
811-
}(i)
812-
}
813-
wg.Wait()
804+
prosumer2.Push(1, "test", "1")
805+
// var wg sync.WaitGroup
806+
// n := 1000
807+
// wg.Add(n)
808+
// for i := 0; i < n; i++ {
809+
// go func(i int) {
810+
// prosumer2.Push(i, "test", "1")
811+
// wg.Done()
812+
// }(i)
813+
// }
814+
// wg.Wait()
814815
time.Sleep(time.Millisecond * 100)
816+
817+
server.Close()
818+
819+
time.Sleep(time.Millisecond * 100)
820+
821+
server, _ = net.Listen("tcp", "127.0.0.1:8412")
822+
_ = service.Bind(server)
823+
824+
time.Sleep(time.Millisecond * 1000)
825+
826+
prosumer2.Push(2, "test", "1")
827+
828+
// wg.Add(n)
829+
// for i := 0; i < n; i++ {
830+
// go func(i int) {
831+
// prosumer2.Push(i, "test", "1")
832+
// wg.Done()
833+
// }(i)
834+
// }
835+
// wg.Wait()
836+
time.Sleep(time.Millisecond * 1000)
837+
815838
prosumer1.Unsubscribe("test")
816839
prosumer1.Unsubscribe("test2")
817840

0 commit comments

Comments
 (0)