Skip to content

Commit e48bbe1

Browse files
authored
Merge delta loc 1.2 dev (#17328)
Enable merging based on delta location. Add mo_ctl command to control enabling/disabling merging based on delta location. Bump golangci-lint version to 1.59.1. Approved by: @XuPeng-SH, @zhangxu19830126, @triump2020, @sukki37
1 parent 399364a commit e48bbe1

File tree

14 files changed

+146
-94
lines changed

14 files changed

+146
-94
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ fmt:
235235

236236
.PHONY: install-static-check-tools
237237
install-static-check-tools:
238-
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | bash -s -- -b $(GOPATH)/bin v1.55.2
238+
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | bash -s -- -b $(GOPATH)/bin v1.59.1
239239
@go install github.com/matrixorigin/linter/cmd/molint@latest
240240
@go install github.com/apache/skywalking-eyes/cmd/license-eye@v0.4.0
241241

pkg/vm/engine/disttae/filter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,7 @@ func ExecuteBlockFilter(
11721172
blk.EntryState = obj.EntryState
11731173
blk.CommitTs = obj.CommitTS
11741174
if obj.HasDeltaLoc {
1175-
deltaLoc, commitTs, ok := snapshot.GetBockDeltaLoc(blk.BlockID)
1175+
deltaLoc, commitTs, ok := snapshot.GetBlockDeltaLoc(blk.BlockID)
11761176
if ok {
11771177
blk.DeltaLoc = deltaLoc
11781178
blk.CommitTs = commitTs

pkg/vm/engine/disttae/logtailreplay/blocks_iter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (p *PartitionState) GetChangedObjsBetween(
158158
return
159159
}
160160

161-
func (p *PartitionState) GetBockDeltaLoc(bid types.Blockid) (objectio.ObjectLocation, types.TS, bool) {
161+
func (p *PartitionState) GetBlockDeltaLoc(bid types.Blockid) (objectio.ObjectLocation, types.TS, bool) {
162162
iter := p.blockDeltas.Copy().Iter()
163163
defer iter.Release()
164164

pkg/vm/engine/disttae/logtailreplay/object_filter.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"slices"
2121

2222
"github.com/matrixorigin/matrixone/pkg/container/types"
23-
"github.com/matrixorigin/matrixone/pkg/logutil"
2423
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/compute"
2524
)
2625

@@ -65,7 +64,6 @@ func (o *overlap) Filter(objs []ObjectInfo) []ObjectInfo {
6564
}
6665
o.t = objs[0].SortKeyZoneMap().GetType()
6766
for _, obj := range objs {
68-
obj := obj
6967
o.intervals = append(o.intervals, entryInterval{
7068
min: obj.SortKeyZoneMap().GetMin(),
7169
max: obj.SortKeyZoneMap().GetMax(),
@@ -81,8 +79,6 @@ func (o *overlap) Filter(objs []ObjectInfo) []ObjectInfo {
8179

8280
set := entrySet{entries: make([]ObjectInfo, 0), maxValue: minValue(o.t)}
8381
for _, interval := range o.intervals {
84-
interval := interval
85-
logutil.Infof("Mergeblocks %v %v", interval.min, interval.max)
8682
if len(set.entries) == 0 || compute.CompareGeneric(set.maxValue, interval.min, o.t) > 0 {
8783
set.add(o.t, interval)
8884
} else if len(set.entries) == 1 {

pkg/vm/engine/disttae/merge.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ func newCNMergeTask(
8787
blkCnts := make([]int, len(targets))
8888
blkIters := make([]*StatsBlkIter, len(targets))
8989
for i, objInfo := range targets {
90-
objInfo := objInfo
9190
blkCnts[i] = int(objInfo.BlkCnt())
9291

9392
loc := objInfo.ObjectLocation()
@@ -170,7 +169,7 @@ func (t *cnMergeTask) LoadNextBatch(ctx context.Context, objIdx uint32) (*batch.
170169
blk.EntryState = obj.EntryState
171170
blk.CommitTs = obj.CommitTS
172171
if obj.HasDeltaLoc {
173-
deltaLoc, commitTs, ok := t.state.GetBockDeltaLoc(blk.BlockID)
172+
deltaLoc, commitTs, ok := t.state.GetBlockDeltaLoc(blk.BlockID)
174173
if ok {
175174
blk.DeltaLoc = deltaLoc
176175
blk.CommitTs = commitTs

pkg/vm/engine/disttae/txn_table.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ func (tbl *txnTable) rangesOnePart(
849849
blk.EntryState = obj.EntryState
850850
blk.CommitTs = obj.CommitTS
851851
if obj.HasDeltaLoc {
852-
deltaLoc, commitTs, ok := state.GetBockDeltaLoc(blk.BlockID)
852+
deltaLoc, commitTs, ok := state.GetBlockDeltaLoc(blk.BlockID)
853853
if ok {
854854
blk.DeltaLoc = deltaLoc
855855
blk.CommitTs = commitTs
@@ -2154,7 +2154,7 @@ func (tbl *txnTable) PKPersistedBetween(
21542154
blk.EntryState = obj.EntryState
21552155
blk.CommitTs = obj.CommitTS
21562156
if obj.HasDeltaLoc {
2157-
deltaLoc, commitTs, ok := p.GetBockDeltaLoc(blk.BlockID)
2157+
deltaLoc, commitTs, ok := p.GetBlockDeltaLoc(blk.BlockID)
21582158
if ok {
21592159
blk.DeltaLoc = deltaLoc
21602160
blk.CommitTs = commitTs
@@ -2326,7 +2326,7 @@ func (tbl *txnTable) transferDeletes(
23262326
SegmentID: *obj.ObjectShortName().Segmentid(),
23272327
}
23282328
if obj.HasDeltaLoc {
2329-
deltaLoc, commitTs, ok := state.GetBockDeltaLoc(blkInfo.BlockID)
2329+
deltaLoc, commitTs, ok := state.GetBlockDeltaLoc(blkInfo.BlockID)
23302330
if ok {
23312331
blkInfo.DeltaLoc = deltaLoc
23322332
blkInfo.CommitTs = commitTs

pkg/vm/engine/tae/db/checkpoint/runner.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ type tableAndSize struct {
8989
// A: A checkpoint runner organizes and manages all checkpoint-related behaviors. It roughly
9090
// does the following things:
9191
// - Manage the life cycle of all checkpoints and provide some query interfaces.
92-
// - A cron job periodically collects and analyzes dirty blocks, and flushes eligibl dirty
92+
// - A cron job periodically collects and analyzes dirty blocks, and flushes eligible dirty
9393
// blocks to the remote storage
94-
// - The cron job peridically test whether a new checkpoint can be created. If it is not
94+
// - The cron job periodically test whether a new checkpoint can be created. If it is not
9595
// satisfied, it will wait for next trigger. Otherwise, it will start the process of
9696
// creating a checkpoint.
9797

@@ -155,11 +155,11 @@ type tableAndSize struct {
155155
// 8. Schedule to remove stale checkpoint meta objects
156156

157157
// Q: How to boot from the checkpoints?
158-
// A: When a meta version is created, it contains all information of the previouse version. So we always
158+
// A: When a meta version is created, it contains all information of the previous version. So we always
159159
//
160160
// delete the stale versions when a new version is created. Over time, the number of objects under
161161
// `ckp/` is small.
162-
// 1. List all meta objects under `ckp/`. Get the latest meta object and read all checkpoint informations
162+
// 1. List all meta objects under `ckp/`. Get the latest meta object and read all checkpoint information
163163
// from the meta object.
164164
// 2. Apply the latest global checkpoint
165165
// 3. Apply the incremental checkpoint start from the version right after the global checkpoint to the
@@ -200,7 +200,7 @@ type runner struct {
200200

201201
ctx context.Context
202202

203-
// logtail sourcer
203+
// logtail source
204204
source logtail.Collector
205205
catalog *catalog.Catalog
206206
rt *dbutils.Runtime
@@ -404,7 +404,7 @@ func (r *runner) gcCheckpointEntries(ts types.TS) {
404404
func (r *runner) onIncrementalCheckpointEntries(items ...any) {
405405
now := time.Now()
406406
entry := r.MaxCheckpoint()
407-
// In some unit tests, ckp is managed manually, and ckp deletiton (CleanPenddingCheckpoint)
407+
// In some unit tests, ckp is managed manually, and ckp deletion (CleanPendingCheckpoint)
408408
// can be called when the queue still has unexecuted task.
409409
// Add `entry == nil` here as protective codes
410410
if entry == nil || entry.GetState() != ST_Running {

pkg/vm/engine/tae/db/merge/executor.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,12 @@ func (e *MergeExecutor) OnExecDone(v any) {
154154
atomic.AddInt64(&e.activeEstimateBytes, -int64(stat.estBytes))
155155
}
156156

157-
func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, policy Policy) {
157+
func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
158158
if e.roundMergeRows*36 /*28 * 1.3 */ > e.transPageLimit/8 {
159159
return
160160
}
161161
e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema().Name)
162162

163-
mobjs, kind := policy.Revise(e.CPUPercent(), int64(e.MemAvailBytes()))
164-
if len(mobjs) < 2 {
165-
return
166-
}
167-
168163
if ActiveCNObj.CheckOverlapOnCNActive(mobjs) {
169164
return
170165
}

pkg/vm/engine/tae/db/merge/mod.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
)
3636

3737
var StopMerge atomic.Bool
38+
var DisableDeltaLocMerge atomic.Bool
3839

3940
type CNMergeScheduler interface {
4041
SendMergeTask(ctx context.Context, task *api.MergeTaskEntry) error
@@ -199,8 +200,8 @@ const (
199200
)
200201

201202
type Policy interface {
202-
OnObject(obj *catalog.ObjectEntry)
203-
Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind)
203+
OnObject(obj *catalog.ObjectEntry, force bool)
204+
Revise(cpu, mem int64, littleFirst bool) ([]*catalog.ObjectEntry, TaskHostKind)
204205
ResetForTable(*catalog.TableEntry)
205206
SetConfig(*catalog.TableEntry, func() txnif.AsyncTxn, any)
206207
GetConfig(*catalog.TableEntry) any

pkg/vm/engine/tae/db/merge/policyBasic.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ package merge
1616

1717
import (
1818
"bytes"
19+
"cmp"
1920
"context"
2021
"fmt"
21-
"sort"
22+
"slices"
2223
"sync"
2324

2425
"github.com/matrixorigin/matrixone/pkg/logutil"
@@ -131,9 +132,7 @@ func (o *customConfigProvider) String() string {
131132
for k := range o.configs {
132133
keys = append(keys, k)
133134
}
134-
sort.Slice(keys, func(i, j int) bool {
135-
return keys[i] < keys[j]
136-
})
135+
slices.SortFunc(keys, func(a, b uint64) int { return cmp.Compare(a, b) })
137136
buf := bytes.Buffer{}
138137
buf.WriteString("customConfigProvider: ")
139138
for _, k := range keys {
@@ -172,12 +171,12 @@ func NewBasicPolicy() Policy {
172171
}
173172

174173
// impl Policy for Basic
175-
func (o *basic) OnObject(obj *catalog.ObjectEntry) {
174+
func (o *basic) OnObject(obj *catalog.ObjectEntry, force bool) {
176175
rowsLeftOnObj := obj.GetRemainingRows()
177176
osize := obj.GetOriginSize()
178177

179-
iscandidate := func() bool {
180-
// objext with a lot of holes
178+
isCandidate := func() bool {
179+
// object with a lot of holes
181180
if rowsLeftOnObj < obj.GetRows()/2 {
182181
return true
183182
}
@@ -192,7 +191,7 @@ func (o *basic) OnObject(obj *catalog.ObjectEntry) {
192191
return false
193192
}
194193

195-
if iscandidate() {
194+
if force || isCandidate() {
196195
o.objHeap.pushWithCap(&mItem[*catalog.ObjectEntry]{
197196
row: rowsLeftOnObj,
198197
entry: obj,
@@ -237,11 +236,17 @@ func (o *basic) GetConfig(tbl *catalog.TableEntry) any {
237236
return r
238237
}
239238

240-
func (o *basic) Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind) {
239+
func (o *basic) Revise(cpu, mem int64, littleFirst bool) ([]*catalog.ObjectEntry, TaskHostKind) {
241240
objs := o.objHeap.finish()
242-
sort.Slice(objs, func(i, j int) bool {
243-
return objs[i].GetRemainingRows() < objs[j].GetRemainingRows()
244-
})
241+
if littleFirst {
242+
slices.SortFunc(objs, func(a, b *catalog.ObjectEntry) int {
243+
return cmp.Compare(a.GetRemainingRows(), b.GetRemainingRows())
244+
})
245+
} else {
246+
slices.SortFunc(objs, func(a, b *catalog.ObjectEntry) int {
247+
return -cmp.Compare(a.GetRemainingRows(), b.GetRemainingRows())
248+
})
249+
}
245250

246251
isStandalone := common.IsStandaloneBoost.Load()
247252
mergeOnDNIfStandalone := !common.ShouldStandaloneCNTakeOver.Load()
@@ -327,16 +332,12 @@ func (o *basic) controlMem(objs []*catalog.ObjectEntry, mem int64) []*catalog.Ob
327332
}
328333

329334
needPopout := func(ss []*catalog.ObjectEntry) bool {
330-
osize, esize, _ := estimateMergeConsume(ss)
331-
if esize > int(2*mem/3) {
332-
return true
333-
}
334-
335335
if len(ss) <= 2 {
336336
return false
337337
}
338-
// make object averaged size
339-
return osize > int(o.config.MaxOsizeMergedObj)
338+
339+
_, esize, _ := estimateMergeConsume(ss)
340+
return esize > int(2*mem/3)
340341
}
341342
for needPopout(objs) {
342343
objs = objs[:len(objs)-1]

0 commit comments

Comments
 (0)