From db985434ecef68d711c1b92de12684d24b40ec8c Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Sun, 27 Jun 2021 18:25:39 +0200 Subject: [PATCH 1/7] correctable: watch should close if level == target Otherwise, if the target level is the highest level, then the watch will never close. --- correctable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/correctable.go b/correctable.go index b864a470..20075b7a 100644 --- a/correctable.go +++ b/correctable.go @@ -41,7 +41,7 @@ func (c *Correctable) Watch(level int) <-chan struct{} { ch := make(chan struct{}) c.mu.Lock() defer c.mu.Unlock() - if level < c.level { + if level <= c.level { close(ch) return ch } From 9ccdb1f7a298a5cf614371faac212f7ac37111a0 Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Sun, 27 Jun 2021 18:33:10 +0200 Subject: [PATCH 2/7] correctable: enable streaming This commit makes the correctable stream call type actually support streaming. --- async.go | 2 +- channel.go | 66 ++++--- .../dev/zorums_server_gorums.pb.go | 166 ++++++------------ .../gengorums/template_server.go | 16 +- correctable.go | 8 +- multicast.go | 2 +- quorumcall.go | 2 +- rpc.go | 2 +- server.go | 14 ++ unicast.go | 4 +- 10 files changed, 134 insertions(+), 148 deletions(-) diff --git a/async.go b/async.go index 2081a449..053344b2 100644 --- a/async.go +++ b/async.go @@ -44,7 +44,7 @@ func (c Configuration) AsyncCall(ctx context.Context, d QuorumCallData) *Async { continue // don't send if no msg } } - n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan) + n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan, false) } fut := &Async{c: make(chan struct{}, 1)} diff --git a/channel.go b/channel.go index cffda694..df22621a 100644 --- a/channel.go +++ b/channel.go @@ -28,33 +28,38 @@ type response struct { err error } +type responseRouter struct { + c chan<- response + streaming bool +} + type channel struct { - sendQ chan request - nodeID uint32 - mu sync.Mutex - lastError error - latency time.Duration - backoffCfg backoff.Config - rand *rand.Rand - gorumsClient ordering.GorumsClient - gorumsStream ordering.Gorums_NodeStreamClient - streamMut sync.RWMutex - streamBroken atomicFlag - parentCtx context.Context - streamCtx context.Context - cancelStream context.CancelFunc - responseRouter map[uint64]chan<- response - responseMut sync.Mutex + sendQ chan request + nodeID uint32 + mu sync.Mutex + lastError error + latency time.Duration + backoffCfg backoff.Config + rand *rand.Rand + gorumsClient ordering.GorumsClient + gorumsStream ordering.Gorums_NodeStreamClient + streamMut sync.RWMutex + streamBroken atomicFlag + parentCtx context.Context + streamCtx context.Context + cancelStream context.CancelFunc + responseRouters map[uint64]responseRouter + responseMut sync.Mutex } func newChannel(n *Node) *channel { return &channel{ - sendQ: make(chan request, n.mgr.opts.sendBuffer), - backoffCfg: n.mgr.opts.backoff, - nodeID: n.ID(), - latency: -1 * time.Second, - rand: rand.New(rand.NewSource(time.Now().UnixNano())), - responseRouter: make(map[uint64]chan<- response), + sendQ: make(chan request, n.mgr.opts.sendBuffer), + backoffCfg: n.mgr.opts.backoff, + nodeID: n.ID(), + latency: -1 * time.Second, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + responseRouters: make(map[uint64]responseRouter), } } @@ -75,21 +80,28 @@ func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { func (c *channel) routeResponse(msgID uint64, resp response) { c.responseMut.Lock() defer c.responseMut.Unlock() - if ch, ok := c.responseRouter[msgID]; ok { - ch <- resp - delete(c.responseRouter, msgID) + if router, ok := c.responseRouters[msgID]; ok { + router.c <- resp + // delete the router if we are only expecting a single message + if !router.streaming { + delete(c.responseRouters, msgID) + } } } -func (c *channel) enqueue(req request, responseChan chan<- response) { +func (c *channel) enqueue(req request, responseChan chan<- response, streaming bool) { if responseChan != nil { c.responseMut.Lock() - c.responseRouter[req.msg.Metadata.MessageID] = responseChan + c.responseRouters[req.msg.Metadata.MessageID] = responseRouter{responseChan, streaming} c.responseMut.Unlock() } c.sendQ <- req } +func (c *channel) deleteRouter(msgID uint64) { + delete(c.responseRouters, msgID) +} + func (c *channel) sendMsg(req request) (err error) { // unblock the waiting caller unless noSendWaiting is enabled defer func() { diff --git a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go index 03b7d34a..65d0badc 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go @@ -45,12 +45,12 @@ type ZorumsService interface { CorrectableCombo(ctx gorums.ServerCtx, request *Request) (response *Response, err error) CorrectableEmpty(ctx gorums.ServerCtx, request *Request) (response *emptypb.Empty, err error) CorrectableEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty) (response *Response, err error) - CorrectableStream(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - CorrectableStreamPerNodeArg(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - CorrectableStreamCustomReturnType(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - CorrectableStreamCombo(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - CorrectableStreamEmpty(ctx gorums.ServerCtx, request *Request) (response *emptypb.Empty, err error) - CorrectableStreamEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty) (response *Response, err error) + CorrectableStream(request *Request, send func(response *Response) error) error + CorrectableStreamPerNodeArg(request *Request, send func(response *Response) error) error + CorrectableStreamCustomReturnType(request *Request, send func(response *Response) error) error + CorrectableStreamCombo(request *Request, send func(response *Response) error) error + CorrectableStreamEmpty(request *Request, send func(response *emptypb.Empty) error) error + CorrectableStreamEmpty2(request *emptypb.Empty, send func(response *Response) error) error Unicast(ctx gorums.ServerCtx, request *Request) Unicast2(ctx gorums.ServerCtx, request *Request) } @@ -60,64 +60,43 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.GRPCCall(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCall", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCall(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallPerNodeArg", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallPerNodeArg(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallCustomReturnType", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallCustomReturnType(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallCombo", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallCombo(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*emptypb.Empty) defer ctx.Release() resp, err := impl.QuorumCallEmpty(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallEmpty2(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.Multicast", func(ctx gorums.ServerCtx, in *gorums.Message, _ chan<- *gorums.Message) { req := in.Message.(*Request) @@ -148,171 +127,138 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallAsync(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsyncPerNodeArg", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallAsyncPerNodeArg(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsyncCustomReturnType", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallAsyncCustomReturnType(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsyncCombo", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallAsyncCombo(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsync2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallAsync2(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsyncEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QuorumCallAsyncEmpty(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsyncEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*emptypb.Empty) defer ctx.Release() resp, err := impl.QuorumCallAsyncEmpty2(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.Correctable", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.Correctable(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectablePerNodeArg", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.CorrectablePerNodeArg(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectableCustomReturnType", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.CorrectableCustomReturnType(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectableCombo", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.CorrectableCombo(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectableEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.CorrectableEmpty(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectableEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*emptypb.Empty) defer ctx.Release() resp, err := impl.CorrectableEmpty2(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectableStream", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - resp, err := impl.CorrectableStream(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): + err := impl.CorrectableStream(req, func(resp *Response) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) } }) srv.RegisterHandler("dev.ZorumsService.CorrectableStreamPerNodeArg", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - resp, err := impl.CorrectableStreamPerNodeArg(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): + err := impl.CorrectableStreamPerNodeArg(req, func(resp *Response) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) } }) srv.RegisterHandler("dev.ZorumsService.CorrectableStreamCustomReturnType", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - resp, err := impl.CorrectableStreamCustomReturnType(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): + err := impl.CorrectableStreamCustomReturnType(req, func(resp *Response) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) } }) srv.RegisterHandler("dev.ZorumsService.CorrectableStreamCombo", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - resp, err := impl.CorrectableStreamCombo(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): + err := impl.CorrectableStreamCombo(req, func(resp *Response) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) } }) srv.RegisterHandler("dev.ZorumsService.CorrectableStreamEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - resp, err := impl.CorrectableStreamEmpty(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): + err := impl.CorrectableStreamEmpty(req, func(resp *emptypb.Empty) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) } }) srv.RegisterHandler("dev.ZorumsService.CorrectableStreamEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*emptypb.Empty) defer ctx.Release() - resp, err := impl.CorrectableStreamEmpty2(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): + err := impl.CorrectableStreamEmpty2(req, func(resp *Response) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) } }) srv.RegisterHandler("dev.ZorumsService.Unicast", func(ctx gorums.ServerCtx, in *gorums.Message, _ chan<- *gorums.Message) { diff --git a/cmd/protoc-gen-gorums/gengorums/template_server.go b/cmd/protoc-gen-gorums/gengorums/template_server.go index aedd515c..ca7f9e73 100644 --- a/cmd/protoc-gen-gorums/gengorums/template_server.go +++ b/cmd/protoc-gen-gorums/gengorums/template_server.go @@ -13,6 +13,8 @@ type {{$service}} interface { {{- range .Methods}} {{- if isOneway .}} {{.GoName}}(ctx {{$context}}, request *{{in $genFile .}}) + {{- else if correctableStream .}} + {{.GoName}}(request *{{in $genFile .}}, send func(response *{{out $genFile .}}) error) error {{- else}} {{.GoName}}(ctx {{$context}}, request *{{in $genFile .}}) (response *{{out $genFile .}}, err error) {{- end}} @@ -24,6 +26,8 @@ type {{$service}} interface { var registerInterface = ` {{$genFile := .GenFile}} {{$gorumsMessage := use "gorums.Message" .GenFile}} +{{$wrapMessage := use "gorums.WrapMessage" $genFile}} +{{$sendMessage := use "gorums.SendMessage" $genFile}} {{range .Services -}} {{$service := .GoName}} func Register{{$service}}Server(srv *{{use "gorums.Server" $genFile}}, impl {{$service}}) { @@ -33,12 +37,16 @@ func Register{{$service}}Server(srv *{{use "gorums.Server" $genFile}}, impl {{$s defer ctx.Release() {{- if isOneway .}} impl.{{.GoName}}(ctx, req) + {{- else if correctableStream .}} + err := impl.{{.GoName}}(req, func(resp *{{out $genFile .}}) error { + return {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(in.Metadata, resp, nil)) + }) + if err != nil { + {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(in.Metadata, nil, err)) + } {{- else }} resp, err := impl.{{.GoName}}(ctx, req) - select { - case finished <- {{use "gorums.WrapMessage" $genFile}}(in.Metadata, resp, err): - case <-ctx.Done(): - } + {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(in.Metadata, resp, err)) {{- end}} }) {{- end}} diff --git a/correctable.go b/correctable.go index 20075b7a..572000e7 100644 --- a/correctable.go +++ b/correctable.go @@ -95,7 +95,7 @@ func (c Configuration) CorrectableCall(ctx context.Context, d CorrectableCallDat continue // don't send if no msg } } - n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan) + n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan, d.ServerStream) } corr := &Correctable{donech: make(chan struct{}, 1)} @@ -110,6 +110,12 @@ func (c Configuration) CorrectableCall(ctx context.Context, d CorrectableCallDat replies = make(map[uint32]protoreflect.ProtoMessage) ) + if d.ServerStream { + for _, n := range c { + defer n.channel.deleteRouter(md.MessageID) + } + } + for { select { case r := <-replyChan: diff --git a/multicast.go b/multicast.go index 73af7ecd..6b50eb9a 100644 --- a/multicast.go +++ b/multicast.go @@ -27,7 +27,7 @@ func (c Configuration) Multicast(ctx context.Context, d QuorumCallData, opts ... continue // don't send if no msg } } - n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}, opts: o}, replyChan) + n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}, opts: o}, replyChan, false) sentMsgs++ } diff --git a/quorumcall.go b/quorumcall.go index 6cfddce0..d154ce49 100644 --- a/quorumcall.go +++ b/quorumcall.go @@ -31,7 +31,7 @@ func (c Configuration) QuorumCall(ctx context.Context, d QuorumCallData) (resp p continue // don't send if no msg } } - n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan) + n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan, false) } var ( diff --git a/rpc.go b/rpc.go index 797e5b7b..594d12a4 100644 --- a/rpc.go +++ b/rpc.go @@ -15,7 +15,7 @@ type CallData struct { func (n *Node) RPCCall(ctx context.Context, d CallData) (resp protoreflect.ProtoMessage, err error) { md := &ordering.Metadata{MessageID: n.mgr.getMsgID(), Method: d.Method} replyChan := make(chan response, 1) - n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: d.Message}}, replyChan) + n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: d.Message}}, replyChan, false) select { case r := <-replyChan: diff --git a/server.go b/server.go index 25ac0d5a..1e10724e 100644 --- a/server.go +++ b/server.go @@ -32,7 +32,21 @@ func newOrderingServer(opts *serverOptions) *orderingServer { return s } +// SendMessage attempts to send a message on a channel. +// +// This function should be used by generated code only. +func SendMessage(ctx context.Context, c chan<- *Message, msg *Message) error { + select { + case c <- msg: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + // WrapMessage wraps the metadata, response and error status in a gorumsMessage +// +// This function should be used by generated code only. func WrapMessage(md *ordering.Metadata, resp protoreflect.ProtoMessage, err error) *Message { errStatus, ok := status.FromError(err) if !ok { diff --git a/unicast.go b/unicast.go index 2d06f25c..4b0f1ab2 100644 --- a/unicast.go +++ b/unicast.go @@ -17,13 +17,13 @@ func (n *Node) Unicast(ctx context.Context, d CallData, opts ...CallOption) { req := request{ctx: ctx, msg: &Message{Metadata: md, Message: d.Message}, opts: o} if o.noSendWaiting { - n.channel.enqueue(req, nil) + n.channel.enqueue(req, nil, false) return // don't wait for message to be sent } // newReply must be called before adding req to sendQ replyChan := make(chan response, 1) - n.channel.enqueue(req, replyChan) + n.channel.enqueue(req, replyChan, false) // channel sends an empty reply on replyChan when the message has been sent // wait until the message has been sent <-replyChan From 496fdbad3d44543582469c1252aea44266ad6e55 Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Sun, 27 Jun 2021 18:33:23 +0200 Subject: [PATCH 3/7] Add tests for correctables --- tests/Makefile | 6 +- tests/correctable/correctable.pb.go | 216 ++++++++++++++++++ tests/correctable/correctable.proto | 22 ++ tests/correctable/correctable_gorums.pb.go | 249 +++++++++++++++++++++ tests/correctable/correctable_test.go | 104 +++++++++ 5 files changed, 595 insertions(+), 2 deletions(-) create mode 100644 tests/correctable/correctable.pb.go create mode 100644 tests/correctable/correctable.proto create mode 100644 tests/correctable/correctable_gorums.pb.go create mode 100644 tests/correctable/correctable_test.go diff --git a/tests/Makefile b/tests/Makefile index e2047565..5b8f51ca 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,7 +1,7 @@ # Tests that should be run each time -RUNTESTS := qf ordering metadata tls unresponsive dummy oneway config +RUNTESTS := qf ordering metadata tls unresponsive dummy oneway config correctable -.PHONY: all qf ordering metadata tls unresponsive dummy oneway config +.PHONY: all qf ordering metadata tls unresponsive dummy oneway config correctable all: $(RUNTESTS) @@ -21,6 +21,8 @@ oneway: oneway/oneway.pb.go oneway/oneway_gorums.pb.go config: config/config.pb.go config/config_gorums.pb.go +correctable: correctable/correctable.pb.go correctable/correctable_gorums.pb.go + %.pb.go : %.proto @protoc -I=..:. --go_out=paths=source_relative:. $< diff --git a/tests/correctable/correctable.pb.go b/tests/correctable/correctable.pb.go new file mode 100644 index 00000000..8cc2063c --- /dev/null +++ b/tests/correctable/correctable.pb.go @@ -0,0 +1,216 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.16.0 +// source: correctable/correctable.proto + +package correctable + +import ( + _ "github.com/relab/gorums" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CorrectableRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CorrectableRequest) Reset() { + *x = CorrectableRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_correctable_correctable_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CorrectableRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CorrectableRequest) ProtoMessage() {} + +func (x *CorrectableRequest) ProtoReflect() protoreflect.Message { + mi := &file_correctable_correctable_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CorrectableRequest.ProtoReflect.Descriptor instead. +func (*CorrectableRequest) Descriptor() ([]byte, []int) { + return file_correctable_correctable_proto_rawDescGZIP(), []int{0} +} + +type CorrectableResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Level int32 `protobuf:"varint,1,opt,name=Level,proto3" json:"Level,omitempty"` +} + +func (x *CorrectableResponse) Reset() { + *x = CorrectableResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_correctable_correctable_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CorrectableResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CorrectableResponse) ProtoMessage() {} + +func (x *CorrectableResponse) ProtoReflect() protoreflect.Message { + mi := &file_correctable_correctable_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CorrectableResponse.ProtoReflect.Descriptor instead. +func (*CorrectableResponse) Descriptor() ([]byte, []int) { + return file_correctable_correctable_proto_rawDescGZIP(), []int{1} +} + +func (x *CorrectableResponse) GetLevel() int32 { + if x != nil { + return x.Level + } + return 0 +} + +var File_correctable_correctable_proto protoreflect.FileDescriptor + +var file_correctable_correctable_proto_rawDesc = []byte{ + 0x0a, 0x1d, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2f, 0x63, 0x6f, + 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0b, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x1a, 0x0c, 0x67, 0x6f, + 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x14, 0x0a, 0x12, 0x43, 0x6f, + 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4c, 0x65, 0x76, 0x65, 0x6c, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x32, 0xc9, 0x01, + 0x0a, 0x0f, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x65, 0x73, + 0x74, 0x12, 0x56, 0x0a, 0x0b, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x12, 0x1f, 0x2e, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x43, + 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x20, 0x2e, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, + 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x12, 0x5e, 0x0a, 0x11, 0x43, 0x6f, 0x72, + 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1f, + 0x2e, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x43, 0x6f, 0x72, + 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x43, 0x6f, + 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x30, 0x01, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, 0x6f, + 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x72, 0x72, 0x65, + 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_correctable_correctable_proto_rawDescOnce sync.Once + file_correctable_correctable_proto_rawDescData = file_correctable_correctable_proto_rawDesc +) + +func file_correctable_correctable_proto_rawDescGZIP() []byte { + file_correctable_correctable_proto_rawDescOnce.Do(func() { + file_correctable_correctable_proto_rawDescData = protoimpl.X.CompressGZIP(file_correctable_correctable_proto_rawDescData) + }) + return file_correctable_correctable_proto_rawDescData +} + +var file_correctable_correctable_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_correctable_correctable_proto_goTypes = []interface{}{ + (*CorrectableRequest)(nil), // 0: correctable.CorrectableRequest + (*CorrectableResponse)(nil), // 1: correctable.CorrectableResponse +} +var file_correctable_correctable_proto_depIdxs = []int32{ + 0, // 0: correctable.CorrectableTest.Correctable:input_type -> correctable.CorrectableRequest + 0, // 1: correctable.CorrectableTest.CorrectableStream:input_type -> correctable.CorrectableRequest + 1, // 2: correctable.CorrectableTest.Correctable:output_type -> correctable.CorrectableResponse + 1, // 3: correctable.CorrectableTest.CorrectableStream:output_type -> correctable.CorrectableResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_correctable_correctable_proto_init() } +func file_correctable_correctable_proto_init() { + if File_correctable_correctable_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_correctable_correctable_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CorrectableRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_correctable_correctable_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CorrectableResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_correctable_correctable_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_correctable_correctable_proto_goTypes, + DependencyIndexes: file_correctable_correctable_proto_depIdxs, + MessageInfos: file_correctable_correctable_proto_msgTypes, + }.Build() + File_correctable_correctable_proto = out.File + file_correctable_correctable_proto_rawDesc = nil + file_correctable_correctable_proto_goTypes = nil + file_correctable_correctable_proto_depIdxs = nil +} diff --git a/tests/correctable/correctable.proto b/tests/correctable/correctable.proto new file mode 100644 index 00000000..0476b34e --- /dev/null +++ b/tests/correctable/correctable.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; +package correctable; + +option go_package = "github.com/relab/gorums/tests/correctable"; + +import "gorums.proto"; + +service CorrectableTest{ + rpc Correctable(CorrectableRequest) returns (CorrectableResponse) { + option (gorums.correctable) = true; + } + rpc CorrectableStream(CorrectableRequest) returns (stream CorrectableResponse) { + option (gorums.correctable) = true; + } +} + +message CorrectableRequest {} + +message CorrectableResponse { + int32 Level = 1; +} + diff --git a/tests/correctable/correctable_gorums.pb.go b/tests/correctable/correctable_gorums.pb.go new file mode 100644 index 00000000..6228aee2 --- /dev/null +++ b/tests/correctable/correctable_gorums.pb.go @@ -0,0 +1,249 @@ +// Code generated by protoc-gen-gorums. DO NOT EDIT. +// versions: +// protoc-gen-gorums v0.5.0-devel +// protoc v3.16.0 +// source: correctable/correctable.proto + +package correctable + +import ( + context "context" + fmt "fmt" + gorums "github.com/relab/gorums" + encoding "google.golang.org/grpc/encoding" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = gorums.EnforceVersion(5 - gorums.MinVersion) + // Verify that the gorums runtime is sufficiently up-to-date. + _ = gorums.EnforceVersion(gorums.MaxVersion - 5) +) + +// A Configuration represents a static set of nodes on which quorum remote +// procedure calls may be invoked. +type Configuration struct { + gorums.Configuration + qspec QuorumSpec +} + +// Nodes returns a slice of each available node. IDs are returned in the same +// order as they were provided in the creation of the Manager. +func (c *Configuration) Nodes() []*Node { + nodes := make([]*Node, 0, c.Size()) + for _, n := range c.Configuration { + nodes = append(nodes, &Node{n}) + } + return nodes +} + +// And returns a NodeListOption that can be used to create a new configuration combining c and d. +func (c Configuration) And(d *Configuration) gorums.NodeListOption { + return c.Configuration.And(d.Configuration) +} + +// Except returns a NodeListOption that can be used to create a new configuration +// from c without the nodes in rm. +func (c Configuration) Except(rm *Configuration) gorums.NodeListOption { + return c.Configuration.Except(rm.Configuration) +} + +func init() { + if encoding.GetCodec(gorums.ContentSubtype) == nil { + encoding.RegisterCodec(gorums.NewCodec()) + } +} + +// Manager maintains a connection pool of nodes on +// which quorum calls can be performed. +type Manager struct { + *gorums.Manager +} + +// NewManager returns a new Manager for managing connection to nodes added +// to the manager. This function accepts manager options used to configure +// various aspects of the manager. +func NewManager(opts ...gorums.ManagerOption) (mgr *Manager) { + mgr = &Manager{} + mgr.Manager = gorums.NewManager(opts...) + return mgr +} + +// NewConfiguration returns a configuration based on the provided list of nodes (required) +// and an optional quorum specification. The QuorumSpec is necessary for call types that +// must process replies. For configurations only used for unicast or multicast call types, +// a QuorumSpec is not needed. The QuorumSpec interface is also a ConfigOption. +// Nodes can be supplied using WithNodeMap or WithNodeList, or WithNodeIDs. +// A new configuration can also be created from an existing configuration, +// using the And, WithNewNodes, Except, and WithoutNodes methods. +func (m *Manager) NewConfiguration(opts ...gorums.ConfigOption) (c *Configuration, err error) { + if len(opts) < 1 || len(opts) > 2 { + return nil, fmt.Errorf("wrong number of options: %d", len(opts)) + } + c = &Configuration{} + for _, opt := range opts { + switch v := opt.(type) { + case gorums.NodeListOption: + c.Configuration, err = gorums.NewConfiguration(m.Manager, v) + if err != nil { + return nil, err + } + case QuorumSpec: + // Must be last since v may match QuorumSpec if it is interface{} + c.qspec = v + default: + return nil, fmt.Errorf("unknown option type: %v", v) + } + } + // return an error if the QuorumSpec interface is not empty and no implementation was provided. + var test interface{} = struct{}{} + if _, empty := test.(QuorumSpec); !empty && c.qspec == nil { + return nil, fmt.Errorf("missing required QuorumSpec") + } + return c, nil +} + +// Nodes returns a slice of available nodes on this manager. +// IDs are returned in the order they were added at creation of the manager. +func (m *Manager) Nodes() []*Node { + gorumsNodes := m.Manager.Nodes() + nodes := make([]*Node, 0, len(gorumsNodes)) + for _, n := range gorumsNodes { + nodes = append(nodes, &Node{n}) + } + return nodes +} + +type Node struct { + *gorums.Node +} + +// Correctable asynchronously invokes a correctable quorum call on each node +// in configuration c and returns a CorrectableCorrectableResponse, which can be used +// to inspect any replies or errors when available. +func (c *Configuration) Correctable(ctx context.Context, in *CorrectableRequest) *CorrectableCorrectableResponse { + cd := gorums.CorrectableCallData{ + Message: in, + Method: "correctable.CorrectableTest.Correctable", + ServerStream: false, + } + cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, int, bool) { + r := make(map[uint32]*CorrectableResponse, len(replies)) + for k, v := range replies { + r[k] = v.(*CorrectableResponse) + } + return c.qspec.CorrectableQF(req.(*CorrectableRequest), r) + } + + corr := c.Configuration.CorrectableCall(ctx, cd) + return &CorrectableCorrectableResponse{corr} +} + +// CorrectableStream asynchronously invokes a correctable quorum call on each node +// in configuration c and returns a CorrectableStreamCorrectableResponse, which can be used +// to inspect any replies or errors when available. +// This method supports server-side preliminary replies (correctable stream). +func (c *Configuration) CorrectableStream(ctx context.Context, in *CorrectableRequest) *CorrectableStreamCorrectableResponse { + cd := gorums.CorrectableCallData{ + Message: in, + Method: "correctable.CorrectableTest.CorrectableStream", + ServerStream: true, + } + cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, int, bool) { + r := make(map[uint32]*CorrectableResponse, len(replies)) + for k, v := range replies { + r[k] = v.(*CorrectableResponse) + } + return c.qspec.CorrectableStreamQF(req.(*CorrectableRequest), r) + } + + corr := c.Configuration.CorrectableCall(ctx, cd) + return &CorrectableStreamCorrectableResponse{corr} +} + +// QuorumSpec is the interface of quorum functions for CorrectableTest. +type QuorumSpec interface { + gorums.ConfigOption + + // CorrectableQF is the quorum function for the Correctable + // correctable quorum call method. The in parameter is the request object + // supplied to the Correctable method at call time, and may or may not + // be used by the quorum function. If the in parameter is not needed + // you should implement your quorum function with '_ *CorrectableRequest'. + CorrectableQF(in *CorrectableRequest, replies map[uint32]*CorrectableResponse) (*CorrectableResponse, int, bool) + + // CorrectableStreamQF is the quorum function for the CorrectableStream + // correctable stream quorum call method. The in parameter is the request object + // supplied to the CorrectableStream method at call time, and may or may not + // be used by the quorum function. If the in parameter is not needed + // you should implement your quorum function with '_ *CorrectableRequest'. + CorrectableStreamQF(in *CorrectableRequest, replies map[uint32]*CorrectableResponse) (*CorrectableResponse, int, bool) +} + +// CorrectableTest is the server-side API for the CorrectableTest Service +type CorrectableTest interface { + Correctable(ctx gorums.ServerCtx, request *CorrectableRequest) (response *CorrectableResponse, err error) + CorrectableStream(request *CorrectableRequest, send func(response *CorrectableResponse) error) error +} + +func RegisterCorrectableTestServer(srv *gorums.Server, impl CorrectableTest) { + srv.RegisterHandler("correctable.CorrectableTest.Correctable", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { + req := in.Message.(*CorrectableRequest) + defer ctx.Release() + resp, err := impl.Correctable(ctx, req) + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) + }) + srv.RegisterHandler("correctable.CorrectableTest.CorrectableStream", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { + req := in.Message.(*CorrectableRequest) + defer ctx.Release() + err := impl.CorrectableStream(req, func(resp *CorrectableResponse) error { + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + }) + if err != nil { + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) + } + }) +} + +type internalCorrectableResponse struct { + nid uint32 + reply *CorrectableResponse + err error +} + +// CorrectableCorrectableResponse is a correctable object for processing replies. +type CorrectableCorrectableResponse struct { + *gorums.Correctable +} + +// Get returns the reply, level and any error associated with the +// called method. The method does not block until a (possibly +// intermediate) reply or error is available. Level is set to LevelNotSet if no +// reply has yet been received. The Done or Watch methods should be used to +// ensure that a reply is available. +func (c *CorrectableCorrectableResponse) Get() (*CorrectableResponse, int, error) { + resp, level, err := c.Correctable.Get() + if err != nil { + return nil, level, err + } + return resp.(*CorrectableResponse), level, err +} + +// CorrectableStreamCorrectableResponse is a correctable object for processing replies. +type CorrectableStreamCorrectableResponse struct { + *gorums.Correctable +} + +// Get returns the reply, level and any error associated with the +// called method. The method does not block until a (possibly +// intermediate) reply or error is available. Level is set to LevelNotSet if no +// reply has yet been received. The Done or Watch methods should be used to +// ensure that a reply is available. +func (c *CorrectableStreamCorrectableResponse) Get() (*CorrectableResponse, int, error) { + resp, level, err := c.Correctable.Get() + if err != nil { + return nil, level, err + } + return resp.(*CorrectableResponse), level, err +} diff --git a/tests/correctable/correctable_test.go b/tests/correctable/correctable_test.go new file mode 100644 index 00000000..24a0c63a --- /dev/null +++ b/tests/correctable/correctable_test.go @@ -0,0 +1,104 @@ +package correctable + +import ( + "context" + "testing" + "time" + + "github.com/relab/gorums" + "google.golang.org/grpc" +) + +// run a test on a correctable call. +// n is the number of replicas, and div is a divider. +// the target level is n, and the level is calculated by the quorum function +// by dividing the sum of levels from the servers with the divider. +func run(t *testing.T, n int, div int, corr func(*Configuration) *gorums.Correctable) { + addrs, teardown := gorums.TestSetup(t, n, func(i int) gorums.ServerIface { + gorumsSrv := gorums.NewServer() + RegisterCorrectableTestServer(gorumsSrv, &testSrv{n}) + return gorumsSrv + }) + defer teardown() + + mgr := NewManager(gorums.WithDialTimeout(time.Second), gorums.WithGrpcDialOptions( + grpc.WithInsecure(), grpc.WithBlock(), + )) + + cfg, err := mgr.NewConfiguration(qspec{div, n}, gorums.WithNodeList(addrs)) + if err != nil { + t.Fatal(err) + } + + res := corr(cfg) + + for i := 1; i <= n; i++ { + <-res.Watch(i) + } + + <-res.Done() +} + +func TestCorrectable(t *testing.T) { + run(t, 4, 1, func(c *Configuration) *gorums.Correctable { + corr := c.Correctable(context.Background(), &CorrectableRequest{}) + return corr.Correctable + }) +} + +func TestCorrectableStream(t *testing.T) { + run(t, 4, 4, func(c *Configuration) *gorums.Correctable { + corr := c.CorrectableStream(context.Background(), &CorrectableRequest{}) + return corr.Correctable + }) +} + +type qspec struct { + div int + doneLevel int +} + +func (q qspec) q(replies map[uint32]*CorrectableResponse) (*CorrectableResponse, int, bool) { + sum := 0 + for _, reply := range replies { + sum += int(reply.Level) + } + level := sum / q.div + return &CorrectableResponse{Level: int32(level)}, level, level >= q.doneLevel +} + +// CorrectableStreamQF is the quorum function for the CorrectableStream +// correctable stream quorum call method. The in parameter is the request object +// supplied to the CorrectableStream method at call time, and may or may not +// be used by the quorum function. If the in parameter is not needed +// you should implement your quorum function with '_ *CorrectableRequest'. +func (q qspec) CorrectableStreamQF(_ *CorrectableRequest, replies map[uint32]*CorrectableResponse) (*CorrectableResponse, int, bool) { + return q.q(replies) +} + +// CorrectableQF is the quorum function for the Correctable +// correctable quorum call method. The in parameter is the request object +// supplied to the Correctable method at call time, and may or may not +// be used by the quorum function. If the in parameter is not needed +// you should implement your quorum function with '_ *CorrectableRequest'. +func (q qspec) CorrectableQF(_ *CorrectableRequest, replies map[uint32]*CorrectableResponse) (*CorrectableResponse, int, bool) { + return q.q(replies) +} + +type testSrv struct { + n int +} + +func (srv testSrv) CorrectableStream(request *CorrectableRequest, send func(response *CorrectableResponse) error) error { + for i := 0; i < srv.n; i++ { + err := send(&CorrectableResponse{Level: int32(i + 1)}) + if err != nil { + return err + } + } + return nil +} + +func (srv testSrv) Correctable(ctx gorums.ServerCtx, request *CorrectableRequest) (response *CorrectableResponse, err error) { + return &CorrectableResponse{Level: 1}, nil +} From 2a947429ae3f43d1dba5cae8e8e8c6f8d58f2009 Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Mon, 28 Jun 2021 11:08:45 +0200 Subject: [PATCH 4/7] add the context parameter to correctable handlers --- .../dev/zorums_server_gorums.pb.go | 24 +++++++++---------- .../gengorums/template_server.go | 4 ++-- tests/config/config_gorums.pb.go | 5 +--- tests/correctable/correctable_gorums.pb.go | 4 ++-- tests/correctable/correctable_test.go | 4 ++-- tests/dummy/dummy_gorums.pb.go | 5 +--- tests/metadata/metadata_gorums.pb.go | 10 ++------ tests/ordering/order_gorums.pb.go | 15 +++--------- tests/qf/qf_gorums.pb.go | 10 ++------ tests/tls/tls_gorums.pb.go | 5 +--- tests/unresponsive/unresponsive_gorums.pb.go | 5 +--- 11 files changed, 29 insertions(+), 62 deletions(-) diff --git a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go index 65d0badc..2194d11a 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go @@ -45,12 +45,12 @@ type ZorumsService interface { CorrectableCombo(ctx gorums.ServerCtx, request *Request) (response *Response, err error) CorrectableEmpty(ctx gorums.ServerCtx, request *Request) (response *emptypb.Empty, err error) CorrectableEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty) (response *Response, err error) - CorrectableStream(request *Request, send func(response *Response) error) error - CorrectableStreamPerNodeArg(request *Request, send func(response *Response) error) error - CorrectableStreamCustomReturnType(request *Request, send func(response *Response) error) error - CorrectableStreamCombo(request *Request, send func(response *Response) error) error - CorrectableStreamEmpty(request *Request, send func(response *emptypb.Empty) error) error - CorrectableStreamEmpty2(request *emptypb.Empty, send func(response *Response) error) error + CorrectableStream(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error + CorrectableStreamPerNodeArg(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error + CorrectableStreamCustomReturnType(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error + CorrectableStreamCombo(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error + CorrectableStreamEmpty(ctx gorums.ServerCtx, request *Request, send func(response *emptypb.Empty) error) error + CorrectableStreamEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty, send func(response *Response) error) error Unicast(ctx gorums.ServerCtx, request *Request) Unicast2(ctx gorums.ServerCtx, request *Request) } @@ -204,7 +204,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStream", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - err := impl.CorrectableStream(req, func(resp *Response) error { + err := impl.CorrectableStream(ctx, req, func(resp *Response) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { @@ -214,7 +214,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStreamPerNodeArg", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - err := impl.CorrectableStreamPerNodeArg(req, func(resp *Response) error { + err := impl.CorrectableStreamPerNodeArg(ctx, req, func(resp *Response) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { @@ -224,7 +224,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStreamCustomReturnType", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - err := impl.CorrectableStreamCustomReturnType(req, func(resp *Response) error { + err := impl.CorrectableStreamCustomReturnType(ctx, req, func(resp *Response) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { @@ -234,7 +234,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStreamCombo", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - err := impl.CorrectableStreamCombo(req, func(resp *Response) error { + err := impl.CorrectableStreamCombo(ctx, req, func(resp *Response) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { @@ -244,7 +244,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStreamEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - err := impl.CorrectableStreamEmpty(req, func(resp *emptypb.Empty) error { + err := impl.CorrectableStreamEmpty(ctx, req, func(resp *emptypb.Empty) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { @@ -254,7 +254,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStreamEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*emptypb.Empty) defer ctx.Release() - err := impl.CorrectableStreamEmpty2(req, func(resp *Response) error { + err := impl.CorrectableStreamEmpty2(ctx, req, func(resp *Response) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { diff --git a/cmd/protoc-gen-gorums/gengorums/template_server.go b/cmd/protoc-gen-gorums/gengorums/template_server.go index ca7f9e73..ad6054a2 100644 --- a/cmd/protoc-gen-gorums/gengorums/template_server.go +++ b/cmd/protoc-gen-gorums/gengorums/template_server.go @@ -14,7 +14,7 @@ type {{$service}} interface { {{- if isOneway .}} {{.GoName}}(ctx {{$context}}, request *{{in $genFile .}}) {{- else if correctableStream .}} - {{.GoName}}(request *{{in $genFile .}}, send func(response *{{out $genFile .}}) error) error + {{.GoName}}(ctx {{$context}}, request *{{in $genFile .}}, send func(response *{{out $genFile .}}) error) error {{- else}} {{.GoName}}(ctx {{$context}}, request *{{in $genFile .}}) (response *{{out $genFile .}}, err error) {{- end}} @@ -38,7 +38,7 @@ func Register{{$service}}Server(srv *{{use "gorums.Server" $genFile}}, impl {{$s {{- if isOneway .}} impl.{{.GoName}}(ctx, req) {{- else if correctableStream .}} - err := impl.{{.GoName}}(req, func(resp *{{out $genFile .}}) error { + err := impl.{{.GoName}}(ctx, req, func(resp *{{out $genFile .}}) error { return {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(in.Metadata, resp, nil)) }) if err != nil { diff --git a/tests/config/config_gorums.pb.go b/tests/config/config_gorums.pb.go index a0c4d7b7..959472d8 100644 --- a/tests/config/config_gorums.pb.go +++ b/tests/config/config_gorums.pb.go @@ -163,10 +163,7 @@ func RegisterConfigTestServer(srv *gorums.Server, impl ConfigTest) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.Config(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } diff --git a/tests/correctable/correctable_gorums.pb.go b/tests/correctable/correctable_gorums.pb.go index 6228aee2..2f5d5589 100644 --- a/tests/correctable/correctable_gorums.pb.go +++ b/tests/correctable/correctable_gorums.pb.go @@ -184,7 +184,7 @@ type QuorumSpec interface { // CorrectableTest is the server-side API for the CorrectableTest Service type CorrectableTest interface { Correctable(ctx gorums.ServerCtx, request *CorrectableRequest) (response *CorrectableResponse, err error) - CorrectableStream(request *CorrectableRequest, send func(response *CorrectableResponse) error) error + CorrectableStream(ctx gorums.ServerCtx, request *CorrectableRequest, send func(response *CorrectableResponse) error) error } func RegisterCorrectableTestServer(srv *gorums.Server, impl CorrectableTest) { @@ -197,7 +197,7 @@ func RegisterCorrectableTestServer(srv *gorums.Server, impl CorrectableTest) { srv.RegisterHandler("correctable.CorrectableTest.CorrectableStream", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*CorrectableRequest) defer ctx.Release() - err := impl.CorrectableStream(req, func(resp *CorrectableResponse) error { + err := impl.CorrectableStream(ctx, req, func(resp *CorrectableResponse) error { return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) }) if err != nil { diff --git a/tests/correctable/correctable_test.go b/tests/correctable/correctable_test.go index 24a0c63a..0821fd03 100644 --- a/tests/correctable/correctable_test.go +++ b/tests/correctable/correctable_test.go @@ -89,7 +89,7 @@ type testSrv struct { n int } -func (srv testSrv) CorrectableStream(request *CorrectableRequest, send func(response *CorrectableResponse) error) error { +func (srv testSrv) CorrectableStream(_ gorums.ServerCtx, request *CorrectableRequest, send func(response *CorrectableResponse) error) error { for i := 0; i < srv.n; i++ { err := send(&CorrectableResponse{Level: int32(i + 1)}) if err != nil { @@ -99,6 +99,6 @@ func (srv testSrv) CorrectableStream(request *CorrectableRequest, send func(resp return nil } -func (srv testSrv) Correctable(ctx gorums.ServerCtx, request *CorrectableRequest) (response *CorrectableResponse, err error) { +func (srv testSrv) Correctable(_ gorums.ServerCtx, request *CorrectableRequest) (response *CorrectableResponse, err error) { return &CorrectableResponse{Level: 1}, nil } diff --git a/tests/dummy/dummy_gorums.pb.go b/tests/dummy/dummy_gorums.pb.go index 1f0a33b6..12f19e7d 100644 --- a/tests/dummy/dummy_gorums.pb.go +++ b/tests/dummy/dummy_gorums.pb.go @@ -148,9 +148,6 @@ func RegisterDummyServer(srv *gorums.Server, impl Dummy) { req := in.Message.(*Empty) defer ctx.Release() resp, err := impl.Test(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } diff --git a/tests/metadata/metadata_gorums.pb.go b/tests/metadata/metadata_gorums.pb.go index c2fc30b4..093eebf9 100644 --- a/tests/metadata/metadata_gorums.pb.go +++ b/tests/metadata/metadata_gorums.pb.go @@ -163,18 +163,12 @@ func RegisterMetadataTestServer(srv *gorums.Server, impl MetadataTest) { req := in.Message.(*emptypb.Empty) defer ctx.Release() resp, err := impl.IDFromMD(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("metadata.MetadataTest.WhatIP", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*emptypb.Empty) defer ctx.Release() resp, err := impl.WhatIP(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } diff --git a/tests/ordering/order_gorums.pb.go b/tests/ordering/order_gorums.pb.go index e71c4855..7c1b5463 100644 --- a/tests/ordering/order_gorums.pb.go +++ b/tests/ordering/order_gorums.pb.go @@ -207,28 +207,19 @@ func RegisterGorumsTestServer(srv *gorums.Server, impl GorumsTest) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QC(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("ordering.GorumsTest.QCAsync", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.QCAsync(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("ordering.GorumsTest.UnaryRPC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.UnaryRPC(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } diff --git a/tests/qf/qf_gorums.pb.go b/tests/qf/qf_gorums.pb.go index 50a1e65e..725a8529 100644 --- a/tests/qf/qf_gorums.pb.go +++ b/tests/qf/qf_gorums.pb.go @@ -193,19 +193,13 @@ func RegisterQuorumFunctionServer(srv *gorums.Server, impl QuorumFunction) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.UseReq(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("qf.QuorumFunction.IgnoreReq", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.IgnoreReq(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } diff --git a/tests/tls/tls_gorums.pb.go b/tests/tls/tls_gorums.pb.go index 21eea860..88d7f454 100644 --- a/tests/tls/tls_gorums.pb.go +++ b/tests/tls/tls_gorums.pb.go @@ -148,9 +148,6 @@ func RegisterTLSServer(srv *gorums.Server, impl TLS) { req := in.Message.(*Request) defer ctx.Release() resp, err := impl.TestTLS(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } diff --git a/tests/unresponsive/unresponsive_gorums.pb.go b/tests/unresponsive/unresponsive_gorums.pb.go index 307efb61..0f05e142 100644 --- a/tests/unresponsive/unresponsive_gorums.pb.go +++ b/tests/unresponsive/unresponsive_gorums.pb.go @@ -148,9 +148,6 @@ func RegisterUnresponsiveServer(srv *gorums.Server, impl Unresponsive) { req := in.Message.(*Empty) defer ctx.Release() resp, err := impl.TestUnresponsive(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) } From 04db53bbf62833836747fffa1ee49c8b3a25e9a6 Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Mon, 28 Jun 2021 11:23:18 +0200 Subject: [PATCH 5/7] regenerate benchmark and example protos --- benchmark/benchmark_gorums.pb.go | 35 +++++---------------- examples/storage/proto/storage_gorums.pb.go | 20 +++--------- 2 files changed, 11 insertions(+), 44 deletions(-) diff --git a/benchmark/benchmark_gorums.pb.go b/benchmark/benchmark_gorums.pb.go index b65efbf5..0221cee3 100644 --- a/benchmark/benchmark_gorums.pb.go +++ b/benchmark/benchmark_gorums.pb.go @@ -356,64 +356,43 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { req := in.Message.(*StartRequest) defer ctx.Release() resp, err := impl.StartServerBenchmark(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.StopServerBenchmark", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*StopRequest) defer ctx.Release() resp, err := impl.StopServerBenchmark(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.StartBenchmark", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*StartRequest) defer ctx.Release() resp, err := impl.StartBenchmark(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.StopBenchmark", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*StopRequest) defer ctx.Release() resp, err := impl.StopBenchmark(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.QuorumCall", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Echo) defer ctx.Release() resp, err := impl.QuorumCall(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.AsyncQuorumCall", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Echo) defer ctx.Release() resp, err := impl.AsyncQuorumCall(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.SlowServer", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Echo) defer ctx.Release() resp, err := impl.SlowServer(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("benchmark.Benchmark.Multicast", func(ctx gorums.ServerCtx, in *gorums.Message, _ chan<- *gorums.Message) { req := in.Message.(*TimedMsg) diff --git a/examples/storage/proto/storage_gorums.pb.go b/examples/storage/proto/storage_gorums.pb.go index 6ea3ffb3..96f19176 100644 --- a/examples/storage/proto/storage_gorums.pb.go +++ b/examples/storage/proto/storage_gorums.pb.go @@ -239,37 +239,25 @@ func RegisterStorageServer(srv *gorums.Server, impl Storage) { req := in.Message.(*ReadRequest) defer ctx.Release() resp, err := impl.ReadRPC(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("storage.Storage.WriteRPC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*WriteRequest) defer ctx.Release() resp, err := impl.WriteRPC(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("storage.Storage.ReadQC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*ReadRequest) defer ctx.Release() resp, err := impl.ReadQC(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("storage.Storage.WriteQC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*WriteRequest) defer ctx.Release() resp, err := impl.WriteQC(ctx, req) - select { - case finished <- gorums.WrapMessage(in.Metadata, resp, err): - case <-ctx.Done(): - } + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("storage.Storage.WriteMulticast", func(ctx gorums.ServerCtx, in *gorums.Message, _ chan<- *gorums.Message) { req := in.Message.(*WriteRequest) From d09bcc35ad9459b00a817e21cd2aa79e3afb8022 Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Mon, 28 Jun 2021 14:52:24 +0200 Subject: [PATCH 6/7] prevent correctable test from blocking forever --- tests/correctable/correctable_test.go | 28 +++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tests/correctable/correctable_test.go b/tests/correctable/correctable_test.go index 0821fd03..41fdaa8f 100644 --- a/tests/correctable/correctable_test.go +++ b/tests/correctable/correctable_test.go @@ -13,7 +13,7 @@ import ( // n is the number of replicas, and div is a divider. // the target level is n, and the level is calculated by the quorum function // by dividing the sum of levels from the servers with the divider. -func run(t *testing.T, n int, div int, corr func(*Configuration) *gorums.Correctable) { +func run(t *testing.T, n int, div int, corr func(context.Context, *Configuration) *gorums.Correctable) { addrs, teardown := gorums.TestSetup(t, n, func(i int) gorums.ServerIface { gorumsSrv := gorums.NewServer() RegisterCorrectableTestServer(gorumsSrv, &testSrv{n}) @@ -30,25 +30,37 @@ func run(t *testing.T, n int, div int, corr func(*Configuration) *gorums.Correct t.Fatal(err) } - res := corr(cfg) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + res := corr(ctx, cfg) + + donech := res.Done() for i := 1; i <= n; i++ { - <-res.Watch(i) + select { + case <-res.Watch(i): + case <-donech: + } } - <-res.Done() + <-donech + + _, _, err = res.Get() + if err != nil { + t.Error(err) + } } func TestCorrectable(t *testing.T) { - run(t, 4, 1, func(c *Configuration) *gorums.Correctable { - corr := c.Correctable(context.Background(), &CorrectableRequest{}) + run(t, 4, 1, func(ctx context.Context, c *Configuration) *gorums.Correctable { + corr := c.Correctable(ctx, &CorrectableRequest{}) return corr.Correctable }) } func TestCorrectableStream(t *testing.T) { - run(t, 4, 4, func(c *Configuration) *gorums.Correctable { - corr := c.CorrectableStream(context.Background(), &CorrectableRequest{}) + run(t, 4, 4, func(ctx context.Context, c *Configuration) *gorums.Correctable { + corr := c.CorrectableStream(ctx, &CorrectableRequest{}) return corr.Correctable }) } From 009c0f58fd93a142656e73801e02a7418c77a641 Mon Sep 17 00:00:00 2001 From: John Ingve Olsen Date: Tue, 29 Jun 2021 13:49:09 +0200 Subject: [PATCH 7/7] fix data race --- .../dev/zorums_server_gorums.pb.go | 26 ++++++++++++++----- .../gengorums/template_server.go | 4 ++- tests/correctable/correctable_gorums.pb.go | 6 ++++- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go index 2194d11a..62a4b23a 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go @@ -8,6 +8,8 @@ package dev import ( gorums "github.com/relab/gorums" + ordering "github.com/relab/gorums/ordering" + proto "google.golang.org/protobuf/proto" emptypb "google.golang.org/protobuf/types/known/emptypb" ) @@ -205,7 +207,9 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() err := impl.CorrectableStream(ctx, req, func(resp *Response) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) @@ -215,7 +219,9 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() err := impl.CorrectableStreamPerNodeArg(ctx, req, func(resp *Response) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) @@ -225,7 +231,9 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() err := impl.CorrectableStreamCustomReturnType(ctx, req, func(resp *Response) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) @@ -235,7 +243,9 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() err := impl.CorrectableStreamCombo(ctx, req, func(resp *Response) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) @@ -245,7 +255,9 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*Request) defer ctx.Release() err := impl.CorrectableStreamEmpty(ctx, req, func(resp *emptypb.Empty) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) @@ -255,7 +267,9 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { req := in.Message.(*emptypb.Empty) defer ctx.Release() err := impl.CorrectableStreamEmpty2(ctx, req, func(resp *Response) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err)) diff --git a/cmd/protoc-gen-gorums/gengorums/template_server.go b/cmd/protoc-gen-gorums/gengorums/template_server.go index ad6054a2..b6bd0e66 100644 --- a/cmd/protoc-gen-gorums/gengorums/template_server.go +++ b/cmd/protoc-gen-gorums/gengorums/template_server.go @@ -39,7 +39,9 @@ func Register{{$service}}Server(srv *{{use "gorums.Server" $genFile}}, impl {{$s impl.{{.GoName}}(ctx, req) {{- else if correctableStream .}} err := impl.{{.GoName}}(ctx, req, func(resp *{{out $genFile .}}) error { - return {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := {{use "proto.Clone" $genFile}}(in.Metadata) + return {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(md.(*{{use "ordering.Metadata" $genFile}}), resp, nil)) }) if err != nil { {{$sendMessage}}(ctx, finished, {{$wrapMessage}}(in.Metadata, nil, err)) diff --git a/tests/correctable/correctable_gorums.pb.go b/tests/correctable/correctable_gorums.pb.go index 2f5d5589..308b6fd8 100644 --- a/tests/correctable/correctable_gorums.pb.go +++ b/tests/correctable/correctable_gorums.pb.go @@ -10,7 +10,9 @@ import ( context "context" fmt "fmt" gorums "github.com/relab/gorums" + ordering "github.com/relab/gorums/ordering" encoding "google.golang.org/grpc/encoding" + proto "google.golang.org/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" ) @@ -198,7 +200,9 @@ func RegisterCorrectableTestServer(srv *gorums.Server, impl CorrectableTest) { req := in.Message.(*CorrectableRequest) defer ctx.Release() err := impl.CorrectableStream(ctx, req, func(resp *CorrectableResponse) error { - return gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, nil)) + // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg + md := proto.Clone(in.Metadata) + return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) }) if err != nil { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, nil, err))