From 4f9768a1e6c7a486e3280a102b668ffae7d1eae7 Mon Sep 17 00:00:00 2001 From: Mathieu Champlon Date: Thu, 12 Dec 2024 17:43:40 +0100 Subject: [PATCH 1/6] Fix deadlock when closing pipe Signed-off-by: Mathieu Champlon --- pipe.go | 6 ++++-- pipe_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/pipe.go b/pipe.go index a2da6639..116509ae 100644 --- a/pipe.go +++ b/pipe.go @@ -574,9 +574,11 @@ func (l *win32PipeListener) Accept() (net.Conn, error) { func (l *win32PipeListener) Close() error { select { - case l.closeCh <- 1: - <-l.doneCh case <-l.doneCh: + case <-l.closeCh: + default: + close(l.closeCh) + <-l.doneCh } return nil } diff --git a/pipe_test.go b/pipe_test.go index 8b1d5b94..5a9dcd3e 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -646,3 +646,43 @@ func TestListenConnectRace(t *testing.T) { wg.Wait() } } + +func TestCloseRace(t *testing.T) { + for i := 0; i < 200 && !t.Failed(); i++ { + l, err := ListenPipe(testPipeName, &PipeConfig{MessageMode: true}) + if err != nil { + t.Fatal(err) + } + go func() { + for { + c, err := l.Accept() + if err != nil { + return + } + b, err := io.ReadAll(c) + if err != nil { + t.Error(err) + return + } + _, _ = c.Write(b) + _ = c.Close() + } + }() + + c, err := DialPipe(testPipeName, nil) + if err != nil { + t.Fatal(err) + } + if _, err = c.Write([]byte("hello")); err != nil { + t.Fatal(err) + } + if err := c.(CloseWriter).CloseWrite(); err != nil { + t.Fatal(err) + } + if _, err := io.ReadAll(c); err != nil { + t.Fatal(err) + } + _ = c.Close() + _ = l.Close() + } +} From 8404c745321ebd3741e20d90529190734a410b67 Mon Sep 17 00:00:00 2001 From: Mathieu Champlon Date: Mon, 13 Jan 2025 09:48:36 +0100 Subject: [PATCH 2/6] Use chan struct{} instead on chan int Signed-off-by: Mathieu Champlon --- pipe.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipe.go b/pipe.go index 116509ae..3748263a 100644 --- a/pipe.go +++ b/pipe.go @@ -316,8 +316,8 @@ type win32PipeListener struct { path string config PipeConfig acceptCh chan (chan acceptResponse) - closeCh chan int - doneCh chan int + closeCh chan struct{} + doneCh chan struct{} } func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (windows.Handle, error) { @@ -530,8 +530,8 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) { path: path, config: *c, acceptCh: make(chan (chan acceptResponse)), - closeCh: make(chan int), - doneCh: make(chan int), + closeCh: make(chan struct{}), + doneCh: make(chan struct{}), } go l.listenerRoutine() return l, nil From 74efb1d4ac4f319700760b6c4b053fc4c605f31c Mon Sep 17 00:00:00 2001 From: Mathieu Champlon Date: Mon, 13 Jan 2025 09:50:35 +0100 Subject: [PATCH 3/6] Handle multiple concurrent calls to Close Signed-off-by: Mathieu Champlon --- pipe.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pipe.go b/pipe.go index 3748263a..8938735a 100644 --- a/pipe.go +++ b/pipe.go @@ -576,6 +576,7 @@ func (l *win32PipeListener) Close() error { select { case <-l.doneCh: case <-l.closeCh: + <-l.doneCh default: close(l.closeCh) <-l.doneCh From 4b491a9339c37eacad1342f034494bb8bd409a9a Mon Sep 17 00:00:00 2001 From: Mathieu Champlon Date: Mon, 13 Jan 2025 09:52:19 +0100 Subject: [PATCH 4/6] Increase number of test iterations 200 was a little low to consistently trigger the issue. Signed-off-by: Mathieu Champlon --- pipe_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe_test.go b/pipe_test.go index 5a9dcd3e..2dfc373b 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -648,7 +648,7 @@ func TestListenConnectRace(t *testing.T) { } func TestCloseRace(t *testing.T) { - for i := 0; i < 200 && !t.Failed(); i++ { + for i := 0; i < 1000 && !t.Failed(); i++ { l, err := ListenPipe(testPipeName, &PipeConfig{MessageMode: true}) if err != nil { t.Fatal(err) From 9d32dd6750b4aab6180bdc44524a3261286b322c Mon Sep 17 00:00:00 2001 From: Mathieu Champlon Date: Mon, 13 Jan 2025 09:58:18 +0100 Subject: [PATCH 5/6] Log current test iteration This should help tell when we are deadlocking from test logs if we hit a problem in the future. Signed-off-by: Mathieu Champlon --- pipe_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pipe_test.go b/pipe_test.go index 2dfc373b..27223850 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -644,6 +644,8 @@ func TestListenConnectRace(t *testing.T) { s.Close() } wg.Wait() + + t.Logf("iteration %d", i) } } @@ -684,5 +686,7 @@ func TestCloseRace(t *testing.T) { } _ = c.Close() _ = l.Close() + + t.Logf("iteration %d", i) } } From 71048cda9d93677367465faeac7b98a0ddd1dd8c Mon Sep 17 00:00:00 2001 From: Mathieu Champlon Date: Mon, 13 Jan 2025 10:26:28 +0100 Subject: [PATCH 6/6] Use sync.Once to properly support concurrent calls to Close Signed-off-by: Mathieu Champlon --- pipe.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pipe.go b/pipe.go index 8938735a..c5999a5d 100644 --- a/pipe.go +++ b/pipe.go @@ -11,6 +11,7 @@ import ( "net" "os" "runtime" + "sync" "time" "unsafe" @@ -316,6 +317,7 @@ type win32PipeListener struct { path string config PipeConfig acceptCh chan (chan acceptResponse) + closeOnce sync.Once closeCh chan struct{} doneCh chan struct{} } @@ -573,14 +575,10 @@ func (l *win32PipeListener) Accept() (net.Conn, error) { } func (l *win32PipeListener) Close() error { - select { - case <-l.doneCh: - case <-l.closeCh: - <-l.doneCh - default: + l.closeOnce.Do(func() { close(l.closeCh) - <-l.doneCh - } + }) + <-l.doneCh return nil }