Skip to content

Commit 4bfa572

Browse files
committed
adding a simple request pool
Signed-off-by: Hagar Meir <[email protected]>
1 parent 640a36f commit 4bfa572

File tree

5 files changed

+267
-12
lines changed

5 files changed

+267
-12
lines changed

internal/bft/controller.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,6 @@ type Signer interface {
6666
SignProposal(types.Proposal) *types.Signature
6767
}
6868

69-
//go:generate mockery -dir . -name RequestPool -case underscore -output ./mocks/
70-
type RequestPool interface {
71-
Submit(request []byte)
72-
}
73-
7469
//go:generate mockery -dir . -name Batcher -case underscore -output ./mocks/
7570
type Batcher interface {
7671
NextBatch() [][]byte
@@ -120,15 +115,18 @@ func (c *Controller) leaderID() uint64 {
120115
}
121116

122117
func (c *Controller) computeQuorum() int {
123-
f := int(math.Floor((float64(c.N) - 1.0) / 3.0))
118+
f := int((int(c.N) - 1) / 3)
124119
q := int(math.Ceil((float64(c.N) + float64(f) + 1) / 2.0))
125120
c.Logger.Debugf("The number of nodes (N) is %d, F is %d, and the quorum size is %d", c.N, f, q)
126121
return q
127122
}
128123

129124
// SubmitRequest submits a request to go through consensus
130125
func (c *Controller) SubmitRequest(request []byte) {
131-
c.RequestPool.Submit(request)
126+
err := c.RequestPool.Submit(request)
127+
if err != nil {
128+
c.Logger.Warnf("Request %v was not submitted, error: %v", request, err)
129+
}
132130
}
133131

134132
// ProcessMessages dispatches the incoming message to the required component

internal/bft/mocks/request_inspector.go

+25
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/bft/requestpool.go

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright IBM Corp. All Rights Reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
package bft
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"sync"
12+
13+
"github.com/SmartBFT-Go/consensus/pkg/types"
14+
"golang.org/x/sync/semaphore"
15+
)
16+
17+
//go:generate mockery -dir . -name RequestInspector -case underscore -output ./mocks/
18+
type RequestInspector interface {
19+
RequestID(req []byte) types.RequestInfo
20+
}
21+
22+
type RequestPool struct {
23+
Log Logger
24+
RequestInspector RequestInspector
25+
queue []Request
26+
semaphore *semaphore.Weighted
27+
lock sync.RWMutex
28+
QueueSize int64
29+
}
30+
31+
type Request struct {
32+
ID string
33+
Request []byte
34+
ClientID string
35+
}
36+
37+
func (rp *RequestPool) Start() {
38+
rp.queue = make([]Request, 0)
39+
rp.semaphore = semaphore.NewWeighted(rp.QueueSize)
40+
}
41+
42+
// Submit a request into the pool, returns an error when request is already in the pool
43+
func (rp *RequestPool) Submit(request []byte) error {
44+
reqInfo := rp.RequestInspector.RequestID(request)
45+
req := Request{
46+
ID: reqInfo.ID,
47+
Request: request,
48+
ClientID: reqInfo.ClientID,
49+
}
50+
if err := rp.semaphore.Acquire(context.Background(), 1); err != nil {
51+
return fmt.Errorf("error in acquiring semaphore: %v", err)
52+
}
53+
rp.lock.Lock()
54+
defer rp.lock.Unlock()
55+
for _, existingReq := range rp.queue {
56+
if existingReq.ClientID == reqInfo.ClientID && existingReq.ID == reqInfo.ID {
57+
rp.semaphore.Release(1)
58+
err := fmt.Sprintf("a request with ID %v and client ID %v already exists in the pool", reqInfo.ID, reqInfo.ClientID)
59+
rp.Log.Errorf(err)
60+
return fmt.Errorf(err)
61+
}
62+
}
63+
rp.queue = append(rp.queue, req)
64+
return nil
65+
}
66+
67+
// NextRequests return the next requests to be batched
68+
func (rp *RequestPool) NextRequests(n int) []Request {
69+
rp.lock.RLock()
70+
defer rp.lock.RUnlock()
71+
if len(rp.queue) <= n {
72+
return rp.queue
73+
}
74+
return rp.queue[:n]
75+
}
76+
77+
// RemoveRequest removes the given request from the pool
78+
func (rp *RequestPool) RemoveRequest(request Request) error {
79+
rp.lock.Lock()
80+
defer rp.lock.Unlock()
81+
removed := false
82+
for i, existingReq := range rp.queue {
83+
if existingReq.ClientID == request.ClientID && existingReq.ID == request.ID {
84+
rp.Log.Infof("Removed request %v from request pool", request)
85+
rp.queue = append(rp.queue[:i], rp.queue[i+1:]...)
86+
removed = true
87+
rp.semaphore.Release(1)
88+
break
89+
}
90+
}
91+
if !removed {
92+
err := fmt.Sprintf("Request %v is not in the pool at remove time", request)
93+
rp.Log.Warnf(err)
94+
return fmt.Errorf(err)
95+
}
96+
return nil
97+
}

internal/bft/requestpool_test.go

+137
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright IBM Corp. All Rights Reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
package bft_test
7+
8+
import (
9+
"fmt"
10+
"sync"
11+
"testing"
12+
13+
"github.com/SmartBFT-Go/consensus/internal/bft"
14+
"github.com/SmartBFT-Go/consensus/internal/bft/mocks"
15+
"github.com/SmartBFT-Go/consensus/pkg/types"
16+
"github.com/stretchr/testify/assert"
17+
"go.uber.org/zap"
18+
)
19+
20+
func TestReqPoolBasic(t *testing.T) {
21+
basicLog, err := zap.NewDevelopment()
22+
assert.NoError(t, err)
23+
log := basicLog.Sugar()
24+
insp := &mocks.RequestInspector{}
25+
byteReq1 := []byte{1}
26+
insp.On("RequestID", byteReq1).Return(types.RequestInfo{ID: "1", ClientID: "1"})
27+
pool := bft.RequestPool{
28+
Log: log,
29+
RequestInspector: insp,
30+
QueueSize: 3,
31+
}
32+
pool.Start()
33+
err = pool.Submit(byteReq1)
34+
assert.NoError(t, err)
35+
req1 := bft.Request{
36+
ID: "1",
37+
ClientID: "1",
38+
Request: byteReq1,
39+
}
40+
err = pool.Submit(byteReq1)
41+
assert.Error(t, err)
42+
err = pool.RemoveRequest(req1)
43+
assert.NoError(t, err)
44+
err = pool.Submit(byteReq1)
45+
assert.NoError(t, err)
46+
err = pool.RemoveRequest(req1)
47+
assert.NoError(t, err)
48+
49+
byteReq2 := []byte{2}
50+
insp.On("RequestID", byteReq2).Return(types.RequestInfo{ID: "2", ClientID: "2"})
51+
err = pool.Submit(byteReq2)
52+
assert.NoError(t, err)
53+
err = pool.Submit(byteReq1)
54+
assert.NoError(t, err)
55+
err = pool.Submit(byteReq1)
56+
assert.Error(t, err)
57+
err = pool.Submit(byteReq2)
58+
assert.Error(t, err)
59+
err = pool.RemoveRequest(req1)
60+
assert.NoError(t, err)
61+
err = pool.Submit(byteReq1)
62+
assert.NoError(t, err)
63+
req2 := bft.Request{
64+
ID: "2",
65+
ClientID: "2",
66+
Request: byteReq2,
67+
}
68+
err = pool.RemoveRequest(req2)
69+
assert.NoError(t, err)
70+
err = pool.Submit(byteReq2)
71+
assert.NoError(t, err)
72+
73+
byteReq3 := []byte{3}
74+
insp.On("RequestID", byteReq3).Return(types.RequestInfo{ID: "3", ClientID: "3"})
75+
err = pool.Submit(byteReq3)
76+
assert.NoError(t, err)
77+
78+
next := pool.NextRequests(4)
79+
assert.Equal(t, "1", next[0].ID)
80+
assert.Equal(t, "2", next[1].ID)
81+
assert.Equal(t, "3", next[2].ID)
82+
assert.Len(t, next, 3)
83+
84+
err = pool.RemoveRequest(req2)
85+
assert.NoError(t, err)
86+
87+
next = pool.NextRequests(4)
88+
assert.Equal(t, "1", next[0].ID)
89+
assert.Equal(t, "3", next[1].ID)
90+
assert.Len(t, next, 2)
91+
92+
next = pool.NextRequests(1)
93+
assert.Equal(t, "1", next[0].ID)
94+
assert.Len(t, next, 1)
95+
96+
}
97+
98+
func TestEventuallySubmit(t *testing.T) {
99+
n := 100
100+
basicLog, err := zap.NewDevelopment()
101+
assert.NoError(t, err)
102+
log := basicLog.Sugar()
103+
insp := &mocks.RequestInspector{}
104+
pool := bft.RequestPool{
105+
Log: log,
106+
RequestInspector: insp,
107+
QueueSize: 50,
108+
}
109+
pool.Start()
110+
wg := sync.WaitGroup{}
111+
wg.Add(2 * n)
112+
for i := 0; i < n; i++ {
113+
go func(i int) {
114+
byteReq := []byte{byte(i)}
115+
str := fmt.Sprintf("%d", i)
116+
insp.On("RequestID", byteReq).Return(types.RequestInfo{ID: str, ClientID: str})
117+
err := pool.Submit(byteReq)
118+
assert.NoError(t, err)
119+
wg.Done()
120+
}(i)
121+
go func(i int) {
122+
byteReq := []byte{byte(i)}
123+
str := fmt.Sprintf("%d", i)
124+
req := bft.Request{
125+
ID: str,
126+
ClientID: str,
127+
Request: byteReq,
128+
}
129+
err := pool.RemoveRequest(req)
130+
for err != nil {
131+
err = pool.RemoveRequest(req)
132+
}
133+
wg.Done()
134+
}(i)
135+
}
136+
wg.Wait()
137+
}

pkg/consensus/consensus.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ func (c *Consensus) Sync() (protos.ViewMetadata, uint64) {
4242
panic("implement me")
4343
}
4444

45-
func (c *Consensus) Submit(request []byte) {
46-
panic("implement me")
47-
}
48-
4945
func (c *Consensus) BatchRemainder(remainder [][]byte) {
5046
panic("implement me")
5147
}
@@ -69,10 +65,12 @@ type Future interface {
6965
}
7066

7167
func (c *Consensus) Start() Future {
68+
pool := algorithm.RequestPool{}
69+
// TODO use request pool in example
7270
c.controller = &algorithm.Controller{
7371
ID: c.SelfID,
7472
N: 4,
75-
RequestPool: c,
73+
RequestPool: pool,
7674
Batcher: c,
7775
Verifier: c.Verifier,
7876
Logger: c.Logger,

0 commit comments

Comments
 (0)