Skip to content

Commit e4a3281

Browse files
authored
[#19649, #19664, #19408]Fix bugs (#19675)
1. Cherry-pick 2.0-dev Approved by: @XuPeng-SH, @sukki37
1 parent 4d6b21a commit e4a3281

File tree

2 files changed

+20
-28
lines changed

2 files changed

+20
-28
lines changed

pkg/vm/engine/disttae/logtail_consumer.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,8 +1891,6 @@ func updatePartitionOfPush(
18911891
if len(tl.CkpLocation) > 0 {
18921892
t0 = time.Now()
18931893
ckpStart, ckpEnd = parseCkpDuration(tl)
1894-
state.CacheCkpDuration(ckpStart, partition)
1895-
state.AppendCheckpoint(tl.CkpLocation, partition)
18961894
v2.LogtailUpdatePartitonHandleCheckpointDurationHistogram.Observe(time.Since(t0).Seconds())
18971895
}
18981896

@@ -1925,20 +1923,26 @@ func updatePartitionOfPush(
19251923
}
19261924

19271925
//After consume checkpoints finished ,then update the start and end of
1928-
//the mo system table's partition and catalog.
1929-
if !lazyLoad {
1926+
//the table's partition and catalog cache.
1927+
if isSub {
19301928
if len(tl.CkpLocation) != 0 {
19311929
if !ckpStart.IsEmpty() || !ckpEnd.IsEmpty() {
19321930
t0 = time.Now()
19331931
state.UpdateDuration(ckpStart, types.MaxTs())
1934-
//Notice that the checkpoint duration is same among all mo system tables,
1935-
//such as mo_databases, mo_tables, mo_columns.
1936-
e.GetLatestCatalogCache().UpdateDuration(ckpStart, types.MaxTs())
1932+
if lazyLoad {
1933+
state.AppendCheckpoint(tl.CkpLocation, partition)
1934+
} else {
1935+
//Notice that the checkpoint duration is same among all mo system tables,
1936+
//such as mo_databases, mo_tables, mo_columns.
1937+
e.GetLatestCatalogCache().UpdateDuration(ckpStart, types.MaxTs())
1938+
}
19371939
v2.LogtailUpdatePartitonUpdateTimestampsDurationHistogram.Observe(time.Since(t0).Seconds())
19381940
}
19391941
} else {
19401942
state.UpdateDuration(types.TS{}, types.MaxTs())
1941-
e.GetLatestCatalogCache().UpdateDuration(types.TS{}, types.MaxTs())
1943+
if !lazyLoad {
1944+
e.GetLatestCatalogCache().UpdateDuration(types.TS{}, types.MaxTs())
1945+
}
19421946
}
19431947
}
19441948

pkg/vm/engine/disttae/logtailreplay/partition.go

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -158,41 +158,29 @@ func (p *Partition) ConsumeCheckpoints(
158158
return nil
159159
}
160160

161-
//curState := p.state.Load()
162-
//if len(curState.checkpoints) == 0 {
163-
// p.UpdateDuration(types.TS{}, types.MaxTs())
164-
// return nil
165-
//}
161+
curState := p.state.Load()
162+
if len(curState.checkpoints) == 0 {
163+
return nil
164+
}
166165

167166
lockErr := p.Lock(ctx)
168167
if lockErr != nil {
169168
return lockErr
170169
}
171170
defer p.Unlock()
172171

173-
curState := p.state.Load()
174-
//if len(curState.checkpoints) == 0 {
175-
// p.UpdateDuration(types.TS{}, types.MaxTs())
176-
// return nil
177-
//}
178-
179-
state := curState.Copy()
180-
181-
if len(state.checkpoints) == 0 {
182-
state.UpdateDuration(types.TS{}, types.MaxTs())
183-
if !p.state.CompareAndSwap(curState, state) {
184-
panic("concurrent mutation")
185-
}
172+
curState = p.state.Load()
173+
if len(curState.checkpoints) == 0 {
186174
return nil
187175
}
188176

177+
state := curState.Copy()
178+
189179
//consume checkpoints.
190180
if err := state.consumeCheckpoints(fn); err != nil {
191181
return err
192182
}
193183

194-
state.UpdateDuration(state.start, types.MaxTs())
195-
196184
if !p.state.CompareAndSwap(curState, state) {
197185
panic("concurrent mutation")
198186
}

0 commit comments

Comments
 (0)