@@ -47,7 +47,7 @@ const (
47
47
48
48
// metricsGatheringInterval specifies the interval to retrieve pebble database
49
49
// compaction, io and pause stats to report to the user.
50
- metricsGatheringInterval = 3 * time .Second
50
+ metricsGatheringInterval = 100 * time . Millisecond // 3 * time.Second
51
51
)
52
52
53
53
// Database is a persistent key-value store based on the pebble storage engine.
@@ -71,6 +71,9 @@ type Database struct {
71
71
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
72
72
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
73
73
74
+ compDebtGauge metrics.Gauge
75
+ compInProgressGauge metrics.Gauge
76
+
74
77
levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels
75
78
76
79
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
@@ -138,7 +141,7 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {
138
141
139
142
// New returns a wrapped pebble DB object. The namespace is the prefix that the
140
143
// metrics reporting should use for surfacing internal stats.
141
- func New (file string , cache int , handles int , namespace string , readonly bool , ephemeral bool ) (* Database , error ) {
144
+ func New (file string , cache int , handles int , namespace string , readonly bool , ephemeral bool , extraOptions * ExtraOptions ) (* Database , error ) {
142
145
// Ensure we have some minimal caching and file guarantees
143
146
if cache < minCache {
144
147
cache = minCache
@@ -181,7 +184,16 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
181
184
quitChan : make (chan chan error ),
182
185
writeOptions : & pebble.WriteOptions {Sync : ! ephemeral },
183
186
}
184
- opt := & pebble.Options {
187
+
188
+ if extraOptions == nil {
189
+ extraOptions = & ExtraOptions {}
190
+ }
191
+ if extraOptions .MaxConcurrentCompactions == nil {
192
+ extraOptions .MaxConcurrentCompactions = func () int { return runtime .NumCPU () }
193
+ }
194
+
195
+ var opt * pebble.Options
196
+ opt = & pebble.Options {
185
197
// Pebble has a single combined cache area and the write
186
198
// buffers are taken from this too. Assign all available
187
199
// memory allowance for cache.
@@ -195,16 +207,16 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
195
207
// MemTableStopWritesThreshold places a hard limit on the size
196
208
// of the existent MemTables(including the frozen one).
197
209
// Note, this must be the number of tables not the size of all memtables
198
- // according to https://github.com/cockroachdb/pebble/blob/master/options .go#L738-L742
210
+ // according to https://github.com/cockroachdb/pebble/blob/master/extraOptions .go#L738-L742
199
211
// and to https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
200
212
MemTableStopWritesThreshold : memTableLimit ,
201
213
202
214
// The default compaction concurrency(1 thread),
203
215
// Here use all available CPUs for faster compaction.
204
- MaxConcurrentCompactions : func () int { return runtime . NumCPU () } ,
216
+ MaxConcurrentCompactions : extraOptions . MaxConcurrentCompactions ,
205
217
206
- // Per-level options . Options for at least one level must be specified. The
207
- // options for the last level are used for all subsequent levels.
218
+ // Per-level extraOptions . Options for at least one level must be specified. The
219
+ // extraOptions for the last level are used for all subsequent levels.
208
220
Levels : []pebble.LevelOptions {
209
221
{TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
210
222
{TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
@@ -222,11 +234,32 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
222
234
WriteStallEnd : db .onWriteStallEnd ,
223
235
},
224
236
Logger : panicLogger {}, // TODO(karalabe): Delete when this is upstreamed in Pebble
237
+
238
+ BytesPerSync : extraOptions .BytesPerSync ,
239
+ L0CompactionFileThreshold : extraOptions .L0CompactionFileThreshold ,
240
+ L0CompactionThreshold : extraOptions .L0CompactionThreshold ,
241
+ L0StopWritesThreshold : extraOptions .L0StopWritesThreshold ,
242
+ LBaseMaxBytes : extraOptions .LBaseMaxBytes ,
243
+ DisableAutomaticCompactions : extraOptions .DisableAutomaticCompactions ,
244
+ WALBytesPerSync : extraOptions .WALBytesPerSync ,
245
+ WALDir : extraOptions .WALDir ,
246
+ WALMinSyncInterval : extraOptions .WALMinSyncInterval ,
247
+ TargetByteDeletionRate : extraOptions .TargetByteDeletionRate ,
225
248
}
249
+
226
250
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
227
251
// for more details.
228
252
opt .Experimental .ReadSamplingMultiplier = - 1
229
253
254
+ if opt .Experimental .ReadSamplingMultiplier != 0 {
255
+ opt .Experimental .ReadSamplingMultiplier = extraOptions .Experimental .ReadSamplingMultiplier
256
+ }
257
+ opt .Experimental .L0CompactionConcurrency = extraOptions .Experimental .L0CompactionConcurrency
258
+ opt .Experimental .CompactionDebtConcurrency = extraOptions .Experimental .CompactionDebtConcurrency
259
+ opt .Experimental .ReadCompactionRate = extraOptions .Experimental .ReadCompactionRate
260
+ opt .Experimental .MaxWriterConcurrency = extraOptions .Experimental .MaxWriterConcurrency
261
+ opt .Experimental .ForceWriterParallelism = extraOptions .Experimental .ForceWriterParallelism
262
+
230
263
// Open the db and recover any potential corruptions
231
264
innerDB , err := pebble .Open (file , opt )
232
265
if err != nil {
@@ -248,6 +281,9 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
248
281
db .seekCompGauge = metrics .NewRegisteredGauge (namespace + "compact/seek" , nil )
249
282
db .manualMemAllocGauge = metrics .NewRegisteredGauge (namespace + "memory/manualalloc" , nil )
250
283
284
+ db .compDebtGauge = metrics .NewRegisteredGauge (namespace + "compact/debt" , nil )
285
+ db .compInProgressGauge = metrics .NewRegisteredGauge (namespace + "compact/inprogress" , nil )
286
+
251
287
// Start up the metrics gathering and return
252
288
go db .meter (metricsGatheringInterval , namespace )
253
289
return db , nil
@@ -525,6 +561,9 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
525
561
d .level0CompGauge .Update (level0CompCount )
526
562
d .seekCompGauge .Update (stats .Compact .ReadCount )
527
563
564
+ d .compDebtGauge .Update (int64 (stats .Compact .EstimatedDebt ))
565
+ d .compInProgressGauge .Update (stats .Compact .NumInProgress )
566
+
528
567
for i , level := range stats .Levels {
529
568
// Append metrics for additional layers
530
569
if i >= len (d .levelsGauge ) {
0 commit comments