Skip to content

Commit d7407d1

Browse files
authored
Change WC size estimations (#3106)
2 parents de89b7a + dfe2711 commit d7407d1

File tree

9 files changed

+170
-67
lines changed

9 files changed

+170
-67
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Changelog for NeoFS Node
2121

2222
### Changed
2323
- Number of cuncurrenly handled notifications from the chain was increased from 10 to 300 for IR (#3068)
24+
- Write-cache size estimations (#3106)
2425

2526
### Removed
2627
- Drop creating new eacl tables with public keys (#3096)

pkg/local_object_storage/blobstor/fstree/fstree.go

Lines changed: 45 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func addressFromString(s string) (*oid.Address, error) {
139139

140140
// Iterate iterates over all stored objects.
141141
func (t *FSTree) Iterate(objHandler func(addr oid.Address, data []byte, id []byte) error, errorHandler func(addr oid.Address, err error) error) error {
142-
return t.iterate(0, []string{t.RootPath}, objHandler, errorHandler, nil)
142+
return t.iterate(0, []string{t.RootPath}, objHandler, errorHandler, nil, nil)
143143
}
144144

145145
// IterateAddresses iterates over all objects stored in the underlying storage
@@ -152,13 +152,27 @@ func (t *FSTree) IterateAddresses(f func(addr oid.Address) error, ignoreErrors b
152152
errorHandler = func(oid.Address, error) error { return nil }
153153
}
154154

155-
return t.iterate(0, []string{t.RootPath}, nil, errorHandler, f)
155+
return t.iterate(0, []string{t.RootPath}, nil, errorHandler, f, nil)
156+
}
157+
158+
// IterateSizes iterates over all objects stored in the underlying storage
159+
// and passes their addresses and sizes into f. If f returns an error, IterateSizes
160+
// returns it and breaks. ignoreErrors allows to continue if internal errors
161+
// happen.
162+
func (t *FSTree) IterateSizes(f func(addr oid.Address, size uint64) error, ignoreErrors bool) error {
163+
var errorHandler func(oid.Address, error) error
164+
if ignoreErrors {
165+
errorHandler = func(oid.Address, error) error { return nil }
166+
}
167+
168+
return t.iterate(0, []string{t.RootPath}, nil, errorHandler, nil, f)
156169
}
157170

158171
func (t *FSTree) iterate(depth uint64, curPath []string,
159172
objHandler func(oid.Address, []byte, []byte) error,
160173
errorHandler func(oid.Address, error) error,
161-
addrHandler func(oid.Address) error) error {
174+
addrHandler func(oid.Address) error,
175+
sizeHandler func(oid.Address, uint64) error) error {
162176
curName := strings.Join(curPath[1:], "")
163177
dir := filepath.Join(curPath...)
164178
des, err := os.ReadDir(dir)
@@ -177,7 +191,7 @@ func (t *FSTree) iterate(depth uint64, curPath []string,
177191
curPath[l] = des[i].Name()
178192

179193
if !isLast && des[i].IsDir() {
180-
err := t.iterate(depth+1, curPath, objHandler, errorHandler, addrHandler)
194+
err := t.iterate(depth+1, curPath, objHandler, errorHandler, addrHandler, sizeHandler)
181195
if err != nil {
182196
// Must be error from handler in case errors are ignored.
183197
// Need to report.
@@ -199,24 +213,36 @@ func (t *FSTree) iterate(depth uint64, curPath []string,
199213
} else {
200214
var data []byte
201215
p := filepath.Join(curPath...)
202-
data, err = getRawObjectBytes(addr.Object(), p)
203-
if err != nil && errors.Is(err, apistatus.ObjectNotFound{}) {
204-
continue
205-
}
206-
if err == nil {
207-
data, err = t.Decompress(data)
208-
}
209-
if err != nil {
210-
if errorHandler != nil {
211-
err = errorHandler(*addr, err)
212-
if err == nil {
213-
continue
216+
if sizeHandler != nil {
217+
err = filepath.Walk(p, func(path string, info os.FileInfo, _ error) error {
218+
if !info.IsDir() {
219+
err = sizeHandler(*addr, uint64(info.Size()))
220+
if err != nil {
221+
return err
222+
}
214223
}
224+
return nil
225+
})
226+
} else {
227+
data, err = getRawObjectBytes(addr.Object(), p)
228+
if err != nil && errors.Is(err, apistatus.ObjectNotFound{}) {
229+
continue
230+
}
231+
if err == nil {
232+
data, err = t.Decompress(data)
233+
}
234+
if err != nil {
235+
if errorHandler != nil {
236+
err = errorHandler(*addr, err)
237+
if err == nil {
238+
continue
239+
}
240+
}
241+
return fmt.Errorf("read file %q: %w", p, err)
215242
}
216-
return fmt.Errorf("read file %q: %w", p, err)
217-
}
218243

219-
err = objHandler(*addr, data, []byte{})
244+
err = objHandler(*addr, data, []byte{})
245+
}
220246
}
221247

222248
if err != nil {
@@ -456,29 +482,6 @@ func (t *FSTree) GetRange(addr oid.Address, from uint64, length uint64) ([]byte,
456482
return payload[from:to], nil
457483
}
458484

459-
// NumberOfObjects walks the file tree rooted at FSTree's root
460-
// and returns number of stored objects.
461-
func (t *FSTree) NumberOfObjects() (uint64, error) {
462-
var counter uint64
463-
464-
// it is simpler to just consider every file
465-
// that is not directory as an object
466-
err := filepath.WalkDir(t.RootPath,
467-
func(_ string, d fs.DirEntry, _ error) error {
468-
if !d.IsDir() {
469-
counter++
470-
}
471-
472-
return nil
473-
},
474-
)
475-
if err != nil {
476-
return 0, fmt.Errorf("could not walk through %q directory: %w", t.RootPath, err)
477-
}
478-
479-
return counter, nil
480-
}
481-
482485
// Type is fstree storage type used in logs and configuration.
483486
const Type = "fstree"
484487

pkg/local_object_storage/writecache/delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (c *cache) Delete(addr oid.Address) error {
2424
storagelog.StorageTypeField(wcStorageType),
2525
storagelog.OpField("DELETE"),
2626
)
27-
c.objCounters.DecFS()
27+
c.objCounters.Delete(addr)
2828
}
2929

3030
return err
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package writecache
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
"time"
9+
10+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
11+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
12+
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
13+
"github.com/stretchr/testify/require"
14+
"go.etcd.io/bbolt"
15+
)
16+
17+
func TestMigrateFromBolt(t *testing.T) {
18+
c, b, _ := newCache(t)
19+
20+
wc := c.(*cache)
21+
path := filepath.Join(wc.path, dbName)
22+
23+
require.NoError(t, wc.Close())
24+
25+
t.Run("ok, no database", func(t *testing.T) {
26+
require.NoError(t, wc.migrate())
27+
})
28+
29+
db, err := bbolt.Open(path, os.ModePerm, &bbolt.Options{
30+
NoFreelistSync: true,
31+
ReadOnly: false,
32+
Timeout: time.Second,
33+
})
34+
require.NoError(t, err)
35+
36+
t.Run("couldn't open database", func(t *testing.T) {
37+
err := wc.migrate()
38+
require.Error(t, err)
39+
fmt.Println(err)
40+
})
41+
42+
require.NoError(t, db.Close())
43+
t.Run("no default bucket", func(t *testing.T) {
44+
err := wc.migrate()
45+
require.Error(t, err)
46+
fmt.Println(err)
47+
})
48+
49+
db, err = bbolt.Open(path, os.ModePerm, &bbolt.Options{
50+
NoFreelistSync: true,
51+
ReadOnly: false,
52+
Timeout: time.Second,
53+
})
54+
require.NoError(t, err)
55+
56+
obj := objecttest.Object()
57+
58+
var addr oid.Address
59+
addr.SetObject(obj.GetID())
60+
addr.SetContainer(obj.GetContainerID())
61+
62+
require.NoError(t, db.Batch(func(tx *bbolt.Tx) error {
63+
b, err := tx.CreateBucketIfNotExists(defaultBucket)
64+
require.NoError(t, err)
65+
66+
require.NoError(t, b.Put([]byte(addr.String()), obj.Marshal()))
67+
68+
return nil
69+
}))
70+
71+
t.Run("migrate object", func(t *testing.T) {
72+
require.NoError(t, db.Close())
73+
require.NoError(t, wc.migrate())
74+
75+
_, err := wc.Get(addr)
76+
require.Error(t, err, apistatus.ObjectNotFound{})
77+
78+
bObject, err := b.Get(addr, []byte{})
79+
require.NoError(t, err)
80+
require.Equal(t, obj.GetID(), bObject.GetID())
81+
require.Equal(t, obj.GetContainerID(), bObject.GetContainerID())
82+
require.Equal(t, obj.Marshal(), bObject.Marshal())
83+
84+
_, err = os.Stat(path)
85+
require.Error(t, err, os.ErrNotExist)
86+
})
87+
}

pkg/local_object_storage/writecache/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type options struct {
3838
// maxCacheSize is the maximum total size of all objects saved in cache.
3939
// 1 GiB by default.
4040
maxCacheSize uint64
41-
// objCounters contains atomic counters for the number of objects stored in cache.
41+
// objCounters contains object list along with sizes and overall size of cache.
4242
objCounters counters
4343
// noSync is true iff FSTree allows unsynchronized writes.
4444
noSync bool

pkg/local_object_storage/writecache/put.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ func (c *cache) Put(addr oid.Address, obj *objectSDK.Object, data []byte) error
3939

4040
// put writes object to FSTree and pushes it to the flush workers queue.
4141
func (c *cache) put(addr oid.Address, obj objectInfo) error {
42-
cacheSz := c.estimateCacheSize()
43-
if c.maxCacheSize < c.incSizeFS(cacheSz) {
42+
cacheSz := c.objCounters.Size()
43+
objSz := uint64(len(obj.data))
44+
if c.maxCacheSize < cacheSz+objSz {
4445
return ErrOutOfSpace
4546
}
4647

@@ -54,7 +55,7 @@ func (c *cache) put(addr oid.Address, obj objectInfo) error {
5455
c.compressFlags[obj.addr] = struct{}{}
5556
c.mtx.Unlock()
5657
}
57-
c.objCounters.IncFS()
58+
c.objCounters.Add(addr, objSz)
5859
storagelog.Write(c.log,
5960
storagelog.AddressField(obj.addr),
6061
storagelog.StorageTypeField(wcStorageType),

pkg/local_object_storage/writecache/state.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,49 @@ package writecache
22

33
import (
44
"fmt"
5-
"math"
6-
"sync/atomic"
5+
"sync"
6+
7+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
78
)
89

9-
func (c *cache) estimateCacheSize() uint64 {
10-
return c.objCounters.FS() * c.maxObjectSize
10+
type counters struct {
11+
mu sync.Mutex
12+
objMap map[oid.Address]uint64
13+
size uint64
1114
}
1215

13-
func (c *cache) incSizeFS(sz uint64) uint64 {
14-
return sz + c.maxObjectSize
15-
}
16+
func (x *counters) Add(addr oid.Address, size uint64) {
17+
x.mu.Lock()
18+
defer x.mu.Unlock()
1619

17-
type counters struct {
18-
cFS atomic.Uint64
20+
x.size += size
21+
x.objMap[addr] = size
1922
}
2023

21-
func (x *counters) IncFS() {
22-
x.cFS.Add(1)
23-
}
24+
func (x *counters) Delete(addr oid.Address) {
25+
x.mu.Lock()
26+
defer x.mu.Unlock()
2427

25-
func (x *counters) DecFS() {
26-
x.cFS.Add(math.MaxUint32)
28+
x.size -= x.objMap[addr]
29+
delete(x.objMap, addr)
2730
}
2831

29-
func (x *counters) FS() uint64 {
30-
return x.cFS.Load()
32+
func (x *counters) Size() uint64 {
33+
x.mu.Lock()
34+
defer x.mu.Unlock()
35+
return x.size
3136
}
3237

3338
func (c *cache) initCounters() error {
34-
inFS, err := c.fsTree.NumberOfObjects()
39+
var sizeHandler = func(addr oid.Address, size uint64) error {
40+
c.objCounters.Add(addr, size)
41+
return nil
42+
}
43+
44+
err := c.fsTree.IterateSizes(sizeHandler, false)
3545
if err != nil {
3646
return fmt.Errorf("could not read write-cache FS counter: %w", err)
3747
}
3848

39-
c.objCounters.cFS.Store(inFS)
40-
4149
return nil
4250
}

pkg/local_object_storage/writecache/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (c *cache) deleteFromDisk(keys []string) []string {
9797
storagelog.StorageTypeField(wcStorageType),
9898
storagelog.OpField("DELETE"),
9999
)
100-
c.objCounters.DecFS()
100+
c.objCounters.Delete(addr)
101101
}
102102
}
103103

pkg/local_object_storage/writecache/writecache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ func New(opts ...Option) Cache {
101101
log: zap.NewNop(),
102102
maxObjectSize: defaultMaxObjectSize,
103103
maxCacheSize: defaultMaxCacheSize,
104+
objCounters: counters{
105+
objMap: make(map[oid.Address]uint64),
106+
},
104107
},
105108
}
106109

0 commit comments

Comments
 (0)