Skip to content
This repository was archived by the owner on May 13, 2022. It is now read-only.

Commit 3d90913

Browse files
author
Casey Kuhlman
authored
Merge pull request #1419 from hyperledger/concurrency
Make RWTree concurrency safe
2 parents 5888d4a + af6f3f0 commit 3d90913

File tree

7 files changed

+177
-107
lines changed

7 files changed

+177
-107
lines changed

.gitattributes

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
*.pb.go linguist-generated
2+
yarn.lock linguist-generated

rpc/rpctransact/transact_server.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package rpctransact
22

33
import (
44
"fmt"
5-
"sync"
65
"time"
76

87
"github.com/hyperledger/burrow/logging"
@@ -27,7 +26,6 @@ type transactServer struct {
2726
transactor *execution.Transactor
2827
txCodec txs.Codec
2928
logger *logging.Logger
30-
lock *sync.Mutex
3129
}
3230

3331
func NewTransactServer(state acmstate.Reader, blockchain bcm.BlockchainInfo, transactor *execution.Transactor,
@@ -38,7 +36,6 @@ func NewTransactServer(state acmstate.Reader, blockchain bcm.BlockchainInfo, tra
3836
transactor: transactor,
3937
txCodec: txCodec,
4038
logger: logger.WithScope("NewTransactServer()"),
41-
lock: &sync.Mutex{},
4239
}
4340
}
4441

@@ -104,14 +101,10 @@ func (ts *transactServer) CallTxSim(ctx context.Context, param *payload.CallTx)
104101
if param.Address == nil {
105102
return nil, fmt.Errorf("CallSim requires a non-nil address from which to retrieve code")
106103
}
107-
ts.lock.Lock()
108-
defer ts.lock.Unlock()
109104
return execution.CallSim(ts.state, ts.blockchain, param.Input.Address, *param.Address, param.Data, ts.logger)
110105
}
111106

112107
func (ts *transactServer) CallCodeSim(ctx context.Context, param *CallCodeParam) (*exec.TxExecution, error) {
113-
ts.lock.Lock()
114-
defer ts.lock.Unlock()
115108
return execution.CallCodeSim(ts.state, ts.blockchain, param.FromAddress, param.FromAddress, param.Code, param.Data,
116109
ts.logger)
117110
}

storage/key_value_store.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package storage
2+
3+
import (
4+
"bytes"
5+
)
6+
7+
// KeyOrder maps []byte{} -> -1, []byte(nil) -> 1, and everything else to 0. This encodes the assumptions of the
// KVIterator domain endpoints
func KeyOrder(key []byte) int {
	switch {
	case key == nil:
		// Sup: the nil key sorts above every other key
		return 1
	case len(key) == 0:
		// Inf: the empty key sorts below every other key
		return -1
	default:
		// Normal key
		return 0
	}
}

// Sorts the keys as if they were compared lexicographically with their KeyOrder prepended
func CompareKeys(k1, k2 []byte) int {
	o1, o2 := KeyOrder(k1), KeyOrder(k2)
	switch {
	case o1 < o2:
		return -1
	case o1 > o2:
		return 1
	default:
		// Same order class - fall back to plain lexicographic comparison
		return bytes.Compare(k1, k2)
	}
}
34+
35+
// NormaliseDomain encodes the assumption that a nil lower bound is to be interpreted as low
// (the Inf key []byte{}) rather than high.
func NormaliseDomain(low, high []byte) ([]byte, []byte) {
	if len(low) != 0 {
		return low, high
	}
	return []byte{}, high
}
File renamed without changes.

storage/rwtree.go

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package storage
22

33
import (
44
"fmt"
5+
"sync"
6+
"sync/atomic"
57

68
"github.com/cosmos/iavl"
7-
89
dbm "github.com/tendermint/tm-db"
910
"github.com/xlab/treeprint"
1011
)
@@ -13,26 +14,40 @@ import (
1314
// recently saved version of the tree - which provides immutable access. Writes are routed to a working tree that is
1415
// mutable. On save the working tree is saved to DB, frozen, and replaces the previous immutable read tree.
1516
type RWTree struct {
17+
// Synchronise 'write side access' - i.e. to the write tree and updated flag (which only read-locks this mutex)
18+
sync.RWMutex
1619
// Working tree accumulating writes
1720
tree *MutableTree
1821
// Read-only tree serving previous state
19-
*ImmutableTree
22+
readTree atomic.Value // *ImmutableTree
2023
// Have any writes occurred since last save
2124
updated bool
2225
}
2326

27+
var _ KVCallbackIterableReader = &RWTree{}
28+
var _ Versioned = &RWTree{}
29+
2430
// Creates a concurrency safe version of an IAVL tree whereby reads are routed to the last saved tree.
2531
// Writes must be serialised (as they are within a commit for example).
2632
func NewRWTree(db dbm.DB, cacheSize int) (*RWTree, error) {
2733
tree, err := NewMutableTree(db, cacheSize)
28-
return &RWTree{
29-
tree: tree,
30-
ImmutableTree: &ImmutableTree{iavl.NewImmutableTree(db, cacheSize)},
31-
}, err
34+
if err != nil {
35+
return nil, err
36+
}
37+
readTree := &ImmutableTree{iavl.NewImmutableTree(db, cacheSize)}
38+
rwt := &RWTree{
39+
tree: tree,
40+
}
41+
rwt.readTree.Store(readTree)
42+
return rwt, nil
3243
}
3344

45+
// Write-side write methods - of the mutable tree - synchronised by write-lock of RWMutex
46+
3447
// Tries to load the execution state from DB, returns nil with no error if no state found
3548
func (rwt *RWTree) Load(version int64, overwriting bool) error {
49+
rwt.Lock()
50+
defer rwt.Unlock()
3651
const errHeader = "RWTree.Load():"
3752
if version <= 0 {
3853
return fmt.Errorf("%s trying to load from non-positive version %d", errHeader, version)
@@ -42,52 +57,92 @@ func (rwt *RWTree) Load(version int64, overwriting bool) error {
4257
return fmt.Errorf("%s loading version %d: %v", errHeader, version, err)
4358
}
4459
// Set readTree at commit point == tree
45-
rwt.ImmutableTree, err = rwt.tree.GetImmutable(version)
60+
readTree, err := rwt.tree.GetImmutable(version)
4661
if err != nil {
4762
return fmt.Errorf("%s loading version %d: %v", errHeader, version, err)
4863
}
64+
rwt.readTree.Store(readTree)
65+
rwt.updated = false
66+
4967
return nil
5068
}
5169

5270
// Save the current write tree making writes accessible from read tree.
5371
func (rwt *RWTree) Save() ([]byte, int64, error) {
72+
rwt.Lock()
73+
defer rwt.Unlock()
5474
// save state at a new version may still be orphaned before we save the version against the hash
5575
hash, version, err := rwt.tree.SaveVersion()
5676
if err != nil {
5777
return nil, 0, fmt.Errorf("could not save RWTree: %v", err)
5878
}
5979
// Take an immutable reference to the tree we just saved for querying
60-
rwt.ImmutableTree, err = rwt.tree.GetImmutable(version)
80+
readTree, err := rwt.tree.GetImmutable(version)
6181
if err != nil {
62-
return nil, 0, fmt.Errorf("RWTree.Save() could not obtain ImmutableTree read tree: %v", err)
82+
return nil, 0, fmt.Errorf("RWTree.Save() could not obtain immutable read tree: %v", err)
6383
}
84+
rwt.readTree.Store(readTree)
6485
rwt.updated = false
6586
return hash, version, nil
6687
}
6788

6889
func (rwt *RWTree) Set(key, value []byte) bool {
90+
rwt.Lock()
91+
defer rwt.Unlock()
6992
rwt.updated = true
7093
return rwt.tree.Set(key, value)
7194
}
7295

7396
func (rwt *RWTree) Delete(key []byte) ([]byte, bool) {
97+
rwt.Lock()
98+
defer rwt.Unlock()
7499
rwt.updated = true
75100
return rwt.tree.Remove(key)
76101
}
77102

103+
// Write-side read methods - of the mutable tree - synchronised by read-lock of RWMutex
104+
78105
// Returns true if there have been any writes since last save
79106
func (rwt *RWTree) Updated() bool {
107+
rwt.RLock()
108+
defer rwt.RUnlock()
80109
return rwt.updated
81110
}
82111

83112
func (rwt *RWTree) GetImmutable(version int64) (*ImmutableTree, error) {
113+
rwt.RLock()
114+
defer rwt.RUnlock()
84115
return rwt.tree.GetImmutable(version)
85116
}
86117

87118
func (rwt *RWTree) IterateWriteTree(start, end []byte, ascending bool, fn func(key []byte, value []byte) error) error {
119+
rwt.RLock()
120+
defer rwt.RUnlock()
88121
return rwt.tree.IterateWriteTree(start, end, ascending, fn)
89122
}
90123

124+
// Read-side read methods - of the immutable read tree (previous saved state) - synchronised solely via atomic.Value
125+
126+
func (rwt *RWTree) Hash() []byte {
127+
return rwt.readTree.Load().(*ImmutableTree).Hash()
128+
}
129+
130+
func (rwt *RWTree) Version() int64 {
131+
return rwt.readTree.Load().(*ImmutableTree).Version()
132+
}
133+
134+
func (rwt *RWTree) Get(key []byte) ([]byte, error) {
135+
return rwt.readTree.Load().(*ImmutableTree).Get(key)
136+
}
137+
138+
func (rwt *RWTree) Has(key []byte) (bool, error) {
139+
return rwt.readTree.Load().(*ImmutableTree).Has(key)
140+
}
141+
142+
func (rwt *RWTree) Iterate(low, high []byte, ascending bool, fn func(key []byte, value []byte) error) error {
143+
return rwt.readTree.Load().(*ImmutableTree).Iterate(low, high, ascending, fn)
144+
}
145+
91146
// Tree printing
92147

93148
func (rwt *RWTree) Dump() string {

storage/rwtree_test.go

Lines changed: 67 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package storage
22

33
import (
44
"fmt"
5+
"math/rand"
56
"runtime/debug"
67
"testing"
8+
"time"
79

810
"github.com/stretchr/testify/require"
911

@@ -151,58 +153,68 @@ func capturePanic(f func() error) (err error) {
151153
return
152154
}
153155

154-
// TODO: fix
155-
//func TestConcurrentReadWriteSave(t *testing.T) {
156-
// rwt, err := NewRWTree(dbm.NewMemDB(), 100)
157-
// require.NoError(t, err)
158-
// n := 100
159-
//
160-
// doneCh := make(chan struct{})
161-
// errCh := make(chan interface{})
162-
//
163-
// go func() {
164-
// for b := byte(0); true; b++ {
165-
// val := []byte{b}
166-
// go func() {
167-
// err := capturePanic(func() error {
168-
// rwt.Set(val, val)
169-
// return nil
170-
// })
171-
// if err != nil {
172-
// errCh <- err
173-
// }
174-
// }()
175-
// go func() {
176-
// err := capturePanic(func() error {
177-
// _, err := rwt.Get(val)
178-
// return err
179-
// })
180-
// if err != nil {
181-
// errCh <- err
182-
// }
183-
// }()
184-
//
185-
// select {
186-
// case <-doneCh:
187-
// return
188-
// default:
189-
// }
190-
// }
191-
// }()
192-
//
193-
// for i := 0; i < n/10; i++ {
194-
// time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
195-
// err := capturePanic(func() error {
196-
// _, _, err := rwt.Save()
197-
// return err
198-
// })
199-
// if err != nil {
200-
// break
201-
// }
202-
// }
203-
// close(doneCh)
204-
//
205-
// for err := range errCh {
206-
// t.Fatal(err)
207-
// }
208-
//}
156+
func TestConcurrentReadWriteSave(t *testing.T) {
157+
rwt, err := NewRWTree(dbm.NewMemDB(), 100)
158+
require.NoError(t, err)
159+
n := 100
160+
161+
doneCh := make(chan struct{})
162+
errCh := make(chan interface{})
163+
164+
// Saturate with concurrent reads and writes
165+
var spin func()
166+
spin = func() {
167+
for i := 0; i < n; i++ {
168+
val := []byte{byte(i)}
169+
go func() {
170+
err := capturePanic(func() error {
171+
rwt.Set(val, val)
172+
return nil
173+
})
174+
if err != nil {
175+
errCh <- err
176+
}
177+
}()
178+
go func() {
179+
err := capturePanic(func() error {
180+
_, err := rwt.Get(val)
181+
return err
182+
})
183+
if err != nil {
184+
errCh <- err
185+
}
186+
}()
187+
188+
}
189+
select {
190+
case <-doneCh:
191+
return
192+
default:
193+
// Avoid starvation
194+
time.Sleep(time.Millisecond)
195+
spin()
196+
}
197+
}
198+
199+
// let's
200+
go spin()
201+
202+
// Ensure Save() is safe with concurrent read/writes
203+
for i := 0; i < n/10; i++ {
204+
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
205+
err := capturePanic(func() error {
206+
_, _, err := rwt.Save()
207+
return err
208+
})
209+
if err != nil {
210+
break
211+
}
212+
}
213+
close(doneCh)
214+
215+
select {
216+
case err := <-errCh:
217+
t.Fatal(err)
218+
default:
219+
}
220+
}

0 commit comments

Comments
 (0)