Skip to content

Commit 0e55402

Browse files
author
ffffwh
committed
fix possible premature return of WaitForAllCommitted #1056-2
See TestMtsManager2 for the case.
1 parent aaf9459 commit 0e55402

File tree

3 files changed

+34
-2
lines changed

3 files changed

+34
-2
lines changed

driver/mysql/applier_incr.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,9 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
404404

405405
if isBig {
406406
if binlogEntry.Index == 0 {
407-
a.mtsManager.lastEnqueue = binlogEntry.Coordinates.GetSequenceNumber()
407+
if !a.mtsManager.WaitForExecution(binlogEntry) {
408+
return nil // shutdown
409+
}
408410
}
409411
a.logger.Info("bigtx ApplyBinlogEvent", "gno", txGno, "index", binlogEntry.Index)
410412
err = a.ApplyBinlogEvent(0, entryCtx)

driver/mysql/applier_incr_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,31 @@ func TestMtsManager(t *testing.T) {
4242
t.Fatal("might be stuck")
4343
}
4444
}
45+
46+
func TestMtsManager2(t *testing.T) {
47+
logger := hclog.Default()
48+
shutdownCh := make(chan struct{})
49+
defer close(shutdownCh)
50+
51+
mm := NewMtsManager(shutdownCh, logger)
52+
go mm.LcUpdater()
53+
54+
mm.WaitForExecution0(1, 0)
55+
logger.Info("wait 1 0")
56+
mm.Executed0(1)
57+
mm.WaitForExecution0(2, 1)
58+
logger.Info("wait 2 1")
59+
go func() {
60+
// execute later
61+
time.Sleep(100 * time.Millisecond)
62+
mm.Executed0(2)
63+
}()
64+
65+
// entry resent
66+
mm.WaitForExecution0(1, 0)
67+
mm.WaitForAllCommitted(logger)
68+
if mm.lastCommitted < 2 {
69+
t.Fatal("WaitForAllCommitted should not return before 2-executed")
70+
}
71+
}
72+

driver/mysql/applier_mts.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ func (mm *MtsManager) WaitForExecution(binlogEntry *common.DataEntry) bool {
8686
}
8787

8888
func (mm *MtsManager) WaitForExecution0(seq int64, lc int64) bool {
89-
mm.lastEnqueue = seq
89+
if mm.lastEnqueue < seq {
90+
mm.lastEnqueue = seq
91+
}
9092

9193
if mm.forceMts {
9294
return true

0 commit comments

Comments
 (0)