@@ -70,6 +70,25 @@ type Database struct {
70
70
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
71
71
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
72
72
73
+ compDebtGauge metrics.Gauge
74
+ compInProgressGauge metrics.Gauge
75
+
76
+ commitCountMeter metrics.Meter
77
+ commitTotalDurationMeter metrics.Meter
78
+ commitSemaphoreWaitMeter metrics.Meter
79
+ commitMemTableWriteStallMeter metrics.Meter
80
+ commitL0ReadAmpWriteStallMeter metrics.Meter
81
+ commitWALRotationMeter metrics.Meter
82
+ commitWaitMeter metrics.Meter
83
+
84
+ commitCount atomic.Int64
85
+ commitTotalDuration atomic.Int64
86
+ commitSemaphoreWait atomic.Int64
87
+ commitMemTableWriteStall atomic.Int64
88
+ commitL0ReadAmpWriteStall atomic.Int64
89
+ commitWALRotation atomic.Int64
90
+ commitWait atomic.Int64
91
+
73
92
levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels
74
93
75
94
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
@@ -137,7 +156,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {
137
156
138
157
// New returns a wrapped pebble DB object. The namespace is the prefix that the
139
158
// metrics reporting should use for surfacing internal stats.
140
- func New (file string , cache int , handles int , namespace string , readonly bool , ephemeral bool ) (* Database , error ) {
159
+ func New (file string , cache int , handles int , namespace string , readonly bool , ephemeral bool , extraOptions * ExtraOptions ) (* Database , error ) {
160
+ if extraOptions == nil {
161
+ extraOptions = & ExtraOptions {}
162
+ }
163
+ if extraOptions .MemTableStopWritesThreshold <= 0 {
164
+ extraOptions .MemTableStopWritesThreshold = 2
165
+ }
166
+ if extraOptions .MaxConcurrentCompactions == nil {
167
+ extraOptions .MaxConcurrentCompactions = func () int { return runtime .NumCPU () }
168
+ }
169
+ var levels []pebble.LevelOptions
170
+ if len (extraOptions .Levels ) == 0 {
171
+ levels = []pebble.LevelOptions {
172
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
173
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
174
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
175
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
176
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
177
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
178
+ {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
179
+ }
180
+ } else {
181
+ for _ , level := range extraOptions .Levels {
182
+ levels = append (levels , pebble.LevelOptions {
183
+ BlockSize : level .BlockSize ,
184
+ IndexBlockSize : level .IndexBlockSize ,
185
+ TargetFileSize : level .TargetFileSize ,
186
+ FilterPolicy : bloom .FilterPolicy (10 ),
187
+ })
188
+ }
189
+ }
190
+
141
191
// Ensure we have some minimal caching and file guarantees
142
192
if cache < minCache {
143
193
cache = minCache
@@ -162,7 +212,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
162
212
163
213
// Two memory tables is configured which is identical to leveldb,
164
214
// including a frozen memory table and another live one.
165
- memTableLimit := 2
215
+ memTableLimit := extraOptions . MemTableStopWritesThreshold
166
216
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
167
217
168
218
// The memory table size is currently capped at maxMemTableSize-1 due to a
@@ -200,19 +250,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
200
250
201
251
// The default compaction concurrency(1 thread),
202
252
// Here use all available CPUs for faster compaction.
203
- MaxConcurrentCompactions : func () int { return runtime . NumCPU () } ,
253
+ MaxConcurrentCompactions : extraOptions . MaxConcurrentCompactions ,
204
254
205
- // Per-level options. Options for at least one level must be specified. The
206
- // options for the last level are used for all subsequent levels.
207
- Levels : []pebble.LevelOptions {
208
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
209
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
210
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
211
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
212
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
213
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
214
- {TargetFileSize : 2 * 1024 * 1024 , FilterPolicy : bloom .FilterPolicy (10 )},
215
- },
255
+ // Per-level extraOptions. Options for at least one level must be specified. The
256
+ // extraOptions for the last level are used for all subsequent levels.
257
+ Levels : levels ,
216
258
ReadOnly : readonly ,
217
259
EventListener : & pebble.EventListener {
218
260
CompactionBegin : db .onCompactionBegin ,
@@ -221,11 +263,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
221
263
WriteStallEnd : db .onWriteStallEnd ,
222
264
},
223
265
Logger : panicLogger {}, // TODO(karalabe): Delete when this is upstreamed in Pebble
266
+
267
+ BytesPerSync : extraOptions .BytesPerSync ,
268
+ L0CompactionFileThreshold : extraOptions .L0CompactionFileThreshold ,
269
+ L0CompactionThreshold : extraOptions .L0CompactionThreshold ,
270
+ L0StopWritesThreshold : extraOptions .L0StopWritesThreshold ,
271
+ LBaseMaxBytes : extraOptions .LBaseMaxBytes ,
272
+ DisableAutomaticCompactions : extraOptions .DisableAutomaticCompactions ,
273
+ WALBytesPerSync : extraOptions .WALBytesPerSync ,
274
+ WALDir : extraOptions .WALDir ,
275
+ WALMinSyncInterval : extraOptions .WALMinSyncInterval ,
276
+ TargetByteDeletionRate : extraOptions .TargetByteDeletionRate ,
224
277
}
225
278
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
226
279
// for more details.
227
280
opt .Experimental .ReadSamplingMultiplier = - 1
228
281
282
+ if opt .Experimental .ReadSamplingMultiplier != 0 {
283
+ opt .Experimental .ReadSamplingMultiplier = extraOptions .Experimental .ReadSamplingMultiplier
284
+ }
285
+ opt .Experimental .L0CompactionConcurrency = extraOptions .Experimental .L0CompactionConcurrency
286
+ opt .Experimental .CompactionDebtConcurrency = extraOptions .Experimental .CompactionDebtConcurrency
287
+ opt .Experimental .ReadCompactionRate = extraOptions .Experimental .ReadCompactionRate
288
+ opt .Experimental .MaxWriterConcurrency = extraOptions .Experimental .MaxWriterConcurrency
289
+ opt .Experimental .ForceWriterParallelism = extraOptions .Experimental .ForceWriterParallelism
290
+
229
291
// Open the db and recover any potential corruptions
230
292
innerDB , err := pebble .Open (file , opt )
231
293
if err != nil {
@@ -247,6 +309,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
247
309
db .seekCompGauge = metrics .GetOrRegisterGauge (namespace + "compact/seek" , nil )
248
310
db .manualMemAllocGauge = metrics .GetOrRegisterGauge (namespace + "memory/manualalloc" , nil )
249
311
312
+ db .compDebtGauge = metrics .GetOrRegisterGauge (namespace + "compact/debt" , nil )
313
+ db .compInProgressGauge = metrics .GetOrRegisterGauge (namespace + "compact/inprogress" , nil )
314
+
315
+ db .commitCountMeter = metrics .GetOrRegisterMeter (namespace + "commit/counter" , nil )
316
+ db .commitTotalDurationMeter = metrics .GetOrRegisterMeter (namespace + "commit/duration/total" , nil )
317
+ db .commitSemaphoreWaitMeter = metrics .GetOrRegisterMeter (namespace + "commit/duration/semaphorewait" , nil )
318
+ db .commitMemTableWriteStallMeter = metrics .GetOrRegisterMeter (namespace + "commit/duration/memtablewritestall" , nil )
319
+ db .commitL0ReadAmpWriteStallMeter = metrics .GetOrRegisterMeter (namespace + "commit/duration/l0readampwritestall" , nil )
320
+ db .commitWALRotationMeter = metrics .GetOrRegisterMeter (namespace + "commit/duration/walrotation" , nil )
321
+ db .commitWaitMeter = metrics .GetOrRegisterMeter (namespace + "commit/duration/commitwait" , nil )
322
+
250
323
// Start up the metrics gathering and return
251
324
go db .meter (metricsGatheringInterval , namespace )
252
325
return db , nil
@@ -459,6 +532,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
459
532
compReads [2 ]int64
460
533
461
534
nWrites [2 ]int64
535
+
536
+ commitCounts [2 ]int64
537
+ commitTotalDurations [2 ]int64
538
+ commitSemaphoreWaits [2 ]int64
539
+ commitMemTableWriteStalls [2 ]int64
540
+ commitL0ReadAmpWriteStalls [2 ]int64
541
+ commitWALRotations [2 ]int64
542
+ commitWaits [2 ]int64
462
543
)
463
544
464
545
// Iterate ad infinitum and collect the stats
@@ -474,6 +555,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
474
555
writeDelayTime = d .writeDelayTime .Load ()
475
556
nonLevel0CompCount = int64 (d .nonLevel0Comp .Load ())
476
557
level0CompCount = int64 (d .level0Comp .Load ())
558
+
559
+ commitCount = d .commitCount .Load ()
560
+ commitTotalDuration = d .commitTotalDuration .Load ()
561
+ commitSemaphoreWait = d .commitSemaphoreWait .Load ()
562
+ commitMemTableWriteStall = d .commitMemTableWriteStall .Load ()
563
+ commitL0ReadAmpWriteStall = d .commitL0ReadAmpWriteStall .Load ()
564
+ commitWALRotation = d .commitWALRotation .Load ()
565
+ commitWait = d .commitWait .Load ()
477
566
)
478
567
writeDelayTimes [i % 2 ] = writeDelayTime
479
568
writeDelayCounts [i % 2 ] = writeDelayCount
@@ -524,6 +613,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
524
613
d .level0CompGauge .Update (level0CompCount )
525
614
d .seekCompGauge .Update (stats .Compact .ReadCount )
526
615
616
+ commitCounts [i % 2 ] = commitCount
617
+ commitTotalDurations [i % 2 ] = commitTotalDuration
618
+ commitSemaphoreWaits [i % 2 ] = commitSemaphoreWait
619
+ commitMemTableWriteStalls [i % 2 ] = commitMemTableWriteStall
620
+ commitL0ReadAmpWriteStalls [i % 2 ] = commitL0ReadAmpWriteStall
621
+ commitWALRotations [i % 2 ] = commitWALRotation
622
+ commitWaits [i % 2 ] = commitWait
623
+
624
+ d .commitCountMeter .Mark (commitCounts [i % 2 ] - commitCounts [(i - 1 )% 2 ])
625
+ d .commitTotalDurationMeter .Mark (commitTotalDurations [i % 2 ] - commitTotalDurations [(i - 1 )% 2 ])
626
+ d .commitSemaphoreWaitMeter .Mark (commitSemaphoreWaits [i % 2 ] - commitSemaphoreWaits [(i - 1 )% 2 ])
627
+ d .commitMemTableWriteStallMeter .Mark (commitMemTableWriteStalls [i % 2 ] - commitMemTableWriteStalls [(i - 1 )% 2 ])
628
+ d .commitL0ReadAmpWriteStallMeter .Mark (commitL0ReadAmpWriteStalls [i % 2 ] - commitL0ReadAmpWriteStalls [(i - 1 )% 2 ])
629
+ d .commitWALRotationMeter .Mark (commitWALRotations [i % 2 ] - commitWALRotations [(i - 1 )% 2 ])
630
+ d .commitWaitMeter .Mark (commitWaits [i % 2 ] - commitWaits [(i - 1 )% 2 ])
631
+
632
+ d .compDebtGauge .Update (int64 (stats .Compact .EstimatedDebt ))
633
+ d .compInProgressGauge .Update (stats .Compact .NumInProgress )
634
+
527
635
for i , level := range stats .Levels {
528
636
// Append metrics for additional layers
529
637
if i >= len (d .levelsGauge ) {
@@ -578,7 +686,20 @@ func (b *batch) Write() error {
578
686
if b .db .closed {
579
687
return pebble .ErrClosed
580
688
}
581
- return b .b .Commit (b .db .writeOptions )
689
+ err := b .b .Commit (b .db .writeOptions )
690
+ if err != nil {
691
+ return err
692
+ }
693
+ stats := b .b .CommitStats ()
694
+ b .db .commitCount .Add (1 )
695
+ b .db .commitTotalDuration .Add (int64 (stats .TotalDuration ))
696
+ b .db .commitSemaphoreWait .Add (int64 (stats .SemaphoreWaitDuration ))
697
+ b .db .commitMemTableWriteStall .Add (int64 (stats .MemTableWriteStallDuration ))
698
+ b .db .commitL0ReadAmpWriteStall .Add (int64 (stats .L0ReadAmpWriteStallDuration ))
699
+ b .db .commitWALRotation .Add (int64 (stats .WALRotationDuration ))
700
+ b .db .commitWait .Add (int64 (stats .CommitWaitDuration ))
701
+ // TODO add metric for stats.WALQueueWaitDuration when it will be used by pebble (currently it is always 0)
702
+ return nil
582
703
}
583
704
584
705
// Reset resets the batch for reuse.
0 commit comments