Skip to content

Commit f9ef012

Browse files
fix(broadcast): client rpc got wrong method
1 parent 7816a96 commit f9ef012

File tree

4 files changed

+32
-67
lines changed

4 files changed

+32
-67
lines changed

broadcast.go

+3-26
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,6 @@ func (srv *broadcastServer) handleClientResponses() {
9797
func (srv *broadcastServer) handle(response responseMsg) {
9898
broadcastID := response.getBroadcastID()
9999
req, handled := srv.clientReqs.GetSet(broadcastID)
100-
//srv.clientReqsMutex.Lock()
101-
//defer srv.clientReqsMutex.Unlock()
102-
//req, ok := srv.clientReqs[response.getBroadcastID()]
103-
//if !ok {
104-
// // this server has not received a request directly from a client
105-
// // hence, the response should be ignored
106-
// return
107-
//} else if req.status == unhandled {
108-
// // first time it is handled
109-
// req.status = response.getType()
110-
//} else if req.status == clientResponse || req.status == timeout {
111-
// // already handled, but got the other response type in the pair: clientResponse & timeout
112-
// // or a duplicate
113-
// req.status = done
114-
// return
115-
//} else {
116-
// // already handled and can be removed
117-
// return
118-
//}
119100
if handled {
120101
// this server has not received a request directly from a client
121102
// hence, the response should be ignored
@@ -135,14 +116,10 @@ func (srv *broadcastServer) handle(response responseMsg) {
135116
log.Println("NOT VALID")
136117
return
137118
}
138-
if req.metadata.BroadcastMsg.Sender == BroadcastClient {
139-
SendMessage(req.ctx, req.finished, WrapMessage(req.metadata, protoreflect.ProtoMessage(response.getResponse()), response.getError()))
140-
}
141-
if req.metadata.BroadcastMsg.OriginAddr == "" {
142-
return
143-
}
144-
if handler, ok := srv.clientHandlers[req.metadata.BroadcastMsg.OriginMethod]; ok {
119+
if handler, ok := srv.clientHandlers[req.metadata.BroadcastMsg.OriginMethod]; ok && req.metadata.BroadcastMsg.OriginAddr != "" {
145120
handler(req.metadata.BroadcastMsg.OriginAddr, broadcastID, response.getResponse())
121+
} else if req.metadata.BroadcastMsg.Sender == BroadcastClient {
122+
SendMessage(req.ctx, req.finished, WrapMessage(req.metadata, protoreflect.ProtoMessage(response.getResponse()), response.getError()))
146123
}
147124
}
148125

clientserver.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ func ServerClientRPC(method string) func(addr, broadcastID string, in protorefle
128128
if len(tmp) >= 1 {
129129
m = tmp[len(tmp)-1]
130130
}
131-
method = "/protos.ClientServer/Client" + m
131+
clientMethod := "/protos.ClientServer/Client" + m
132132
cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
133133
if err != nil {
134134
return nil, err
135135
}
136136
out := new(any)
137137
md := metadata.Pairs(BroadcastID, broadcastID)
138138
ctx := metadata.NewOutgoingContext(context.Background(), md)
139-
err = cc.Invoke(ctx, method, in, out, opts...)
139+
err = cc.Invoke(ctx, clientMethod, in, out, opts...)
140140
if err != nil {
141141
return nil, err
142142
}

cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

handler.go

+26-38
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package gorums
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"time"
87

98
"github.com/relab/gorums/ordering"
@@ -29,43 +28,32 @@ func BroadcastHandler[T RequestTypes, V iBroadcastStruct](impl implementationFun
2928
defer srv.broadcastSrv.Unlock()
3029
req := in.Message.(T)
3130
defer ctx.Release()
32-
// guard:
33-
// - A broadcastID should be non-empty:
34-
// - Maybe the request should be unique? Remove duplicates of the same broadcast? <- Most likely no (up to the implementer)
35-
if err := srv.broadcastSrv.validateMessage(in); err != nil {
36-
log.Fatalln(err, in.Metadata)
37-
return
38-
}
39-
// this does not work yet:
40-
//ctx.update(in.Metadata)
41-
// it is better if the client provide this data in the request:
42-
//if in.Metadata.BroadcastMsg.Sender == "client" && in.Metadata.BroadcastMsg.OriginAddr == "" {
43-
// p, _ := peer.FromContext(ctx)
44-
// in.Metadata.BroadcastMsg.OriginAddr = p.Addr.String()
45-
//}
46-
addOriginMethod(in.Metadata)
47-
broadcastMetadata := newBroadcastMetadata(in.Metadata)
48-
// the client can specify middleware, e.g. authentication, to return early.
49-
if err := srv.broadcastSrv.runMiddleware(broadcastMetadata); err != nil {
50-
// return if any of the middlewares return an error
51-
return
52-
}
53-
//srv.broadcastSrv.b.reset()
54-
//srv.broadcastSrv.b.setMetadata(broadcastMetadata)
55-
// add the request as a client request
56-
srv.broadcastSrv.bNew.setMetadata(broadcastMetadata)
57-
srv.broadcastSrv.addClientRequest(in.Metadata, ctx, finished)
58-
impl(ctx, req, srv.broadcastSrv.bNew.(V))
59-
//srv.broadcastSrv.determineBroadcast(broadcastMetadata)
60-
//// verify whether a server or a client sent the request
61-
if in.Metadata.BroadcastMsg.Sender == BroadcastClient {
62-
go srv.broadcastSrv.timeoutClientResponse(ctx, in, finished)
63-
// //SendMessage(ctx, finished, WrapMessage(in.Metadata, protoreflect.ProtoMessage(nil), nil))
64-
} /*else {
65-
// // server to server communication does not need response?
66-
// SendMessage(ctx, finished, WrapMessage(in.Metadata, protoreflect.ProtoMessage(nil), err))
67-
//}*/
68-
//srv.broadcastSrv.determineReturnToClient(ctx, in.Metadata.BroadcastMsg.GetBroadcastID())
31+
doneChan := make(chan struct{})
32+
go func() {
33+
defer func() { close(doneChan) }()
34+
// guard:
35+
// - A broadcastID should be non-empty:
36+
// - Maybe the request should be unique? Remove duplicates of the same broadcast? <- Most likely no (up to the implementer)
37+
if err := srv.broadcastSrv.validateMessage(in); err != nil {
38+
return
39+
}
40+
addOriginMethod(in.Metadata)
41+
broadcastMetadata := newBroadcastMetadata(in.Metadata)
42+
// the client can specify middleware, e.g. authentication, to return early.
43+
if err := srv.broadcastSrv.runMiddleware(broadcastMetadata); err != nil {
44+
// return if any of the middlewares return an error
45+
return
46+
}
47+
srv.broadcastSrv.bNew.setMetadata(broadcastMetadata)
48+
// add the request as a client request
49+
srv.broadcastSrv.addClientRequest(in.Metadata, ctx, finished)
50+
impl(ctx, req, srv.broadcastSrv.bNew.(V))
51+
//// verify whether a server or a client sent the request
52+
//if in.Metadata.BroadcastMsg.Sender == BroadcastClient {
53+
// go srv.broadcastSrv.timeoutClientResponse(ctx, in, finished)
54+
//}
55+
}()
56+
<-doneChan
6957
}
7058
}
7159

0 commit comments

Comments
 (0)