Commit 80b7406
autobatch: thread-safe, debounce, max delay and implement Batching

Revamp the venerable autobatch in several ways:
- make it thread safe (go test -race is happy)
- deduplicate writes (avoid writing the same key multiple times), which makes it act as a debounce
- introduce a maximum delay before the writes happen, to avoid keeping pending writes in memory when the count trigger is not reached
- implement the Batching interface for compatibility with more use cases
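
A minimal usage sketch of the revamped API (not part of the commit; the construction mirrors the updated tests, and the key names and values are illustrative):

package main

import (
	"time"

	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/autobatch"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	// The child datastore must be safe for concurrent use; the updated tests
	// wrap a MapDatastore with MutexWrap for the same reason.
	child := dssync.MutexWrap(ds.NewMapDatastore())

	// Flush once more than 16 distinct keys are pending, or at most 500ms
	// after the first pending write starts the delay timer.
	d := autobatch.NewAutoBatching(child, 16, 500*time.Millisecond)
	defer d.Close() // Close flushes any remaining buffered writes.

	key := ds.NewKey("example")

	// Repeated puts to the same key are deduplicated in the buffer, so the
	// batcher acts as a debounce: only the last value reaches the child.
	_ = d.Put(key, []byte("v1"))
	_ = d.Put(key, []byte("v2"))

	// Flush forces the pending writes out without waiting for either trigger.
	_ = d.Flush()
}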
1 parent ed11f24 commit 80b7406

File tree: 3 files changed, +177 -54 lines


autobatch/autobatch.go

Lines changed: 143 additions & 44 deletions
@@ -1,9 +1,13 @@
 // Package autobatch provides a go-datastore implementation that
 // automatically batches together writes by holding puts in memory until
-// a certain threshold is met.
+// a certain threshold is met. It also acts as a debounce.
 package autobatch
 
 import (
+	"log"
+	"sync"
+	"time"
+
 	ds "github.com/ipfs/go-datastore"
 	dsq "github.com/ipfs/go-datastore/query"
 )
@@ -12,9 +16,13 @@ import (
 type Datastore struct {
 	child ds.Batching
 
-	// TODO: discuss making ds.Batch implement the full ds.Datastore interface
-	buffer           map[ds.Key]op
-	maxBufferEntries int
+	mu     sync.RWMutex
+	buffer map[ds.Key]op
+
+	maxWrite int
+	maxDelay time.Duration
+	newWrite chan struct{}
+	exit     chan struct{}
 }
 
 type op struct {
@@ -23,28 +31,79 @@ type op struct {
 }
 
 // NewAutoBatching returns a new datastore that automatically
-// batches writes using the given Batching datastore. The size
-// of the memory pool is given by size.
-func NewAutoBatching(d ds.Batching, size int) *Datastore {
-	return &Datastore{
-		child:            d,
-		buffer:           make(map[ds.Key]op, size),
-		maxBufferEntries: size,
+// batches writes using the given Batching datastore. The maximum number of
+// write before triggering a batch is given by maxWrite. The maximum delay
+// before triggering a batch is given by maxDelay.
+func NewAutoBatching(child ds.Batching, maxWrite int, maxDelay time.Duration) *Datastore {
+	d := &Datastore{
+		child:    child,
+		buffer:   make(map[ds.Key]op, maxWrite),
+		maxWrite: maxWrite,
+		maxDelay: maxDelay,
+		newWrite: make(chan struct{}),
+		exit:     make(chan struct{}),
+	}
+	go d.runBatcher()
+	return d
+}
+
+func (d *Datastore) addOp(key ds.Key, op op) {
+	d.mu.Lock()
+	d.buffer[key] = op
+	d.mu.Unlock()
+	d.newWrite <- struct{}{}
+}
+
+func (d *Datastore) runBatcher() {
+	var timer <-chan time.Time
+
+	write := func() {
+		timer = nil
+
+		b, err := d.prepareBatch(nil)
+		if err != nil {
+			log.Println(err)
+			return
+		}
+		err = b.Commit()
+		if err != nil {
+			log.Println(err)
+			return
+		}
+	}
+
+	for {
+		select {
+		case <-d.exit:
+			return
+		case <-timer:
+			write()
+		case <-d.newWrite:
+			d.mu.RLock()
+			ready := len(d.buffer)
+			d.mu.RUnlock()
+			if ready > d.maxWrite {
+				write()
+			}
+			if timer == nil {
+				timer = time.After(d.maxDelay)
+			}
+		}
 	}
 }
 
 // Delete deletes a key/value
 func (d *Datastore) Delete(k ds.Key) error {
-	d.buffer[k] = op{delete: true}
-	if len(d.buffer) > d.maxBufferEntries {
-		return d.Flush()
-	}
+	d.addOp(k, op{delete: true})
 	return nil
 }
 
 // Get retrieves a value given a key.
 func (d *Datastore) Get(k ds.Key) ([]byte, error) {
+	d.mu.RLock()
 	o, ok := d.buffer[k]
+	d.mu.RUnlock()
+
 	if ok {
 		if o.delete {
 			return nil, ds.ErrNotFound
@@ -57,69 +116,67 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
 
 // Put stores a key/value.
 func (d *Datastore) Put(k ds.Key, val []byte) error {
-	d.buffer[k] = op{value: val}
-	if len(d.buffer) > d.maxBufferEntries {
-		return d.Flush()
-	}
+	d.addOp(k, op{value: val})
 	return nil
 }
 
 // Sync flushes all operations on keys at or under the prefix
 // from the current batch to the underlying datastore
 func (d *Datastore) Sync(prefix ds.Key) error {
-	b, err := d.child.Batch()
+	b, err := d.prepareBatch(&prefix)
 	if err != nil {
 		return err
 	}
-
-	for k, o := range d.buffer {
-		if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
-			continue
-		}
-
-		var err error
-		if o.delete {
-			err = b.Delete(k)
-		} else {
-			err = b.Put(k, o.value)
-		}
-		if err != nil {
-			return err
-		}
-
-		delete(d.buffer, k)
-	}
-
 	return b.Commit()
 }
 
 // Flush flushes the current batch to the underlying datastore.
 func (d *Datastore) Flush() error {
-	b, err := d.child.Batch()
+	b, err := d.prepareBatch(nil)
 	if err != nil {
 		return err
 	}
+	return b.Commit()
+}
+
+func (d *Datastore) prepareBatch(prefix *ds.Key) (ds.Batch, error) {
+	b, err := d.child.Batch()
+	if err != nil {
+		return nil, err
+	}
+
+	d.mu.Lock()
 
 	for k, o := range d.buffer {
+		if prefix != nil && !(k.Equal(*prefix) || k.IsDescendantOf(*prefix)) {
+			continue
+		}
+
 		var err error
 		if o.delete {
 			err = b.Delete(k)
 		} else {
 			err = b.Put(k, o.value)
 		}
 		if err != nil {
-			return err
+			d.mu.Unlock()
+			return nil, err
 		}
+
+		delete(d.buffer, k)
 	}
-	// clear out buffer
-	d.buffer = make(map[ds.Key]op, d.maxBufferEntries)
 
-	return b.Commit()
+	d.mu.Unlock()
+
+	return b, nil
 }
 
 // Has checks if a key is stored.
 func (d *Datastore) Has(k ds.Key) (bool, error) {
+	d.mu.RLock()
 	o, ok := d.buffer[k]
+	d.mu.RUnlock()
+
 	if ok {
 		return !o.delete, nil
 	}
@@ -129,7 +186,10 @@ func (d *Datastore) Has(k ds.Key) (bool, error) {
 
 // GetSize implements Datastore.GetSize
 func (d *Datastore) GetSize(k ds.Key) (int, error) {
+	d.mu.RLock()
 	o, ok := d.buffer[k]
+	d.mu.RUnlock()
+
 	if ok {
 		if o.delete {
 			return -1, ds.ErrNotFound
@@ -155,6 +215,18 @@ func (d *Datastore) DiskUsage() (uint64, error) {
 	return ds.DiskUsage(d.child)
 }
 
+func (d *Datastore) Batch() (ds.Batch, error) {
+	b, err := d.child.Batch()
+	if err != nil {
+		return nil, err
+	}
+	return &batch{
+		parent:   d,
+		child:    b,
+		toDelete: make(map[ds.Key]struct{}),
+	}, nil
+}
+
 func (d *Datastore) Close() error {
 	err1 := d.Flush()
 	err2 := d.child.Close()
@@ -164,5 +236,32 @@ func (d *Datastore) Close() error {
 	if err2 != nil {
 		return err2
 	}
+	close(d.exit)
+	close(d.newWrite)
 	return nil
 }
+
+type batch struct {
+	parent   *Datastore
+	child    ds.Batch
+	toDelete map[ds.Key]struct{}
+}
+
+func (b *batch) Put(key ds.Key, value []byte) error {
+	delete(b.toDelete, key)
+	return b.child.Put(key, value)
+}
+
+func (b *batch) Delete(key ds.Key) error {
+	b.toDelete[key] = struct{}{}
+	return b.child.Delete(key)
+}
+
+func (b *batch) Commit() error {
+	b.parent.mu.Lock()
+	for key := range b.toDelete {
+		delete(b.parent.buffer, key)
+	}
+	b.parent.mu.Unlock()
+	return b.child.Commit()
+}
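
A short sketch of the new Batching support added above (not from the commit; the helper name and keys are hypothetical): writes issued through Batch() go into the child's own batch rather than the autobatcher's buffer, and Commit additionally drops any pending buffered op for keys deleted through the batch, so a later flush cannot resurrect them.

package example

import (
	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/autobatch"
)

// bulkWrite is a hypothetical helper: it stages writes on the child's batch
// via the new Batch() method and commits them in one go.
func bulkWrite(d *autobatch.Datastore) error {
	b, err := d.Batch()
	if err != nil {
		return err
	}
	if err := b.Put(ds.NewKey("bulk/1"), []byte("v1")); err != nil {
		return err
	}
	// On Commit, this delete also clears any write still buffered for the
	// same key inside the autobatcher.
	if err := b.Delete(ds.NewKey("stale")); err != nil {
		return err
	}
	return b.Commit()
}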

autobatch/autobatch_test.go

Lines changed: 27 additions & 4 deletions
@@ -4,18 +4,20 @@ import (
 	"bytes"
 	"fmt"
 	"testing"
+	"time"
 
 	ds "github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/sync"
 	dstest "github.com/ipfs/go-datastore/test"
 )
 
 func TestAutobatch(t *testing.T) {
-	dstest.SubtestAll(t, NewAutoBatching(ds.NewMapDatastore(), 16))
+	dstest.SubtestAll(t, NewAutoBatching(sync.MutexWrap(ds.NewMapDatastore()), 16, time.Second))
 }
 
 func TestFlushing(t *testing.T) {
-	child := ds.NewMapDatastore()
-	d := NewAutoBatching(child, 16)
+	child := sync.MutexWrap(ds.NewMapDatastore())
+	d := NewAutoBatching(child, 16, 500*time.Millisecond)
 
 	var keys []ds.Key
 	for i := 0; i < 16; i++ {
@@ -70,6 +72,9 @@ func TestFlushing(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	// flushing is async, so we can't rely on it happening immediately
+	time.Sleep(100 * time.Millisecond)
+
 	// should be flushed now, try to get keys from child datastore
 	for _, k := range keys[:14] {
 		val, err := child.Get(k)
@@ -102,11 +107,29 @@ func TestFlushing(t *testing.T) {
 	if !bytes.Equal(val, v) {
 		t.Fatal("wrong value")
 	}
+
+	// let's test the maximum delay with a single key
+	key17 := ds.NewKey("test17")
+	err = d.Put(key17, v)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(600 * time.Millisecond)
+
+	val, err = child.Get(key17)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !bytes.Equal(val, v) {
+		t.Fatal("wrong value")
+	}
 }
 
 func TestSync(t *testing.T) {
 	child := ds.NewMapDatastore()
-	d := NewAutoBatching(child, 100)
+	d := NewAutoBatching(child, 100, time.Second)
 
 	put := func(key ds.Key) {
 		if err := d.Put(key, []byte(key.String())); err != nil {

mount/mount_test.go

Lines changed: 7 additions & 6 deletions
@@ -3,6 +3,7 @@ package mount_test
 import (
 	"errors"
 	"testing"
+	"time"
 
 	datastore "github.com/ipfs/go-datastore"
 	autobatch "github.com/ipfs/go-datastore/autobatch"
@@ -725,14 +726,14 @@ func TestLookupPrio(t *testing.T) {
 }
 
 func TestNestedMountSync(t *testing.T) {
-	internalDSRoot := datastore.NewMapDatastore()
-	internalDSFoo := datastore.NewMapDatastore()
-	internalDSFooBar := datastore.NewMapDatastore()
+	internalDSRoot := sync.MutexWrap(datastore.NewMapDatastore())
+	internalDSFoo := sync.MutexWrap(datastore.NewMapDatastore())
+	internalDSFooBar := sync.MutexWrap(datastore.NewMapDatastore())
 
 	m := mount.New([]mount.Mount{
-		{Prefix: datastore.NewKey("/foo"), Datastore: autobatch.NewAutoBatching(internalDSFoo, 10)},
-		{Prefix: datastore.NewKey("/foo/bar"), Datastore: autobatch.NewAutoBatching(internalDSFooBar, 10)},
-		{Prefix: datastore.NewKey("/"), Datastore: autobatch.NewAutoBatching(internalDSRoot, 10)},
+		{Prefix: datastore.NewKey("/foo"), Datastore: autobatch.NewAutoBatching(internalDSFoo, 10, 100*time.Millisecond)},
+		{Prefix: datastore.NewKey("/foo/bar"), Datastore: autobatch.NewAutoBatching(internalDSFooBar, 10, 100*time.Millisecond)},
+		{Prefix: datastore.NewKey("/"), Datastore: autobatch.NewAutoBatching(internalDSRoot, 10, 100*time.Millisecond)},
 	})
 
 	// Testing scenarios
