Skip to content

Commit 37d77b9

Browse files
authored
Fix data race in processlist map (#1884)
1 parent 3072091 commit 37d77b9

File tree

3 files changed

+80
-3
lines changed

3 files changed

+80
-3
lines changed

enginetest/engine_only_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ func TestTrackProcess(t *testing.T) {
333333
require.Error(ctx.Err())
334334
}
335335

336+
func TestConcurrentProcessList(t *testing.T) {
337+
enginetest.TestConcurrentProcessList(t, enginetest.NewDefaultMemoryHarness())
338+
}
339+
336340
func getRuleFrom(rules []analyzer.Rule, id analyzer.RuleId) *analyzer.Rule {
337341
for _, rule := range rules {
338342
if rule.Id == id {

enginetest/enginetests.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"io"
2121
"net"
2222
"strings"
23+
"sync"
2324
"testing"
2425
"time"
2526

@@ -4945,6 +4946,69 @@ func TestTransactionScripts(t *testing.T, harness Harness) {
49454946
}
49464947
}
49474948

4949+
func TestConcurrentProcessList(t *testing.T, harness Harness) {
4950+
require := require.New(t)
4951+
pl := sqle.NewProcessList()
4952+
numSessions := 2
4953+
4954+
for i := 0; i < numSessions; i++ {
4955+
pl.AddConnection(uint32(i), "foo")
4956+
sess := sql.NewBaseSessionWithClientServer("0.0.0.0:3306", sql.Client{Address: "", User: ""}, uint32(i))
4957+
pl.ConnectionReady(sess)
4958+
4959+
var err error
4960+
ctx := sql.NewContext(context.Background(), sql.WithPid(uint64(i)), sql.WithSession(sess), sql.WithProcessList(pl))
4961+
_, err = pl.BeginQuery(ctx, "foo")
4962+
require.NoError(err)
4963+
}
4964+
4965+
var wg sync.WaitGroup
4966+
4967+
// Read concurrently
4968+
for i := 0; i < numSessions; i++ {
4969+
wg.Add(1)
4970+
go func(x int) {
4971+
defer wg.Done()
4972+
procs := pl.Processes()
4973+
for _, proc := range procs {
4974+
for prog, part := range proc.Progress {
4975+
if prog == "" {
4976+
}
4977+
for p, pp := range part.PartitionsProgress {
4978+
if p == "" {
4979+
}
4980+
if pp.Name == "" {
4981+
}
4982+
}
4983+
}
4984+
}
4985+
}(i)
4986+
}
4987+
4988+
// Writes concurrently
4989+
for i := 0; i < numSessions; i++ {
4990+
wg.Add(4)
4991+
go func(x int) {
4992+
defer wg.Done()
4993+
pl.AddTableProgress(uint64(x), "foo", 100)
4994+
}(i)
4995+
go func(x int) {
4996+
defer wg.Done()
4997+
pl.AddPartitionProgress(uint64(x), "foo", "bar", 100)
4998+
}(i)
4999+
go func(x int) {
5000+
defer wg.Done()
5001+
pl.UpdateTableProgress(uint64(x), "foo", 100)
5002+
}(i)
5003+
go func(x int) {
5004+
defer wg.Done()
5005+
pl.UpdatePartitionProgress(uint64(x), "foo", "bar", 100)
5006+
}(i)
5007+
}
5008+
5009+
wg.Wait()
5010+
}
5011+
49485012
func TestNoDatabaseSelected(t *testing.T, harness Harness) {
49495013
harness.Setup(setup.MydbData)
49505014
e := mustNewEngine(t, harness)

processlist.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,21 @@ func (pl *ProcessList) Processes() []sql.Process {
4747
defer pl.mu.RUnlock()
4848
var result = make([]sql.Process, 0, len(pl.procs))
4949

50+
// Make a deep copy of all maps to avoid race
5051
for _, proc := range pl.procs {
5152
p := *proc
52-
var progress = make(map[string]sql.TableProgress, len(p.Progress))
53-
for n, p := range p.Progress {
54-
progress[n] = p
53+
var progMap = make(map[string]sql.TableProgress, len(p.Progress))
54+
for progName, prog := range p.Progress {
55+
newProg := sql.TableProgress{
56+
Progress: prog.Progress,
57+
PartitionsProgress: make(map[string]sql.PartitionProgress, len(prog.PartitionsProgress)),
58+
}
59+
for partName, partProg := range prog.PartitionsProgress {
60+
newProg.PartitionsProgress[partName] = partProg
61+
}
62+
progMap[progName] = newProg
5563
}
64+
p.Progress = progMap
5665
result = append(result, p)
5766
}
5867

0 commit comments

Comments
 (0)