Skip to content

Commit f85f2bc

Browse files
committedAug 9, 2019
Changed get-latest protocol to have responses with a status code.
Wrapped protocol read/writes in contexts. A little refactoring. Protobuf Makefile builds on Windows too.
1 parent e9b0864 commit f85f2bc

File tree

6 files changed

+208
-92
lines changed

6 files changed

+208
-92
lines changed
 

‎getlatest.go

+74-34
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package namesys
22

33
import (
4-
"bufio"
54
"context"
5+
"errors"
66
"io"
77
"time"
88

@@ -17,27 +17,33 @@ import (
1717
pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
1818
)
1919

20+
var GetLatestErr = errors.New("get-latest: received error")
21+
2022
type getLatestProtocol struct {
23+
ctx context.Context
2124
host host.Host
2225
}
2326

24-
func newGetLatestProtocol(host host.Host, getLocal func(key string) ([]byte, error)) *getLatestProtocol {
25-
p := &getLatestProtocol{host}
27+
func newGetLatestProtocol(ctx context.Context, host host.Host, getLocal func(key string) ([]byte, error)) *getLatestProtocol {
28+
p := &getLatestProtocol{ctx, host}
2629

2730
host.SetStreamHandler(PSGetLatestProto, func(s network.Stream) {
28-
p.Receive(s, getLocal)
31+
p.receive(s, getLocal)
2932
})
3033

3134
return p
3235
}
3336

34-
func (p *getLatestProtocol) Receive(s network.Stream, getLocal func(key string) ([]byte, error)) {
35-
r := ggio.NewDelimitedReader(s, 1<<20)
37+
func (p *getLatestProtocol) receive(s network.Stream, getLocal func(key string) ([]byte, error)) {
3638
msg := &pb.RequestLatest{}
37-
if err := r.ReadMsg(msg); err != nil {
39+
if err := readMsg(p.ctx, s, msg); err != nil {
3840
if err != io.EOF {
39-
s.Reset()
4041
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err)
42+
respProto := pb.RespondLatest{Status: pb.RespondLatest_ERR}
43+
if err := writeMsg(p.ctx, s, &respProto); err != nil {
44+
return
45+
}
46+
helpers.FullClose(s)
4147
} else {
4248
// Just be nice. They probably won't read this
4349
// but it doesn't hurt to send it.
@@ -46,64 +52,98 @@ func (p *getLatestProtocol) Receive(s network.Stream, getLocal func(key string)
4652
return
4753
}
4854

49-
response, err := getLocal(*msg.Identifier)
55+
response, err := getLocal(msg.Identifier)
5056
var respProto pb.RespondLatest
5157

52-
if err != nil || response == nil {
53-
nodata := true
54-
respProto = pb.RespondLatest{Nodata: &nodata}
58+
if err != nil {
59+
respProto = pb.RespondLatest{Status: pb.RespondLatest_NOT_FOUND}
5560
} else {
5661
respProto = pb.RespondLatest{Data: response}
5762
}
5863

59-
if err := writeBytes(s, &respProto); err != nil {
60-
s.Reset()
61-
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), err)
64+
if err := writeMsg(p.ctx, s, &respProto); err != nil {
6265
return
6366
}
6467
helpers.FullClose(s)
6568
}
6669

67-
func (p getLatestProtocol) Send(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
70+
func (p getLatestProtocol) Get(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
6871
peerCtx, cancel := context.WithTimeout(ctx, time.Second*10)
6972
defer cancel()
7073

7174
s, err := p.host.NewStream(peerCtx, pid, PSGetLatestProto)
7275
if err != nil {
7376
return nil, err
7477
}
75-
76-
if err := s.SetDeadline(time.Now().Add(time.Second * 5)); err != nil {
77-
return nil, err
78-
}
79-
8078
defer helpers.FullClose(s)
8179

82-
msg := pb.RequestLatest{Identifier: &key}
80+
msg := &pb.RequestLatest{Identifier: key}
8381

84-
if err := writeBytes(s, &msg); err != nil {
85-
s.Reset()
82+
if err := writeMsg(ctx, s, msg); err != nil {
8683
return nil, err
8784
}
88-
8985
s.Close()
9086

91-
r := ggio.NewDelimitedReader(s, 1<<20)
9287
response := &pb.RespondLatest{}
93-
if err := r.ReadMsg(response); err != nil {
88+
if err := readMsg(ctx, s, response); err != nil {
9489
return nil, err
9590
}
9691

97-
return response.Data, nil
92+
switch response.Status {
93+
case pb.RespondLatest_SUCCESS:
94+
return response.Data, nil
95+
case pb.RespondLatest_NOT_FOUND:
96+
return nil, nil
97+
case pb.RespondLatest_ERR:
98+
return nil, GetLatestErr
99+
default:
100+
return nil, errors.New("get-latest: received unknown status code")
101+
}
98102
}
99103

100-
func writeBytes(w io.Writer, msg proto.Message) error {
101-
bufw := bufio.NewWriter(w)
102-
wc := ggio.NewDelimitedWriter(bufw)
104+
func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
105+
done := make(chan error)
106+
go func() {
107+
wc := ggio.NewDelimitedWriter(s)
103108

104-
if err := wc.WriteMsg(msg); err != nil {
105-
return err
109+
if err := wc.WriteMsg(msg); err != nil {
110+
done <- err
111+
return
112+
}
113+
114+
done <- nil
115+
}()
116+
117+
var retErr error
118+
select {
119+
case retErr = <-done:
120+
case <-ctx.Done():
121+
retErr = ctx.Err()
122+
}
123+
124+
if retErr != nil {
125+
s.Reset()
126+
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr)
106127
}
128+
return retErr
129+
}
107130

108-
return bufw.Flush()
131+
func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
132+
done := make(chan error)
133+
go func() {
134+
r := ggio.NewDelimitedReader(s, 1<<20)
135+
if err := r.ReadMsg(msg); err != nil {
136+
done <- err
137+
return
138+
}
139+
done <- nil
140+
}()
141+
142+
select {
143+
case err := <-done:
144+
return err
145+
case <-ctx.Done():
146+
s.Reset()
147+
return ctx.Err()
148+
}
109149
}

‎getlatest_test.go

+48-8
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ package namesys
33
import (
44
"bytes"
55
"context"
6+
"encoding/binary"
67
"errors"
78
"testing"
89
"time"
910

11+
"github.com/libp2p/go-libp2p-core/helpers"
1012
"github.com/libp2p/go-libp2p-core/host"
13+
14+
pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
1115
)
1216

1317
func connect(t *testing.T, a, b host.Host) {
@@ -41,16 +45,16 @@ func TestGetLatestProtocolTrip(t *testing.T) {
4145
time.Sleep(time.Millisecond * 100)
4246

4347
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
44-
h1 := newGetLatestProtocol(hosts[0], d1.Lookup)
48+
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)
4549

4650
d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
47-
h2 := newGetLatestProtocol(hosts[1], d2.Lookup)
51+
h2 := newGetLatestProtocol(ctx, hosts[1], d2.Lookup)
4852

4953
getLatest(t, ctx, h1, h2, "key", []byte("value2"))
5054
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
5155
}
5256

53-
func TestGetLatestProtocolNil(t *testing.T) {
57+
func TestGetLatestProtocolNotFound(t *testing.T) {
5458
ctx, cancel := context.WithCancel(context.Background())
5559
defer cancel()
5660

@@ -61,15 +65,51 @@ func TestGetLatestProtocolNil(t *testing.T) {
6165
time.Sleep(time.Millisecond * 100)
6266

6367
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
64-
h1 := newGetLatestProtocol(hosts[0], d1.Lookup)
68+
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)
6569

6670
d2 := &datastore{make(map[string][]byte)}
67-
h2 := newGetLatestProtocol(hosts[1], d2.Lookup)
71+
h2 := newGetLatestProtocol(ctx, hosts[1], d2.Lookup)
6872

6973
getLatest(t, ctx, h1, h2, "key", nil)
7074
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
7175
}
7276

77+
func TestGetLatestProtocolErr(t *testing.T) {
78+
ctx, cancel := context.WithCancel(context.Background())
79+
defer cancel()
80+
81+
hosts := newNetHosts(ctx, t, 2)
82+
connect(t, hosts[0], hosts[1])
83+
84+
// wait for hosts to get connected
85+
time.Sleep(time.Millisecond * 100)
86+
87+
d1 := &datastore{make(map[string][]byte)}
88+
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)
89+
90+
// bad send protocol to force an error
91+
s, err := hosts[1].NewStream(ctx, h1.host.ID(), PSGetLatestProto)
92+
if err != nil {
93+
t.Fatal(err)
94+
}
95+
defer helpers.FullClose(s)
96+
97+
buf := make([]byte, binary.MaxVarintLen64)
98+
binary.PutUvarint(buf, ^uint64(0))
99+
if _, err := s.Write(buf); err != nil {
100+
t.Fatal(err)
101+
}
102+
103+
response := &pb.RespondLatest{}
104+
if err := readMsg(ctx, s, response); err != nil {
105+
t.Fatal(err)
106+
}
107+
108+
if response.Status != pb.RespondLatest_ERR {
109+
t.Fatal("should have received an error")
110+
}
111+
}
112+
73113
func TestGetLatestProtocolRepeated(t *testing.T) {
74114
ctx, cancel := context.WithCancel(context.Background())
75115
defer cancel()
@@ -81,10 +121,10 @@ func TestGetLatestProtocolRepeated(t *testing.T) {
81121
time.Sleep(time.Millisecond * 100)
82122

83123
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
84-
h1 := newGetLatestProtocol(hosts[0], d1.Lookup)
124+
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)
85125

86126
d2 := &datastore{make(map[string][]byte)}
87-
h2 := newGetLatestProtocol(hosts[1], d2.Lookup)
127+
h2 := newGetLatestProtocol(ctx, hosts[1], d2.Lookup)
88128

89129
for i := 0; i < 10; i++ {
90130
getLatest(t, ctx, h1, h2, "key", nil)
@@ -94,7 +134,7 @@ func TestGetLatestProtocolRepeated(t *testing.T) {
94134

95135
func getLatest(t *testing.T, ctx context.Context,
96136
requester *getLatestProtocol, responder *getLatestProtocol, key string, expected []byte) {
97-
data, err := requester.Send(ctx, responder.host.ID(), key)
137+
data, err := requester.Get(ctx, responder.host.ID(), key)
98138
if err != nil {
99139
t.Fatal(err)
100140
}

‎pb/Makefile

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
PB = $(wildcard *.proto)
22
GO = $(PB:.proto=.pb.go)
33

4+
ifeq ($(OS),Windows_NT)
5+
GOPATH_DELIMITER = \;
Has a conversation. Original line has a conversation.
6+
else
7+
GOPATH_DELIMITER = :
8+
endif
9+
410
all: $(GO)
511

612
%.pb.go: %.proto
7-
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
13+
protoc --proto_path=$(GOPATH)/src$(GOPATH_DELIMITER). --gogofast_out=. $<
Has a conversation. Original line has a conversation.
814

915
clean:
1016
rm -f *.pb.go

‎pb/message.pb.go

+68-43
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pb/message.proto

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
1-
syntax = "proto2";
1+
syntax = "proto3";
22

33
package namesys.pb;
44

55
message RequestLatest {
6-
optional string identifier = 1;
6+
string identifier = 1;
77
}
88

99
message RespondLatest {
10-
optional bytes data = 1;
11-
optional bool nodata = 2;
10+
bytes data = 1;
11+
StatusCode status = 2;
12+
enum StatusCode {
13+
SUCCESS = 0;
14+
NOT_FOUND = 1;
15+
ERR = 2;
16+
}
1217
}

‎pubsub.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func NewPubsubValueStore(ctx context.Context, host host.Host, cr routing.Content
8989
Validator: validator,
9090
}
9191

92-
psValueStore.getLatest = newGetLatestProtocol(host, psValueStore.getLocal)
92+
psValueStore.getLatest = newGetLatestProtocol(ctx, host, psValueStore.getLocal)
9393

9494
go psValueStore.rebroadcast(ctx)
9595

@@ -476,7 +476,7 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, sub *pubsub.Subscr
476476
}
477477
}
478478

479-
return p.getLatest.Send(ctx, pid, key)
479+
return p.getLatest.Get(ctx, pid, key)
480480
}
481481

482482
func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {

0 commit comments

Comments
 (0)
Please sign in to comment.