Skip to content

Commit 5d92b4d

Browse files
committed
prevent closing concurrently with other operations.
Add a `sync.RWMutex` protecting the Close operation. Follows the same pattern as in go-ds-badger. Address #29 / ipfs/kubo#6880
1 parent 2e5c197 commit 5d92b4d

File tree

2 files changed

+75
-6
lines changed

2 files changed

+75
-6
lines changed

datastore.go

+37-6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package leveldb
33
import (
44
"os"
55
"path/filepath"
6+
"sync"
67

78
ds "github.com/ipfs/go-datastore"
89
dsq "github.com/ipfs/go-datastore/query"
@@ -52,11 +53,12 @@ func NewDatastore(path string, opts *Options) (*Datastore, error) {
5253
return nil, err
5354
}
5455

55-
return &Datastore{
56-
accessor: &accessor{ldb: db, syncWrites: true},
56+
ds := Datastore{
57+
accessor: &accessor{ldb: db, syncWrites: true, closeLk: new(sync.RWMutex)},
5758
DB: db,
5859
path: path,
59-
}, nil
60+
}
61+
return &ds, nil
6062
}
6163

6264
// An extraction of the common interface between LevelDB Transactions and the DB itself.
@@ -74,9 +76,12 @@ type levelDbOps interface {
7476
type accessor struct {
7577
ldb levelDbOps
7678
syncWrites bool
79+
closeLk *sync.RWMutex
7780
}
7881

7982
func (a *accessor) Put(key ds.Key, value []byte) (err error) {
83+
a.closeLk.RLock()
84+
defer a.closeLk.RUnlock()
8085
return a.ldb.Put(key.Bytes(), value, &opt.WriteOptions{Sync: a.syncWrites})
8186
}
8287

@@ -85,6 +90,8 @@ func (a *accessor) Sync(prefix ds.Key) error {
8590
}
8691

8792
func (a *accessor) Get(key ds.Key) (value []byte, err error) {
93+
a.closeLk.RLock()
94+
defer a.closeLk.RUnlock()
8895
val, err := a.ldb.Get(key.Bytes(), nil)
8996
if err != nil {
9097
if err == leveldb.ErrNotFound {
@@ -96,18 +103,24 @@ func (a *accessor) Get(key ds.Key) (value []byte, err error) {
96103
}
97104

98105
func (a *accessor) Has(key ds.Key) (exists bool, err error) {
106+
a.closeLk.RLock()
107+
defer a.closeLk.RUnlock()
99108
return a.ldb.Has(key.Bytes(), nil)
100109
}
101110

102-
func (d *accessor) GetSize(key ds.Key) (size int, err error) {
103-
return ds.GetBackedSize(d, key)
111+
func (a *accessor) GetSize(key ds.Key) (size int, err error) {
112+
return ds.GetBackedSize(a, key)
104113
}
105114

106115
func (a *accessor) Delete(key ds.Key) (err error) {
116+
a.closeLk.RLock()
117+
defer a.closeLk.RUnlock()
107118
return a.ldb.Delete(key.Bytes(), &opt.WriteOptions{Sync: a.syncWrites})
108119
}
109120

110121
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
122+
a.closeLk.RLock()
123+
defer a.closeLk.RUnlock()
111124
var rnge *util.Range
112125

113126
// make a copy of the query for the fallback naive query implementation.
@@ -135,6 +148,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
135148
}
136149
r := dsq.ResultsFromIterator(q, dsq.Iterator{
137150
Next: func() (dsq.Result, bool) {
151+
a.closeLk.RLock()
152+
defer a.closeLk.RUnlock()
138153
if !next() {
139154
return dsq.Result{}, false
140155
}
@@ -149,6 +164,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
149164
return dsq.Result{Entry: e}, true
150165
},
151166
Close: func() error {
167+
a.closeLk.RLock()
168+
defer a.closeLk.RUnlock()
152169
i.Release()
153170
return nil
154171
},
@@ -159,6 +176,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
159176
// DiskUsage returns the current disk size used by this levelDB.
160177
// For in-mem datastores, it will return 0.
161178
func (d *Datastore) DiskUsage() (uint64, error) {
179+
d.closeLk.RLock()
180+
defer d.closeLk.RUnlock()
162181
if d.path == "" { // in-mem
163182
return 0, nil
164183
}
@@ -182,19 +201,23 @@ func (d *Datastore) DiskUsage() (uint64, error) {
182201

183202
// LevelDB needs to be closed.
184203
func (d *Datastore) Close() (err error) {
204+
d.closeLk.Lock()
205+
defer d.closeLk.Unlock()
185206
return d.DB.Close()
186207
}
187208

188209
type leveldbBatch struct {
189210
b *leveldb.Batch
190211
db *leveldb.DB
212+
closeLk *sync.RWMutex
191213
syncWrites bool
192214
}
193215

194216
func (d *Datastore) Batch() (ds.Batch, error) {
195217
return &leveldbBatch{
196218
b: new(leveldb.Batch),
197219
db: d.DB,
220+
closeLk: d.closeLk,
198221
syncWrites: d.syncWrites,
199222
}, nil
200223
}
@@ -205,6 +228,8 @@ func (b *leveldbBatch) Put(key ds.Key, value []byte) error {
205228
}
206229

207230
func (b *leveldbBatch) Commit() error {
231+
b.closeLk.RLock()
232+
defer b.closeLk.RUnlock()
208233
return b.db.Write(b.b, &opt.WriteOptions{Sync: b.syncWrites})
209234
}
210235

@@ -220,18 +245,24 @@ type transaction struct {
220245
}
221246

222247
func (t *transaction) Commit() error {
248+
t.closeLk.RLock()
249+
defer t.closeLk.RUnlock()
223250
return t.tx.Commit()
224251
}
225252

226253
func (t *transaction) Discard() {
254+
t.closeLk.RLock()
255+
defer t.closeLk.RUnlock()
227256
t.tx.Discard()
228257
}
229258

230259
func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
260+
d.closeLk.RLock()
261+
defer d.closeLk.RUnlock()
231262
tx, err := d.DB.OpenTransaction()
232263
if err != nil {
233264
return nil, err
234265
}
235-
accessor := &accessor{ldb: tx, syncWrites: false}
266+
accessor := &accessor{ldb: tx, syncWrites: false, closeLk: d.closeLk}
236267
return &transaction{accessor, tx}, nil
237268
}

ds_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,44 @@ func TestQueryRespectsProcess(t *testing.T) {
147147
addTestCases(t, d, testcases)
148148
}
149149

150+
func TestCloseRace(t *testing.T) {
151+
d, close := newDS(t)
152+
for n := 0; n < 100; n++ {
153+
d.Put(ds.NewKey(fmt.Sprintf("%d", n)), []byte(fmt.Sprintf("test%d", n)))
154+
}
155+
156+
tx, _ := d.NewTransaction(false)
157+
tx.Put(ds.NewKey("txnversion"), []byte("bump"))
158+
159+
closeCh := make(chan interface{})
160+
161+
go func() {
162+
close()
163+
closeCh <- nil
164+
}()
165+
for k := range testcases {
166+
tx.Get(ds.NewKey(k))
167+
}
168+
tx.Commit()
169+
<-closeCh
170+
}
171+
172+
func TestCloseSafety(t *testing.T) {
173+
d, close := newDS(t)
174+
addTestCases(t, d, testcases)
175+
176+
tx, _ := d.NewTransaction(false)
177+
err := tx.Put(ds.NewKey("test"), []byte("test"))
178+
if err != nil {
179+
t.Error("Failed to put in a txn.")
180+
}
181+
close()
182+
err = tx.Commit()
183+
if err == nil {
184+
t.Error("committing after close should fail.")
185+
}
186+
}
187+
150188
func TestQueryRespectsProcessMem(t *testing.T) {
151189
d := newDSMem(t)
152190
addTestCases(t, d, testcases)

0 commit comments

Comments
 (0)