Skip to content

Commit fab3bc3

Browse files
authored
Merge pull request hyperledger-labs#77 from HagarMeir/master
adding a simple request pool
2 parents 640a36f + ec7f64b commit fab3bc3

File tree

11 files changed

+340
-55
lines changed

11 files changed

+340
-55
lines changed

examples/naive_chain/node.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ func (*Node) RequestID(req []byte) bft.RequestInfo {
4343
}
4444
}
4545

46-
func (*Node) VerifyProposal(proposal bft.Proposal, prevHeader []byte) error {
47-
return nil
46+
func (*Node) VerifyProposal(proposal bft.Proposal, prevHeader []byte) ([]bft.RequestInfo, error) {
47+
return nil, nil
4848
}
4949

50-
func (*Node) VerifyRequest(val []byte) error {
51-
return nil
50+
func (*Node) VerifyRequest(val []byte) (bft.RequestInfo, error) {
51+
return bft.RequestInfo{}, nil
5252
}
5353

5454
func (*Node) VerifyConsenterSig(_ bft.Signature, prop bft.Proposal) error {

internal/bft/controller.go

+22-22
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ type Logger interface {
2424

2525
//go:generate mockery -dir . -name Verifier -case underscore -output ./mocks/
2626
type Verifier interface {
27-
VerifyProposal(proposal types.Proposal, prevHeader []byte) error
28-
VerifyRequest(val []byte) error
27+
VerifyProposal(proposal types.Proposal, prevHeader []byte) ([]types.RequestInfo, error)
28+
VerifyRequest(val []byte) (types.RequestInfo, error)
2929
VerifyConsenterSig(signature types.Signature, prop types.Proposal) error
3030
VerificationSequence() uint64
3131
}
@@ -66,11 +66,6 @@ type Signer interface {
6666
SignProposal(types.Proposal) *types.Signature
6767
}
6868

69-
//go:generate mockery -dir . -name RequestPool -case underscore -output ./mocks/
70-
type RequestPool interface {
71-
Submit(request []byte)
72-
}
73-
7469
//go:generate mockery -dir . -name Batcher -case underscore -output ./mocks/
7570
type Batcher interface {
7671
NextBatch() [][]byte
@@ -83,18 +78,19 @@ type Future interface {
8378

8479
type Controller struct {
8580
// configuration
86-
ID uint64
87-
N uint64
88-
RequestPool RequestPool
89-
Batcher Batcher
90-
Verifier Verifier
91-
Logger Logger
92-
Assembler Assembler
93-
Application Application
94-
FailureDetector FailureDetector
95-
Synchronizer Synchronizer
96-
Comm Comm
97-
Signer Signer
81+
ID uint64
82+
N uint64
83+
RequestPool RequestPool
84+
Batcher Batcher
85+
Verifier Verifier
86+
Logger Logger
87+
Assembler Assembler
88+
Application Application
89+
FailureDetector FailureDetector
90+
Synchronizer Synchronizer
91+
Comm Comm
92+
Signer Signer
93+
RequestInspector RequestInspector
9894

9995
quorum int
10096

@@ -120,15 +116,19 @@ func (c *Controller) leaderID() uint64 {
120116
}
121117

122118
func (c *Controller) computeQuorum() int {
123-
f := int(math.Floor((float64(c.N) - 1.0) / 3.0))
119+
f := int((int(c.N) - 1) / 3)
124120
q := int(math.Ceil((float64(c.N) + float64(f) + 1) / 2.0))
125121
c.Logger.Debugf("The number of nodes (N) is %d, F is %d, and the quorum size is %d", c.N, f, q)
126122
return q
127123
}
128124

129125
// SubmitRequest submits a request to go through consensus
130126
func (c *Controller) SubmitRequest(request []byte) {
131-
c.RequestPool.Submit(request)
127+
err := c.RequestPool.Submit(request)
128+
if err != nil {
129+
info := c.RequestInspector.RequestID(request)
130+
c.Logger.Warnf("Request %v was not submitted, error: %v", info, err)
131+
}
132132
}
133133

134134
// ProcessMessages dispatches the incoming message to the required component
@@ -212,7 +212,7 @@ func (c *Controller) getNextBatch() [][]byte {
212212
}
213213
requests := c.Batcher.NextBatch()
214214
for _, req := range requests {
215-
err := c.Verifier.VerifyRequest(req)
215+
_, err := c.Verifier.VerifyRequest(req) // TODO use returned request info
216216
if err != nil {
217217
c.Logger.Warnf("Ignoring bad request: %v, verifier error is: %v", req, err)
218218
continue

internal/bft/controller_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestQuorum(t *testing.T) {
5757
for _, testCase := range quorums {
5858
t.Run(fmt.Sprintf("%d nodes", testCase.N), func(t *testing.T) {
5959
verifier := &mocks.Verifier{}
60-
verifier.On("VerifyProposal", mock.Anything, mock.Anything).Return(nil)
60+
verifier.On("VerifyProposal", mock.Anything, mock.Anything).Return(nil, nil)
6161
comm := &mocks.Comm{}
6262
comm.On("Broadcast", mock.Anything)
6363
basicLog, err := zap.NewDevelopment()
@@ -117,8 +117,8 @@ func TestLeaderPropose(t *testing.T) {
117117
batcher := &mocks.Batcher{}
118118
batcher.On("NextBatch").Return([][]byte{req})
119119
verifier := &mocks.Verifier{}
120-
verifier.On("VerifyRequest", req).Return(nil)
121-
verifier.On("VerifyProposal", mock.Anything, mock.Anything).Return(nil)
120+
verifier.On("VerifyRequest", req).Return(types.RequestInfo{}, nil)
121+
verifier.On("VerifyProposal", mock.Anything, mock.Anything).Return(nil, nil)
122122
verifier.On("VerifyConsenterSig", mock.Anything, mock.Anything, mock.Anything).Return(nil)
123123
assembler := &mocks.Assembler{}
124124
assembler.On("AssembleProposal", mock.Anything, [][]byte{req}).Return(proposal, [][]byte{})
@@ -180,8 +180,8 @@ func TestLeaderChange(t *testing.T) {
180180
batcher := &mocks.Batcher{}
181181
batcher.On("NextBatch").Return([][]byte{req})
182182
verifier := &mocks.Verifier{}
183-
verifier.On("VerifyRequest", req).Return(nil)
184-
verifier.On("VerifyProposal", proposal, mock.Anything).Return(nil)
183+
verifier.On("VerifyRequest", req).Return(types.RequestInfo{}, nil)
184+
verifier.On("VerifyProposal", proposal, mock.Anything).Return(nil, nil)
185185
assembler := &mocks.Assembler{}
186186
assembler.On("AssembleProposal", mock.Anything, [][]byte{req}).Return(proposal, [][]byte{})
187187
comm := &mocks.Comm{}

internal/bft/mocks/request_inspector.go

+25
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/bft/mocks/verifier.go

+26-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/bft/requestpool.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright IBM Corp. All Rights Reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
package bft
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"sync"
12+
13+
"github.com/SmartBFT-Go/consensus/pkg/types"
14+
"golang.org/x/sync/semaphore"
15+
)
16+
17+
//go:generate mockery -dir . -name RequestInspector -case underscore -output ./mocks/
18+
type RequestInspector interface {
19+
RequestID(req []byte) types.RequestInfo
20+
}
21+
22+
type RequestPool struct {
23+
Log Logger
24+
RequestInspector RequestInspector
25+
queue []Request
26+
semaphore *semaphore.Weighted
27+
lock sync.RWMutex
28+
QueueSize int64
29+
existMap map[string]bool
30+
}
31+
32+
type Request struct {
33+
ID string
34+
Request []byte
35+
ClientID string
36+
}
37+
38+
func (rp *RequestPool) Start() {
39+
rp.queue = make([]Request, 0)
40+
rp.semaphore = semaphore.NewWeighted(rp.QueueSize)
41+
rp.existMap = make(map[string]bool)
42+
}
43+
44+
// Submit a request into the pool, returns an error when request is already in the pool
45+
func (rp *RequestPool) Submit(request []byte) error {
46+
reqInfo := rp.RequestInspector.RequestID(request)
47+
req := Request{
48+
ID: reqInfo.ID,
49+
Request: request,
50+
ClientID: reqInfo.ClientID,
51+
}
52+
if err := rp.semaphore.Acquire(context.Background(), 1); err != nil {
53+
return fmt.Errorf("error in acquiring semaphore: %v", err)
54+
}
55+
rp.lock.Lock()
56+
defer rp.lock.Unlock()
57+
existStr := fmt.Sprintf("%v~%v", reqInfo.ClientID, reqInfo.ID)
58+
if _, exist := rp.existMap[existStr]; exist {
59+
rp.semaphore.Release(1)
60+
err := fmt.Sprintf("a request with ID %v and client ID %v already exists in the pool", reqInfo.ID, reqInfo.ClientID)
61+
rp.Log.Errorf(err)
62+
return fmt.Errorf(err)
63+
}
64+
rp.queue = append(rp.queue, req)
65+
rp.existMap[existStr] = true
66+
return nil
67+
}
68+
69+
// NextRequests return the next requests to be batched
70+
func (rp *RequestPool) NextRequests(n int) []Request {
71+
rp.lock.RLock()
72+
defer rp.lock.RUnlock()
73+
if len(rp.queue) <= n {
74+
return rp.queue
75+
}
76+
return rp.queue[:n]
77+
}
78+
79+
// RemoveRequest removes the given request from the pool
80+
func (rp *RequestPool) RemoveRequest(request Request) error {
81+
rp.lock.Lock()
82+
defer rp.lock.Unlock()
83+
existStr := fmt.Sprintf("%v~%v", request.ClientID, request.ID)
84+
if _, exist := rp.existMap[existStr]; !exist {
85+
err := fmt.Sprintf("Request %v is not in the pool at remove time", request)
86+
rp.Log.Warnf(err)
87+
return fmt.Errorf(err)
88+
}
89+
for i, existingReq := range rp.queue {
90+
if existingReq.ClientID != request.ClientID || existingReq.ID != request.ID {
91+
continue
92+
}
93+
rp.Log.Infof("Removed request %v from request pool", request)
94+
rp.queue = append(rp.queue[:i], rp.queue[i+1:]...)
95+
delete(rp.existMap, existStr)
96+
rp.semaphore.Release(1)
97+
return nil
98+
}
99+
panic("RemoveRequest should have returned earlier")
100+
}

0 commit comments

Comments
 (0)