Skip to content

Commit 54f89ee

Browse files
authored
[bug] logtail/stats: do not unsubscribe table before it finished waited (#19682)
do not unsubscribe table before it finished waited, otherwise, it will never stop waiting for the table's first logtail as it is subscribed. Approved by: @triump2020, @sukki37
1 parent a42cff1 commit 54f89ee

File tree

5 files changed

+90
-0
lines changed

5 files changed

+90
-0
lines changed

pkg/vm/engine/disttae/engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,10 @@ func (e *Engine) cleanMemoryTableWithTable(dbId, tblId uint64) {
738738
logutil.Debugf("clean memory table of tbl[dbId: %d, tblId: %d]", dbId, tblId)
739739
}
740740

741+
func (e *Engine) safeToUnsubscribe(tid uint64) bool {
742+
return e.globalStats.safeToUnsubscribe(tid)
743+
}
744+
741745
func (e *Engine) PushClient() *PushClient {
742746
return &e.pClient
743747
}

pkg/vm/engine/disttae/logtail_consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,6 +947,10 @@ func (c *PushClient) unusedTableGCTicker(ctx context.Context) {
947947
// never unsubscribe the mo_databases, mo_tables, mo_columns.
948948
continue
949949
}
950+
if !c.eng.safeToUnsubscribe(k) {
951+
logutil.Infof("%s table [%d-%d] is not safe to unsubscribe", logTag, v.DBID, k)
952+
continue
953+
}
950954
if !v.LatestTime.After(shouldClean) {
951955
if v.SubState != Subscribed {
952956
continue

pkg/vm/engine/disttae/logtail_consumer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func TestSubscribedTable(t *testing.T) {
3232
partitions: make(map[[2]uint64]*logtailreplay.Partition),
3333
globalStats: &GlobalStats{
3434
logtailUpdate: newLogtailUpdate(),
35+
waitKeeper: newWaitKeeper(),
3536
},
3637
}
3738
require.Equal(t, 0, len(subscribeRecord.m))

pkg/vm/engine/disttae/stats.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,37 @@ var (
4444
MinUpdateInterval = time.Second * 15
4545
)
4646

47+
// waitKeeper is used to mark the table has finished waited,
48+
// only after which, the table can be unsubscribed.
49+
type waitKeeper struct {
50+
sync.Mutex
51+
records map[uint64]struct{}
52+
}
53+
54+
func newWaitKeeper() *waitKeeper {
55+
return &waitKeeper{
56+
records: make(map[uint64]struct{}),
57+
}
58+
}
59+
60+
func (w *waitKeeper) reset() {
61+
w.Lock()
62+
defer w.Unlock()
63+
w.records = make(map[uint64]struct{})
64+
}
65+
66+
func (w *waitKeeper) add(tid uint64) {
67+
w.Lock()
68+
defer w.Unlock()
69+
w.records[tid] = struct{}{}
70+
}
71+
72+
func (w *waitKeeper) del(tid uint64) {
73+
w.Lock()
74+
defer w.Unlock()
75+
delete(w.records, tid)
76+
}
77+
4778
type updateStatsRequest struct {
4879
// statsInfo is the field which is to update.
4980
statsInfo *pb.StatsInfo
@@ -150,6 +181,10 @@ type GlobalStats struct {
150181
statsInfoMap map[pb.StatsInfoKey]*pb.StatsInfo
151182
}
152183

184+
// waitKeeper is used to make sure the table is safe to unsubscribe.
185+
// Only when the table is finished waited, it can be unsubscribed safely.
186+
waitKeeper *waitKeeper
187+
153188
// updateWorkerFactor is the times of CPU number of this node
154189
// to start update worker. Default is 8.
155190
updateWorkerFactor int
@@ -171,6 +206,7 @@ func NewGlobalStats(
171206
logtailUpdate: newLogtailUpdate(),
172207
tableLogtailCounter: make(map[pb.StatsInfoKey]int64),
173208
KeyRouter: keyRouter,
209+
waitKeeper: newWaitKeeper(),
174210
}
175211
s.updatingMu.updating = make(map[pb.StatsInfoKey]*updateRecord)
176212
s.mu.statsInfoMap = make(map[pb.StatsInfoKey]*pb.StatsInfo)
@@ -265,20 +301,34 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool)
265301
}
266302

267303
func (gs *GlobalStats) RemoveTid(tid uint64) {
304+
gs.waitKeeper.del(tid)
305+
268306
gs.logtailUpdate.mu.Lock()
269307
defer gs.logtailUpdate.mu.Unlock()
270308
delete(gs.logtailUpdate.mu.updated, tid)
271309
}
272310

273311
// clearTables clears the tables in the map if there are any tables in it.
274312
func (gs *GlobalStats) clearTables() {
313+
// clear all the waiters in the keeper.
314+
gs.waitKeeper.reset()
315+
275316
gs.logtailUpdate.mu.Lock()
276317
defer gs.logtailUpdate.mu.Unlock()
277318
if len(gs.logtailUpdate.mu.updated) > 0 {
278319
gs.logtailUpdate.mu.updated = make(map[uint64]struct{})
279320
}
280321
}
281322

323+
func (gs *GlobalStats) safeToUnsubscribe(tid uint64) bool {
324+
gs.waitKeeper.Lock()
325+
defer gs.waitKeeper.Unlock()
326+
if _, ok := gs.waitKeeper.records[tid]; ok {
327+
return true
328+
}
329+
return false
330+
}
331+
282332
func (gs *GlobalStats) enqueue(tail *logtail.TableLogtail) {
283333
select {
284334
case gs.tailC <- tail:
@@ -380,6 +430,8 @@ func (gs *GlobalStats) notifyLogtailUpdate(tid uint64) {
380430
}
381431

382432
func (gs *GlobalStats) waitLogtailUpdated(tid uint64) {
433+
defer gs.waitKeeper.add(tid)
434+
383435
// If the tid is less than reserved, return immediately.
384436
if tid < catalog.MO_RESERVED_MAX {
385437
return

pkg/vm/engine/disttae/stats_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,32 @@ func TestGlobalStats_ClearTables(t *testing.T) {
8787
gs.clearTables()
8888
assert.Equal(t, 0, len(gs.logtailUpdate.mu.updated))
8989
}
90+
91+
func TestWaitKeeper(t *testing.T) {
92+
defer leaktest.AfterTest(t)()
93+
94+
var tid uint64 = 1
95+
ctx, cancel := context.WithCancel(context.Background())
96+
defer cancel()
97+
gs := NewGlobalStats(ctx, nil, nil)
98+
assert.False(t, gs.safeToUnsubscribe(tid))
99+
100+
w := gs.waitKeeper
101+
w.add(tid)
102+
_, ok := w.records[tid]
103+
assert.True(t, ok)
104+
assert.True(t, gs.safeToUnsubscribe(tid))
105+
106+
gs.RemoveTid(tid)
107+
_, ok = w.records[tid]
108+
assert.False(t, ok)
109+
assert.False(t, gs.safeToUnsubscribe(tid))
110+
111+
w.add(tid)
112+
_, ok = w.records[tid]
113+
assert.True(t, ok)
114+
gs.clearTables()
115+
_, ok = w.records[tid]
116+
assert.False(t, ok)
117+
assert.False(t, gs.safeToUnsubscribe(tid))
118+
}

0 commit comments

Comments
 (0)