Skip to content

Commit ea48232

Browse files
committed
prevent closing concurrently with other operations.
Add a `sync.RWMutex` protecting the Close operation. Follows the same pattern as in go-ds-badger. Address #29 / ipfs/kubo#6880
1 parent 2e5c197 commit ea48232

File tree

1 file changed

+38
-7
lines changed

1 file changed

+38
-7
lines changed

datastore.go

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package leveldb
33
import (
44
"os"
55
"path/filepath"
6+
"sync"
67

78
ds "github.com/ipfs/go-datastore"
89
dsq "github.com/ipfs/go-datastore/query"
@@ -16,8 +17,9 @@ import (
1617

1718
type Datastore struct {
1819
*accessor
19-
DB *leveldb.DB
20-
path string
20+
DB *leveldb.DB
21+
path string
22+
closeLk sync.RWMutex
2123
}
2224

2325
var _ ds.Datastore = (*Datastore)(nil)
@@ -52,11 +54,13 @@ func NewDatastore(path string, opts *Options) (*Datastore, error) {
5254
return nil, err
5355
}
5456

55-
return &Datastore{
57+
ds := Datastore{
5658
accessor: &accessor{ldb: db, syncWrites: true},
5759
DB: db,
5860
path: path,
59-
}, nil
61+
}
62+
ds.accessor.closeLk = &ds.closeLk
63+
return &ds, nil
6064
}
6165

6266
// An extraction of the common interface between LevelDB Transactions and the DB itself.
@@ -74,9 +78,12 @@ type levelDbOps interface {
7478
type accessor struct {
7579
ldb levelDbOps
7680
syncWrites bool
81+
closeLk *sync.RWMutex
7782
}
7883

7984
func (a *accessor) Put(key ds.Key, value []byte) (err error) {
85+
a.closeLk.RLock()
86+
defer a.closeLk.RUnlock()
8087
return a.ldb.Put(key.Bytes(), value, &opt.WriteOptions{Sync: a.syncWrites})
8188
}
8289

@@ -85,6 +92,8 @@ func (a *accessor) Sync(prefix ds.Key) error {
8592
}
8693

8794
func (a *accessor) Get(key ds.Key) (value []byte, err error) {
95+
a.closeLk.RLock()
96+
defer a.closeLk.RUnlock()
8897
val, err := a.ldb.Get(key.Bytes(), nil)
8998
if err != nil {
9099
if err == leveldb.ErrNotFound {
@@ -96,18 +105,26 @@ func (a *accessor) Get(key ds.Key) (value []byte, err error) {
96105
}
97106

98107
func (a *accessor) Has(key ds.Key) (exists bool, err error) {
108+
a.closeLk.RLock()
109+
defer a.closeLk.RUnlock()
99110
return a.ldb.Has(key.Bytes(), nil)
100111
}
101112

102-
func (d *accessor) GetSize(key ds.Key) (size int, err error) {
103-
return ds.GetBackedSize(d, key)
113+
func (a *accessor) GetSize(key ds.Key) (size int, err error) {
114+
a.closeLk.RLock()
115+
defer a.closeLk.RUnlock()
116+
return ds.GetBackedSize(a, key)
104117
}
105118

106119
func (a *accessor) Delete(key ds.Key) (err error) {
120+
a.closeLk.RLock()
121+
defer a.closeLk.RUnlock()
107122
return a.ldb.Delete(key.Bytes(), &opt.WriteOptions{Sync: a.syncWrites})
108123
}
109124

110125
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
126+
a.closeLk.RLock()
127+
defer a.closeLk.RUnlock()
111128
var rnge *util.Range
112129

113130
// make a copy of the query for the fallback naive query implementation.
@@ -159,6 +176,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
159176
// DiskUsage returns the current disk size used by this levelDB.
160177
// For in-mem datastores, it will return 0.
161178
func (d *Datastore) DiskUsage() (uint64, error) {
179+
d.closeLk.RLock()
180+
defer d.closeLk.RUnlock()
162181
if d.path == "" { // in-mem
163182
return 0, nil
164183
}
@@ -182,19 +201,23 @@ func (d *Datastore) DiskUsage() (uint64, error) {
182201

183202
// LevelDB needs to be closed.
184203
func (d *Datastore) Close() (err error) {
204+
d.closeLk.Lock()
205+
defer d.closeLk.Unlock()
185206
return d.DB.Close()
186207
}
187208

188209
type leveldbBatch struct {
189210
b *leveldb.Batch
190211
db *leveldb.DB
212+
closeLk *sync.RWMutex
191213
syncWrites bool
192214
}
193215

194216
func (d *Datastore) Batch() (ds.Batch, error) {
195217
return &leveldbBatch{
196218
b: new(leveldb.Batch),
197219
db: d.DB,
220+
closeLk: &d.closeLk,
198221
syncWrites: d.syncWrites,
199222
}, nil
200223
}
@@ -205,6 +228,8 @@ func (b *leveldbBatch) Put(key ds.Key, value []byte) error {
205228
}
206229

207230
func (b *leveldbBatch) Commit() error {
231+
b.closeLk.RLock()
232+
defer b.closeLk.RUnlock()
208233
return b.db.Write(b.b, &opt.WriteOptions{Sync: b.syncWrites})
209234
}
210235

@@ -220,18 +245,24 @@ type transaction struct {
220245
}
221246

222247
func (t *transaction) Commit() error {
248+
t.closeLk.RLock()
249+
defer t.closeLk.RUnlock()
223250
return t.tx.Commit()
224251
}
225252

226253
func (t *transaction) Discard() {
254+
t.closeLk.RLock()
255+
defer t.closeLk.RUnlock()
227256
t.tx.Discard()
228257
}
229258

230259
func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
260+
d.closeLk.RLock()
261+
defer d.closeLk.RUnlock()
231262
tx, err := d.DB.OpenTransaction()
232263
if err != nil {
233264
return nil, err
234265
}
235-
accessor := &accessor{ldb: tx, syncWrites: false}
266+
accessor := &accessor{ldb: tx, syncWrites: false, closeLk: &d.closeLk}
236267
return &transaction{accessor, tx}, nil
237268
}

0 commit comments

Comments
 (0)