Skip to content

Commit 8cceac3

Browse files
authored
RSDK-11434 Managed Process Status on Windows (#461)
1 parent ee6aee2 commit 8cceac3

File tree

6 files changed

+203
-51
lines changed

6 files changed

+203
-51
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ require (
5151
goji.io v2.0.2+incompatible
5252
golang.org/x/net v0.42.0
5353
golang.org/x/oauth2 v0.28.0
54+
golang.org/x/sys v0.34.0
5455
golang.org/x/tools v0.34.0
5556
google.golang.org/api v0.196.0
5657
google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed
@@ -340,7 +341,6 @@ require (
340341
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
341342
golang.org/x/mod v0.25.0 // indirect
342343
golang.org/x/sync v0.16.0 // indirect
343-
golang.org/x/sys v0.34.0 // indirect
344344
golang.org/x/term v0.33.0 // indirect
345345
golang.org/x/text v0.27.0 // indirect
346346
golang.org/x/time v0.6.0 // indirect

pexec/managed_process.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,21 @@ import (
1818

1919
var errAlreadyStopped = errors.New("already stopped")
2020

21+
// ProcessNotExistsError is a custom error that is returned from managedProcess.kill() to
22+
// specify that the desired process no longer exists. It is checked by the modManager in
23+
// rdk.
24+
type ProcessNotExistsError struct {
25+
err error
26+
}
27+
28+
func (e *ProcessNotExistsError) Error() string {
29+
return fmt.Sprintf("process does not exist: %v", e.err)
30+
}
31+
32+
func (e *ProcessNotExistsError) Unwrap() error {
33+
return e.err
34+
}
35+
2136
// UnexpectedExitHandler is the signature for functions that can optionally be
2237
// provided to run when a managed process unexpectedly exits. The return value
2338
// indicates whether pexec should continue with its own attempt to restart the
@@ -133,9 +148,7 @@ func (p *managedProcess) UnixPid() (int, error) {
133148
}
134149

135150
func (p *managedProcess) Status() error {
136-
p.mu.Lock()
137-
defer p.mu.Unlock()
138-
return p.cmd.Process.Signal(syscall.Signal(0))
151+
return p.status()
139152
}
140153

141154
func (p *managedProcess) validateCWD() error {

pexec/managed_process_test.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ func TestManagedProcessStart(t *testing.T) {
151151
test.That(t, err.Error(), test.ShouldContainSubstring, `error setting process working directory to "idontexist"`)
152152
})
153153
t.Run("resolves relative path with different cwd", func(t *testing.T) {
154+
// on windows, this test will not run, because we use a unix executable.
155+
if runtime.GOOS == "windows" {
156+
t.Skip()
157+
}
154158
logger := golog.NewTestLogger(t)
155159
newWD := t.TempDir()
156160
// create an executable file in there that we will refence as a just "./exec.sh"
@@ -192,7 +196,13 @@ func TestManagedProcessStart(t *testing.T) {
192196
cancel()
193197

194198
test.That(t, proc.Start(ctx), test.ShouldBeNil)
195-
test.That(t, proc.Stop(), test.ShouldBeNil)
199+
if runtime.GOOS == "windows" {
200+
// on windows, we return a process not found error here
201+
var processNotExistsErr *ProcessNotExistsError
202+
test.That(t, errors.As(proc.Stop(), &processNotExistsErr), test.ShouldBeTrue)
203+
} else {
204+
test.That(t, proc.Stop(), test.ShouldBeNil)
205+
}
196206
})
197207
t.Run("starting with a normal context should run until stop", func(t *testing.T) {
198208
logger := golog.NewTestLogger(t)
@@ -680,7 +690,13 @@ func TestManagedProcessEnvironmentVariables(t *testing.T) {
680690
output, err := bufferedLogReader.ReadString('\n')
681691
test.That(t, err, test.ShouldBeNil)
682692
test.That(t, output, test.ShouldEqual, "/opt/viam\n")
683-
test.That(t, proc.Stop(), test.ShouldBeNil)
693+
if runtime.GOOS == "windows" {
694+
// on windows, we return a process not found error here
695+
var processNotExistsErr *ProcessNotExistsError
696+
test.That(t, errors.As(proc.Stop(), &processNotExistsErr), test.ShouldBeTrue)
697+
} else {
698+
test.That(t, proc.Stop(), test.ShouldBeNil)
699+
}
684700
})
685701

686702
t.Run("overwrite an environment variable", func(t *testing.T) {
@@ -739,7 +755,13 @@ func TestManagedProcessLogWriter(t *testing.T) {
739755
test.That(t, err, test.ShouldBeNil)
740756
test.That(t, line, test.ShouldEqual, "hello\n")
741757
}
742-
test.That(t, proc.Stop(), test.ShouldBeNil)
758+
if runtime.GOOS == "windows" {
759+
// on windows, we return a process not found error here
760+
var processNotExistsErr *ProcessNotExistsError
761+
test.That(t, errors.As(proc.Stop(), &processNotExistsErr), test.ShouldBeTrue)
762+
} else {
763+
test.That(t, proc.Stop(), test.ShouldBeNil)
764+
}
743765
})
744766
}
745767

pexec/managed_process_unix.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ func (p *managedProcess) sysProcAttr() (*syscall.SysProcAttr, error) {
8787
return attrs, nil
8888
}
8989

90+
func (p *managedProcess) status() error {
91+
p.mu.Lock()
92+
defer p.mu.Unlock()
93+
return p.cmd.Process.Signal(syscall.Signal(0))
94+
}
95+
96+
// kill attempts to stop the managedProcess.
97+
// The boolean return value indicates whether the process was force killed or not. If the process is already done
98+
// or no longer exist, a special ProcessNotExistsError is returned.
9099
func (p *managedProcess) kill() (bool, error) {
91100
p.logger.Infof("stopping process %d with signal %s", p.cmd.Process.Pid, p.stopSig)
92101
// First let's try to directly signal the process.
@@ -101,8 +110,15 @@ func (p *managedProcess) kill() (bool, error) {
101110
select {
102111
case <-timer.C:
103112
p.logger.Infof("stopping entire process group %d with signal %s", p.cmd.Process.Pid, p.stopSig)
104-
if err := syscall.Kill(-p.cmd.Process.Pid, p.stopSig); err != nil && !errors.Is(err, os.ErrProcessDone) {
105-
return false, errors.Wrapf(err, "error signaling process group %d with signal %s", p.cmd.Process.Pid, p.stopSig)
113+
if err := syscall.Kill(-p.cmd.Process.Pid, p.stopSig); err != nil {
114+
var errno syscall.Errno
115+
if errors.As(err, &errno) && errno == syscall.ESRCH {
116+
return false, &ProcessNotExistsError{err}
117+
} else if errors.Is(err, os.ErrProcessDone) {
118+
return false, &ProcessNotExistsError{err}
119+
}
120+
return false, errors.Wrapf(err, "error signaling process group %d with signal %s",
121+
p.cmd.Process.Pid, p.stopSig)
106122
}
107123
case <-p.managingCh:
108124
timer.Stop()
@@ -115,8 +131,15 @@ func (p *managedProcess) kill() (bool, error) {
115131
select {
116132
case <-timer2.C:
117133
p.logger.Infof("killing entire process group %d", p.cmd.Process.Pid)
118-
if err := syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL); err != nil && !errors.Is(err, os.ErrProcessDone) {
119-
return false, errors.Wrapf(err, "error killing process group %d", p.cmd.Process.Pid)
134+
if err := syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL); err != nil {
135+
var errno syscall.Errno
136+
if errors.As(err, &errno) && errno == syscall.ESRCH {
137+
return false, &ProcessNotExistsError{err}
138+
} else if errors.Is(err, os.ErrProcessDone) {
139+
return false, &ProcessNotExistsError{err}
140+
}
141+
return false, errors.Wrapf(err, "error signaling process group %d with signal %s",
142+
p.cmd.Process.Pid, p.stopSig)
120143
}
121144
forceKilled = true
122145
case <-p.managingCh:

pexec/managed_process_windows.go

Lines changed: 126 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
package pexec
44

55
import (
6+
"os"
67
"os/exec"
78
"strconv"
89
"strings"
910
"syscall"
1011
"time"
1112

1213
"github.com/pkg/errors"
14+
"golang.org/x/sys/windows"
1315
)
1416

1517
func sigStr(sig syscall.Signal) string {
@@ -35,75 +37,122 @@ func (p *managedProcess) sysProcAttr() (*syscall.SysProcAttr, error) {
3537
return ret, nil
3638
}
3739

40+
// kill attempts to stop the managedProcess.
41+
// The boolean return value indicates whether the process was force killed or not. If the process is already done
42+
// or no longer exist, a special ProcessNotExistsError is returned.
3843
func (p *managedProcess) kill() (bool, error) {
44+
// NOTE: the first kill attempt behavior is different on unix. If the first attempt to kill the
45+
// process results in os.ErrProcessDone on unix, no error is returned. Only if a process
46+
// is not found when trying to kill its tree or force kill it, then &ProcessNotExistsError
47+
// is returned.
48+
// On windows, we will always return this error, even when the first attempt failed.
3949
const mustForce = "This process can only be terminated forcefully"
4050
pidStr := strconv.Itoa(p.cmd.Process.Pid)
4151
p.logger.Infof("killing process %d", p.cmd.Process.Pid)
4252
// First let's try to ask the process to stop. If it's a console application, this is
4353
// very unlikely to work.
4454
var shouldJustForce bool
55+
// Our first attempt to gracefully close the process is taskkill. Taskkill is a windows
56+
// replacement for kill. However, windows does not implement signals in the same way
57+
// unix does, so their IPC involves "messages". Research has not shown exactly what is
58+
// the message sent by taskkill, but it is most likely WM_CLOSE (another option that I
59+
// have seen in discussions is WM_QUIT). WM_CLOSE is similar to pressing the X button on
60+
// an application window, which asks the process to shutdown, but the handling of this
61+
// "message" is up to the process. Moreover, to receive this message, a process needs to
62+
// have a "window". Since most viam modules do not have their own windows, killing them
63+
// results in a message "This process can only be terminated forcefully". However, this
64+
// line can potentially work if a module will have its own window.
4565
if out, err := exec.Command("taskkill", "/pid", pidStr).CombinedOutput(); err != nil {
4666
switch {
4767
case strings.Contains(string(out), mustForce):
4868
p.logger.Debug("must force terminate process")
69+
// if taskkill doesn't find a window to terminate the process, we will attempt to
70+
// send a "break" control event, which asks for a graceful shutdown of the whole
71+
// process group.
72+
73+
// GenerateConsoleCtrlEvent functions differently from taskkill. In particular, it
74+
// sends a "signal" to a process group that shares a console with the calling process.
75+
// Since we specify the CREATE_NEW_PROCESS_GROUP flag in SysProcAttr, this pattern
76+
// works well for us for two reasons:
77+
// a) The module still shares the console with the calling process (viam-server). If
78+
// we were to specify CREATE_NEW_CONSOLE in the creation flags, this would no longer
79+
// be the case.
80+
// b) By creating a new process group, we are safe to send the break signal to the
81+
// module. If we didn't specify the CREATE_NEW_PROCESS_GROUP flag, we would risk
82+
// shutting down viam-server as well, since they would be in the same process group.
83+
if err := windows.GenerateConsoleCtrlEvent(windows.CTRL_BREAK_EVENT, uint32(p.cmd.Process.Pid)); err != nil {
84+
p.logger.Debugw("sending a control break event to the process group failed with error", "err", err)
85+
}
4986
shouldJustForce = true
5087
case strings.Contains(string(out), "not found"):
51-
return false, nil
88+
return false, &ProcessNotExistsError{err}
5289
default:
5390
return false, errors.Wrapf(err, "error killing process %d", p.cmd.Process.Pid)
5491
}
5592
}
5693

57-
if !shouldJustForce {
58-
// In case the process didn't stop, or left behind any orphan children in its process group,
59-
// we now ask everything in the process tree to stop after a brief wait.
60-
timer := time.NewTimer(p.stopWaitInterval)
61-
defer timer.Stop()
62-
select {
63-
case <-timer.C:
64-
p.logger.Infof("killing entire process tree %d", p.cmd.Process.Pid)
65-
if out, err := exec.Command("taskkill", "/t", "/pid", pidStr).CombinedOutput(); err != nil {
66-
switch {
67-
case strings.Contains(string(out), mustForce):
68-
p.logger.Debug("must force terminate process tree")
69-
shouldJustForce = true
70-
case strings.Contains(string(out), "not found"):
71-
return false, nil
72-
default:
73-
return false, errors.Wrapf(err, "error killing process tree %d", p.cmd.Process.Pid)
74-
}
94+
// In case the process didn't stop, or left behind any orphan children in its process group,
95+
// we now ask everything in the process tree to stop after a brief wait.
96+
timer := time.NewTimer(p.stopWaitInterval)
97+
defer timer.Stop()
98+
select {
99+
case <-timer.C:
100+
p.logger.Infof("killing entire process tree %d", p.cmd.Process.Pid)
101+
out, err := exec.Command("taskkill", "/t", "/pid", pidStr).CombinedOutput()
102+
if err != nil {
103+
switch {
104+
case strings.Contains(string(out), mustForce):
105+
p.logger.Debug("must force terminate process tree")
106+
shouldJustForce = true
107+
case strings.Contains(string(out), "not found"):
108+
return false, &ProcessNotExistsError{err}
109+
default:
110+
return false, errors.Wrapf(err, "error killing process tree %d", p.cmd.Process.Pid)
75111
}
76-
case <-p.managingCh:
77-
timer.Stop()
78112
}
113+
case <-p.managingCh:
114+
timer.Stop()
79115
}
80116

81117
// Lastly, kill everything in the process tree that remains after a longer wait or now. This is
82118
// going to likely result in an "exit status 1" that we will have to interpret.
83119
// FUTURE(erd): find a way to do this better. Research has not come up with much and is
84120
// program dependent.
85-
var forceKilled bool
86-
if !shouldJustForce {
87-
timer2 := time.NewTimer(p.stopWaitInterval * 2)
88-
defer timer2.Stop()
89-
select {
90-
case <-timer2.C:
91-
p.logger.Infof("force killing entire process tree %d", p.cmd.Process.Pid)
92-
if err := exec.Command("taskkill", "/t", "/f", "/pid", pidStr).Run(); err != nil {
93-
return false, errors.Wrapf(err, "error force killing process tree %d", p.cmd.Process.Pid)
121+
122+
// We can force kill the process group right away, if the flag is already set
123+
forceKillCommand := exec.Command("taskkill", "/t", "/f", "/pid", pidStr)
124+
if shouldJustForce {
125+
if out, err := forceKillCommand.CombinedOutput(); err != nil {
126+
switch {
127+
case strings.Contains(string(out), "not found"):
128+
return false, &ProcessNotExistsError{err}
129+
default:
130+
return false, errors.Wrapf(err, "error killing process %d", p.cmd.Process.Pid)
94131
}
95-
forceKilled = true
96-
case <-p.managingCh:
97-
timer2.Stop()
98-
}
99-
} else {
100-
if err := exec.Command("taskkill", "/t", "/f", "/pid", pidStr).Run(); err != nil {
101-
return false, errors.Wrapf(err, "error force killing process tree %d", p.cmd.Process.Pid)
102132
}
103-
forceKilled = true
133+
return true, nil
104134
}
105135

106-
return forceKilled, nil
136+
// If shouldJustForce is not set yet, we will wait on a timer to give managing channel a
137+
// final chance to close. If it doesn't, we will force kill the process tree.
138+
timer2 := time.NewTimer(p.stopWaitInterval * 2)
139+
defer timer2.Stop()
140+
select {
141+
case <-timer2.C:
142+
p.logger.Infof("force killing entire process tree %d", p.cmd.Process.Pid)
143+
if out, err := forceKillCommand.CombinedOutput(); err != nil {
144+
switch {
145+
case strings.Contains(string(out), "not found"):
146+
return false, &ProcessNotExistsError{err}
147+
default:
148+
return false, errors.Wrapf(err, "error killing process %d", p.cmd.Process.Pid)
149+
}
150+
}
151+
return true, nil
152+
case <-p.managingCh:
153+
timer2.Stop()
154+
}
155+
return false, nil
107156
}
108157

109158
// forceKillGroup kills everything in the process tree. This will not wait for completion and may result in a zombie process.
@@ -112,3 +161,41 @@ func (p *managedProcess) forceKillGroup() error {
112161
p.logger.Infof("force killing entire process tree %d", p.cmd.Process.Pid)
113162
return exec.Command("taskkill", "/t", "/f", "/pid", pidStr).Start()
114163
}
164+
165+
// Status is a best effort method to return an os.ErrProcessDone in case the process no
166+
// longer exists.
167+
func (p *managedProcess) status() error {
168+
p.mu.Lock()
169+
defer p.mu.Unlock()
170+
pid, err := p.UnixPid()
171+
if err != nil {
172+
return err
173+
}
174+
175+
handle, err := windows.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid))
176+
defer windows.CloseHandle(handle)
177+
if err != nil {
178+
if err == windows.ERROR_INVALID_PARAMETER {
179+
// Bohdan: my understanding is that Invalid_Paramater is not a strong guarantee, but
180+
// it's highly likely that we can treat it as "ProcessDone".
181+
return os.ErrProcessDone
182+
}
183+
// A common error here could be Access_Denied, which would signal that the process
184+
// still exists.
185+
return err
186+
}
187+
188+
// To be extra sure, we can examine the exit code of the process handle.
189+
var exitCode uint32
190+
err = windows.GetExitCodeProcess(handle, &exitCode)
191+
if err != nil {
192+
return err
193+
}
194+
// Somehow, this constant is not defined in the windows library, but it looks like it's
195+
// a commonly used Windows constant to check that the process is still running.
196+
const STILL_ACTIVE = 259
197+
if exitCode != STILL_ACTIVE {
198+
return os.ErrProcessDone
199+
}
200+
return nil
201+
}

pexec/process_manager_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pexec
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"runtime"
@@ -183,7 +184,13 @@ func TestProcessManagerStart(t *testing.T) {
183184
logger := golog.NewTestLogger(t)
184185
pm := NewProcessManager(logger)
185186
defer func() {
186-
test.That(t, pm.Stop(), test.ShouldBeNil)
187+
if runtime.GOOS == "windows" {
188+
// on windows, we return a process not found error here
189+
var processNotExistsErr *ProcessNotExistsError
190+
test.That(t, errors.As(pm.Stop(), &processNotExistsErr), test.ShouldBeTrue)
191+
} else {
192+
test.That(t, pm.Stop(), test.ShouldBeNil)
193+
}
187194
}()
188195
test.That(t, pm.Start(context.Background()), test.ShouldBeNil)
189196
test.That(t, pm.Start(context.Background()), test.ShouldBeNil)

0 commit comments

Comments
 (0)