diff --git a/benchmark/benchmark_gorums.pb.go b/benchmark/benchmark_gorums.pb.go index f2ac8de0..f09a485f 100644 --- a/benchmark/benchmark_gorums.pb.go +++ b/benchmark/benchmark_gorums.pb.go @@ -358,7 +358,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *StartResponse, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.StartServerBenchmark(ctx, req, f) @@ -368,7 +371,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *Result, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.StopServerBenchmark(ctx, req, f) @@ -378,7 +384,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *StartResponse, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.StartBenchmark(ctx, req, f) @@ -388,7 +397,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *MemoryStat, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.StopBenchmark(ctx, req, f) @@ -398,7 +410,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *Echo, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCall(ctx, req, f) @@ -408,7 +423,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *Echo, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.AsyncQuorumCall(ctx, req, f) @@ -418,7 +436,10 @@ func RegisterBenchmarkServer(srv *gorums.Server, impl Benchmark) { once := new(sync.Once) f := func(resp *Echo, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.SlowServer(ctx, req, f) diff --git a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go index 0437ef34..25d53625 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go @@ -63,7 +63,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.GRPCCall(ctx, req, f) @@ -73,7 +76,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCall(ctx, req, f) @@ -83,7 +89,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallPerNodeArg(ctx, req, f) @@ -93,7 +102,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallCustomReturnType(ctx, req, f) @@ -103,7 +115,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallCombo(ctx, req, f) @@ -113,7 +128,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallEmpty(ctx, req, f) @@ -123,7 +141,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *emptypb.Empty, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallEmpty2(ctx, req, f) @@ -153,7 +174,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsync(ctx, req, f) @@ -163,7 +187,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsyncPerNodeArg(ctx, req, f) @@ -173,7 +200,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsyncCustomReturnType(ctx, req, f) @@ -183,7 +213,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsyncCombo(ctx, req, f) @@ -193,7 +226,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsync2(ctx, req, f) @@ -203,7 +239,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *emptypb.Empty, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsyncEmpty(ctx, req, f) @@ -213,7 +252,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QuorumCallAsyncEmpty2(ctx, req, f) @@ -223,7 +265,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.Correctable(ctx, req, f) @@ -233,7 +278,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectablePerNodeArg(ctx, req, f) @@ -243,7 +291,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableCustomReturnType(ctx, req, f) @@ -253,7 +304,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableCombo(ctx, req, f) @@ -263,7 +317,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *emptypb.Empty, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableEmpty(ctx, req, f) @@ -273,7 +330,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableEmpty2(ctx, req, f) @@ -283,7 +343,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableStream(ctx, req, f) @@ -293,7 +356,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableStreamPerNodeArg(ctx, req, f) @@ -303,7 +369,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableStreamCustomReturnType(ctx, req, f) @@ -313,7 +382,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableStreamCombo(ctx, req, f) @@ -323,7 +395,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *emptypb.Empty, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableStreamEmpty(ctx, req, f) @@ -333,7 +408,10 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.CorrectableStreamEmpty2(ctx, req, f) diff --git a/cmd/protoc-gen-gorums/gengorums/template_server.go b/cmd/protoc-gen-gorums/gengorums/template_server.go index e8dea5a7..91ef988f 100644 --- a/cmd/protoc-gen-gorums/gengorums/template_server.go +++ b/cmd/protoc-gen-gorums/gengorums/template_server.go @@ -37,7 +37,10 @@ func Register{{$service}}Server(srv *{{use "gorums.Server" $genFile}}, impl {{$s f := func(resp *{{out $genFile .}}, err error) { {{- /* Only one response message is supported */ -}} once.Do(func() { - finished <- {{use "gorums.WrapMessage" $genFile}}(in.Metadata, resp, err) + select { + case finished <- {{use "gorums.WrapMessage" $genFile}}(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.{{.GoName}}(ctx, req, f) diff --git a/tests/config/config_gorums.pb.go b/tests/config/config_gorums.pb.go index fb3a468c..66436ee0 100644 --- a/tests/config/config_gorums.pb.go +++ b/tests/config/config_gorums.pb.go @@ -165,7 +165,10 @@ func RegisterConfigTestServer(srv *gorums.Server, impl ConfigTest) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.Config(ctx, req, f) diff --git a/tests/dummy/dummy_gorums.pb.go b/tests/dummy/dummy_gorums.pb.go index 3fac90e7..416b42bb 100644 --- a/tests/dummy/dummy_gorums.pb.go +++ b/tests/dummy/dummy_gorums.pb.go @@ -150,7 +150,10 @@ func RegisterDummyServer(srv *gorums.Server, impl Dummy) { once := new(sync.Once) f := func(resp *Empty, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.Test(ctx, req, f) diff --git a/tests/metadata/metadata_gorums.pb.go b/tests/metadata/metadata_gorums.pb.go index a3975829..01111e97 100644 --- a/tests/metadata/metadata_gorums.pb.go +++ b/tests/metadata/metadata_gorums.pb.go @@ -165,7 +165,10 @@ func RegisterMetadataTestServer(srv *gorums.Server, impl MetadataTest) { once := new(sync.Once) f := func(resp *NodeID, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.IDFromMD(ctx, req, f) @@ -175,7 +178,10 @@ func RegisterMetadataTestServer(srv *gorums.Server, impl MetadataTest) { once := new(sync.Once) f := func(resp *IPAddr, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.WhatIP(ctx, req, f) diff --git a/tests/ordering/order_gorums.pb.go b/tests/ordering/order_gorums.pb.go index 03b1cefe..d4483997 100644 --- a/tests/ordering/order_gorums.pb.go +++ b/tests/ordering/order_gorums.pb.go @@ -209,7 +209,10 @@ func RegisterGorumsTestServer(srv *gorums.Server, impl GorumsTest) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QC(ctx, req, f) @@ -219,7 +222,10 @@ func RegisterGorumsTestServer(srv *gorums.Server, impl GorumsTest) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.QCAsync(ctx, req, f) @@ -229,7 +235,10 @@ func RegisterGorumsTestServer(srv *gorums.Server, impl GorumsTest) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.UnaryRPC(ctx, req, f) diff --git a/tests/qf/qf_gorums.pb.go b/tests/qf/qf_gorums.pb.go index b57f1198..19db6ee3 100644 --- a/tests/qf/qf_gorums.pb.go +++ b/tests/qf/qf_gorums.pb.go @@ -195,7 +195,10 @@ func RegisterQuorumFunctionServer(srv *gorums.Server, impl QuorumFunction) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.UseReq(ctx, req, f) @@ -205,7 +208,10 @@ func RegisterQuorumFunctionServer(srv *gorums.Server, impl QuorumFunction) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.IgnoreReq(ctx, req, f) diff --git a/tests/tls/tls_gorums.pb.go b/tests/tls/tls_gorums.pb.go index bce500ff..0a775e0c 100644 --- a/tests/tls/tls_gorums.pb.go +++ b/tests/tls/tls_gorums.pb.go @@ -150,7 +150,10 @@ func RegisterTLSServer(srv *gorums.Server, impl TLS) { once := new(sync.Once) f := func(resp *Response, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.TestTLS(ctx, req, f) diff --git a/tests/unresponsive/unresponsive_gorums.pb.go b/tests/unresponsive/unresponsive_gorums.pb.go index f9aaa15c..37c5325b 100644 --- a/tests/unresponsive/unresponsive_gorums.pb.go +++ b/tests/unresponsive/unresponsive_gorums.pb.go @@ -150,7 +150,10 @@ func RegisterUnresponsiveServer(srv *gorums.Server, impl Unresponsive) { once := new(sync.Once) f := func(resp *Empty, err error) { once.Do(func() { - finished <- gorums.WrapMessage(in.Metadata, resp, err) + select { + case finished <- gorums.WrapMessage(in.Metadata, resp, err): + case <-ctx.Done(): + } }) } impl.TestUnresponsive(ctx, req, f)