Skip to content

Commit 15c8d66

Browse files
test(broadcast): added test for broadcasthandler
1 parent 432fcd8 commit 15c8d66

File tree

3 files changed

+184
-110
lines changed

3 files changed

+184
-110
lines changed

broadcast.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type broadcastServer struct {
1818
id string
1919
addr string
2020
broadcastedMsgs map[string]map[string]bool
21-
methods map[string]broadcastFunc
21+
handlers map[string]broadcastFunc
2222
broadcastChan chan *broadcastMsg
2323
responseChan chan responseMsg
2424
clientHandlers map[string]func(addr, broadcastID string, req protoreflect.ProtoMessage, opts ...grpc.CallOption) (any, error)
@@ -40,7 +40,7 @@ func newBroadcastServer() *broadcastServer {
4040
broadcastedMsgs: make(map[string]map[string]bool),
4141
clientHandlers: make(map[string]func(addr, broadcastID string, req protoreflect.ProtoMessage, opts ...grpc.CallOption) (any, error)),
4242
broadcastChan: make(chan *broadcastMsg, 1000),
43-
methods: make(map[string]broadcastFunc),
43+
handlers: make(map[string]broadcastFunc),
4444
responseChan: make(chan responseMsg),
4545
clientReqs: NewRequestMap(),
4646
middlewares: make([]func(BroadcastMetadata) error, 0),
@@ -64,7 +64,7 @@ func (srv *broadcastServer) alreadyBroadcasted(broadcastID string, method string
6464

6565
func (srv *broadcastServer) run() {
6666
for msg := range srv.broadcastChan {
67-
if handler, ok := srv.methods[msg.method]; ok {
67+
if handler, ok := srv.handlers[msg.method]; ok {
6868
handler(msg.ctx, msg.request, msg.metadata, msg.srvAddrs)
6969
}
7070
msg.setFinished()

broadcast_test.go

+180-106
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1-
package gorums_test
1+
package gorums
22

33
import (
4-
"net"
4+
"context"
5+
"sync"
56
"testing"
6-
"time"
77

8-
"github.com/relab/gorums"
9-
"google.golang.org/grpc"
10-
"google.golang.org/grpc/credentials/insecure"
8+
"github.com/google/uuid"
9+
"github.com/relab/gorums/ordering"
1110
"google.golang.org/protobuf/reflect/protoreflect"
1211
)
1312

14-
type testBroadcastRequest struct{}
13+
type testBroadcastRequest struct {
14+
value string
15+
}
1516

1617
func (t *testBroadcastRequest) ProtoReflect() protoreflect.Message {
1718
return nil
@@ -23,141 +24,214 @@ func (t *testBroadcastResponse) ProtoReflect() protoreflect.Message {
2324
return nil
2425
}
2526

26-
type testBroadcastImpl interface {
27-
Broadcast(ctx gorums.ServerCtx, request *testBroadcastRequest, broadcast *testBroadcast)
28-
}
29-
30-
type testBroadcastServer struct {
31-
*gorums.Server
32-
numMsgs int
33-
}
34-
27+
// type testBroadcastImpl interface {
28+
// Broadcast(ctx gorums.ServerCtx, request *testBroadcastRequest, broadcast *testBroadcast)
29+
// }
30+
//
31+
// type testBroadcastServer struct {
32+
// *gorums.Server
33+
// numMsgs int
34+
// }
35+
//
36+
// func newTestBroadcastServer() *testBroadcastServer {
37+
// return &testBroadcastServer{
38+
// Server: gorums.NewServer(),
39+
// }
40+
// }
3541
type testBroadcast struct {
36-
*gorums.BroadcastStruct
37-
sp *gorums.SpBroadcast
38-
metadata gorums.BroadcastMetadata
42+
*BroadcastStruct
43+
sp *SpBroadcast
44+
metadata BroadcastMetadata
3945
}
4046

41-
func configureHandlers(b *testBroadcast) func(bh gorums.BroadcastHandlerFunc, ch gorums.BroadcastReturnToClientHandlerFunc) {
42-
return func(bh gorums.BroadcastHandlerFunc, ch gorums.BroadcastReturnToClientHandlerFunc) {
47+
func configureHandlers(b *testBroadcast) func(bh BroadcastHandlerFunc, ch BroadcastReturnToClientHandlerFunc) {
48+
return func(bh BroadcastHandlerFunc, ch BroadcastReturnToClientHandlerFunc) {
4349
b.sp.BroadcastHandler = bh
4450
b.sp.ReturnToClientHandler = ch
4551
}
4652
}
4753

48-
func configureMetadata(b *testBroadcast) func(metadata gorums.BroadcastMetadata) {
49-
return func(metadata gorums.BroadcastMetadata) {
54+
func configureMetadata(b *testBroadcast) func(metadata BroadcastMetadata) {
55+
return func(metadata BroadcastMetadata) {
5056
b.metadata = metadata
5157
}
5258
}
5359

54-
func (b *testBroadcast) Broadcast(req *testBroadcastRequest, opts ...gorums.BroadcastOption) {
55-
data := gorums.NewBroadcastOptions()
60+
func (b *testBroadcast) Broadcast(req *testBroadcastRequest, opts ...BroadcastOption) {
61+
data := NewBroadcastOptions()
5662
for _, opt := range opts {
5763
opt(&data)
5864
}
5965
b.sp.BroadcastHandler("broadcast", req, b.metadata, data)
6066
}
6167

62-
// Returns a readonly struct of the metadata used in the broadcast.
6368
//
64-
// Note: Some of the data are equal across the cluster, such as BroadcastID.
65-
// Other fields are local, such as SenderAddr.
66-
func (b *testBroadcast) GetMetadata() gorums.BroadcastMetadata {
67-
return b.metadata
68-
}
69+
//// Returns a readonly struct of the metadata used in the broadcast.
70+
////
71+
//// Note: Some of the data are equal across the cluster, such as BroadcastID.
72+
//// Other fields are local, such as SenderAddr.
73+
//func (b *testBroadcast) GetMetadata() gorums.BroadcastMetadata {
74+
// return b.metadata
75+
//}
76+
//
77+
//func (es *testBroadcastServer) Broadcast(ctx gorums.ServerCtx, request *testBroadcastRequest, broadcast *testBroadcast) {
78+
// es.numMsgs++
79+
// broadcast.Broadcast(&testBroadcastRequest{})
80+
//}
81+
//
82+
//func createBroadcastServer(ownAddr string, srvAddrs []string) *testBroadcastServer {
83+
// srv := &testBroadcastServer{
84+
// Server: gorums.NewServer(),
85+
// numMsgs: 0,
86+
// }
87+
// srv.RegisterHandler("broadcast", gorums.BroadcastHandler(srv.Broadcast, srv.Server))
88+
//
89+
// lis, err := net.Listen("tcp", ownAddr)
90+
// if err != nil {
91+
// return nil
92+
// }
93+
//
94+
// go func() { _ = srv.Serve(lis) }()
95+
// //defer srv.Stop()
96+
//
97+
// srv.RegisterView(ownAddr, srvAddrs)
98+
// srv.ListenForBroadcast()
99+
// return srv
100+
//}
101+
//
102+
//type testBroadcastConfiguration struct {
103+
// gorums.RawConfiguration
104+
// qspec *testBroadcastQSpec
105+
// srv *clientServerImpl
106+
//}
107+
//
108+
//type clientServerImpl struct {
109+
// *gorums.ClientServer
110+
// grpcServer *grpc.Server
111+
//}
112+
//
113+
//func (c *testBroadcastConfiguration) RegisterClientServer(lis net.Listener, opts ...grpc.ServerOption) error {
114+
// srvImpl := &clientServerImpl{
115+
// grpcServer: grpc.NewServer(opts...),
116+
// }
117+
// srv, err := gorums.NewClientServer(lis)
118+
// if err != nil {
119+
// return err
120+
// }
121+
// //srvImpl.grpcServer.RegisterService(&clientServer_ServiceDesc, srvImpl)
122+
// go srvImpl.grpcServer.Serve(lis)
123+
// srvImpl.ClientServer = srv
124+
// c.srv = srvImpl
125+
// return nil
126+
//}
127+
//
128+
//type testBroadcastQSpec struct {
129+
// quorumSize int
130+
//}
131+
//
132+
//func newQSpec(qSize int) *testBroadcastQSpec {
133+
// return &testBroadcastQSpec{
134+
// quorumSize: qSize,
135+
// }
136+
//}
137+
//func (qs *testBroadcastQSpec) BroadcastQF(reqs []*testBroadcastResponse) (*testBroadcastResponse, bool) {
138+
// if len(reqs) < qs.quorumSize {
139+
// return nil, false
140+
// }
141+
// return reqs[0], true
142+
//}
143+
//
144+
//func getConfig(srvAddresses []string, numSrvs int) *testBroadcastConfiguration {
145+
// mgr := gorums.NewRawManager(
146+
// gorums.WithDialTimeout(time.Second),
147+
// gorums.WithGrpcDialOptions(
148+
// grpc.WithBlock(),
149+
// grpc.WithTransportCredentials(insecure.NewCredentials()),
150+
// ),
151+
// )
152+
// c := &testBroadcastConfiguration{}
153+
// c.RawConfiguration, _ = gorums.NewRawConfiguration(mgr, gorums.WithNodeList(srvAddresses))
154+
// c.qspec = newQSpec(numSrvs)
155+
// //c.RegisterClientServer(gorums.WithListener("localhost:8080"))
156+
// return c
157+
//}
158+
159+
//func TestBroadcast(t *testing.T) {
160+
// srv := gorums.NewServer()
161+
//
162+
// lis, err := net.Listen("tcp", ":0")
163+
// if err != nil {
164+
// t.Fatal(err)
165+
// }
166+
//
167+
// go func() { _ = srv.Serve(lis) }()
168+
// defer srv.Stop()
169+
//
170+
//}
69171

70-
func (es *testBroadcastServer) Broadcast(ctx gorums.ServerCtx, request *testBroadcastRequest, broadcast *testBroadcast) {
71-
es.numMsgs++
72-
broadcast.Broadcast(&testBroadcastRequest{})
172+
type testBroadcastServer struct {
173+
*Server
174+
numMsgs int
175+
req *testBroadcastRequest
73176
}
74177

75-
func createBroadcastServer(ownAddr string, srvAddrs []string) *testBroadcastServer {
178+
func newTestBroadcastServer() *testBroadcastServer {
76179
srv := &testBroadcastServer{
77-
Server: gorums.NewServer(),
78-
numMsgs: 0,
180+
Server: NewServer(),
181+
req: &testBroadcastRequest{},
79182
}
80-
srv.RegisterHandler("broadcast", gorums.BroadcastHandler(srv.Broadcast, srv.Server))
81-
82-
lis, err := net.Listen("tcp", ownAddr)
83-
if err != nil {
84-
return nil
183+
b := &testBroadcast{
184+
BroadcastStruct: NewBroadcastStruct(),
185+
sp: NewSpBroadcastStruct(),
85186
}
86-
87-
go func() { _ = srv.Serve(lis) }()
88-
//defer srv.Stop()
89-
90-
srv.RegisterView(ownAddr, srvAddrs)
91-
srv.ListenForBroadcast()
187+
srv.RegisterBroadcastStruct(b, configureHandlers(b), configureMetadata(b))
92188
return srv
93189
}
94190

95-
type testBroadcastConfiguration struct {
96-
gorums.RawConfiguration
97-
qspec *testBroadcastQSpec
98-
srv *clientServerImpl
191+
func (srv *testBroadcastServer) Broadcast(ctx ServerCtx, request *testBroadcastRequest, broadcast *testBroadcast) {
192+
srv.numMsgs++
193+
srv.req = request
194+
//broadcast.Broadcast(&testBroadcastRequest{})
99195
}
100196

101-
type clientServerImpl struct {
102-
*gorums.ClientServer
103-
grpcServer *grpc.Server
104-
}
105-
106-
func (c *testBroadcastConfiguration) RegisterClientServer(lis net.Listener, opts ...grpc.ServerOption) error {
107-
srvImpl := &clientServerImpl{
108-
grpcServer: grpc.NewServer(opts...),
197+
func createReq(val string) (ServerCtx, *Message, chan<- *Message) {
198+
var mut sync.Mutex
199+
mut.Lock()
200+
srvCtx := ServerCtx{
201+
Context: context.Background(),
202+
once: new(sync.Once),
203+
mut: &mut,
109204
}
110-
srv, err := gorums.NewClientServer(lis)
111-
if err != nil {
112-
return err
205+
req := newMessage(requestType)
206+
req.Metadata = &ordering.Metadata{
207+
BroadcastMsg: &ordering.BroadcastMsg{
208+
BroadcastID: uuid.New().String(),
209+
},
113210
}
114-
//srvImpl.grpcServer.RegisterService(&clientServer_ServiceDesc, srvImpl)
115-
go srvImpl.grpcServer.Serve(lis)
116-
srvImpl.ClientServer = srv
117-
c.srv = srvImpl
118-
return nil
119-
}
120-
121-
type testBroadcastQSpec struct {
122-
quorumSize int
123-
}
124-
125-
func newQSpec(qSize int) *testBroadcastQSpec {
126-
return &testBroadcastQSpec{
127-
quorumSize: qSize,
128-
}
129-
}
130-
func (qs *testBroadcastQSpec) BroadcastQF(reqs []*testBroadcastResponse) (*testBroadcastResponse, bool) {
131-
if len(reqs) < qs.quorumSize {
132-
return nil, false
211+
req.Message = &testBroadcastRequest{
212+
value: val,
133213
}
134-
return reqs[0], true
214+
finished := make(chan *Message, 0)
215+
return srvCtx, req, finished
135216
}
136217

137-
func getConfig(srvAddresses []string, numSrvs int) *testBroadcastConfiguration {
138-
mgr := gorums.NewRawManager(
139-
gorums.WithDialTimeout(time.Second),
140-
gorums.WithGrpcDialOptions(
141-
grpc.WithBlock(),
142-
grpc.WithTransportCredentials(insecure.NewCredentials()),
143-
),
144-
)
145-
c := &testBroadcastConfiguration{}
146-
c.RawConfiguration, _ = gorums.NewRawConfiguration(mgr, gorums.WithNodeList(srvAddresses))
147-
c.qspec = newQSpec(numSrvs)
148-
//c.RegisterClientServer(gorums.WithListener("localhost:8080"))
149-
return c
150-
}
218+
func TestBroadcastHandler(t *testing.T) {
219+
handlerName := "broadcast"
151220

152-
func TestBroadcast(t *testing.T) {
153-
srv := gorums.NewServer()
221+
// create a server
222+
srv := newTestBroadcastServer()
223+
// register the broadcast handler. Similar to proto option: broadcast
224+
srv.RegisterHandler(handlerName, BroadcastHandler(srv.Broadcast, srv.Server))
154225

155-
lis, err := net.Listen("tcp", ":0")
156-
if err != nil {
157-
t.Fatal(err)
158-
}
159-
160-
go func() { _ = srv.Serve(lis) }()
161-
defer srv.Stop()
226+
// create a request
227+
vals := []string{"test1", "test2", "test3"}
228+
for _, val := range vals {
229+
srvCtx, req, finished := createReq(val)
230+
// call the server handler
231+
srv.srv.handlers[handlerName](srvCtx, req, finished)
162232

233+
if srv.req.value != val {
234+
t.Errorf("request.value = %s, expected %s", srv.req.value, val)
235+
}
236+
}
163237
}

handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (srv *broadcastServer) registerReturnToClientHandler(method string, handler
128128
}
129129

130130
func (srv *broadcastServer) registerBroadcastFunc(method string) {
131-
srv.methods[method] = func(ctx context.Context, in RequestTypes, md BroadcastMetadata, srvAddrs []string) {
131+
srv.handlers[method] = func(ctx context.Context, in RequestTypes, md BroadcastMetadata, srvAddrs []string) {
132132
cd := broadcastCallData{
133133
Message: in,
134134
Method: method,

0 commit comments

Comments
 (0)