Skip to content

Commit

Permalink
Refactor work units and added tests for command.go (#899)
Browse files Browse the repository at this point in the history
  • Loading branch information
resoluteCoder authored Jan 15, 2024
1 parent 1e275cb commit 2bae5f3
Show file tree
Hide file tree
Showing 15 changed files with 1,234 additions and 198 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ build-all:
GOOS=windows go build -o receptor.exe ./cmd/receptor-cl && \
GOOS=darwin go build -o receptor.app ./cmd/receptor-cl && \
go build example/*.go && \
go build -o receptor --tags no_backends,no_services,no_tls_config,no_workceptor,no_cert_auth ./cmd/receptor-cl && \
go build -o receptor --tags no_backends,no_services,no_tls_config ./cmd/receptor-cl && \
go build -o receptor ./cmd/receptor-cl

DIST := receptor_$(shell echo '$(VERSION)' | sed 's/^v//')_$(GOOS)_$(GOARCH)
Expand Down
85 changes: 59 additions & 26 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,60 @@
package workceptor

import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"

"github.com/ghjm/cmdline"
"github.com/google/shlex"
)

type BaseWorkUnitForWorkUnit interface {
CancelContext()
ID() string
Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper)
LastUpdateError() error
Load() error
MonitorLocalStatus()
Release(force bool) error
Save() error
SetFromParams(_ map[string]string) error
Status() *StatusFileData
StatusFileName() string
StdoutFileName() string
UnitDir() string
UnredactedStatus() *StatusFileData
UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateFullStatus(statusFunc func(*StatusFileData))
GetStatusCopy() StatusFileData
GetStatusWithoutExtraData() *StatusFileData
SetStatusExtraData(interface{})
GetStatusLock() *sync.RWMutex
GetWorkceptor() *Workceptor
SetWorkceptor(*Workceptor)
GetContext() context.Context
GetCancel() context.CancelFunc
}

// commandUnit implements the WorkUnit interface for the Receptor command worker plugin.
type commandUnit struct {
BaseWorkUnit
BaseWorkUnitForWorkUnit
command string
baseParams string
allowRuntimeParams bool
done bool
}

// commandExtraData is the content of the ExtraData JSON field for a command worker.
type commandExtraData struct {
// CommandExtraData is the content of the ExtraData JSON field for a command worker.
type CommandExtraData struct {
Pid int
Params string
}
Expand Down Expand Up @@ -60,7 +89,7 @@ func cmdWaiter(cmd *exec.Cmd, doneChan chan bool) {
// commandRunner is run in a separate process, to monitor the subprocess and report back metadata.
func commandRunner(command string, params string, unitdir string) error {
status := StatusFileData{}
status.ExtraData = &commandExtraData{}
status.ExtraData = &CommandExtraData{}
statusFilename := path.Join(unitdir, "status")
err := status.UpdateBasicStatus(statusFilename, WorkStatePending, "Not started yet", 0)
if err != nil {
Expand Down Expand Up @@ -169,7 +198,7 @@ func (cw *commandUnit) SetFromParams(params map[string]string) error {
if cmdParams != "" && !cw.allowRuntimeParams {
return fmt.Errorf("extra params provided but not allowed")
}
cw.status.ExtraData.(*commandExtraData).Params = combineParams(cw.baseParams, cmdParams)
cw.GetStatusCopy().ExtraData.(*CommandExtraData).Params = combineParams(cw.baseParams, cmdParams)

return nil
}
Expand All @@ -181,10 +210,10 @@ func (cw *commandUnit) Status() *StatusFileData {

// UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (cw *commandUnit) UnredactedStatus() *StatusFileData {
cw.statusLock.RLock()
defer cw.statusLock.RUnlock()
status := cw.getStatus()
ed, ok := cw.status.ExtraData.(*commandExtraData)
cw.GetStatusLock().RLock()
defer cw.GetStatusLock().RUnlock()
status := cw.GetStatusWithoutExtraData()
ed, ok := cw.GetStatusCopy().ExtraData.(*CommandExtraData)
if ok {
edCopy := *ed
status.ExtraData = &edCopy
Expand All @@ -206,9 +235,9 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {
}
cw.UpdateFullStatus(func(status *StatusFileData) {
if status.ExtraData == nil {
status.ExtraData = &commandExtraData{}
status.ExtraData = &CommandExtraData{}
}
status.ExtraData.(*commandExtraData).Pid = cmd.Process.Pid
status.ExtraData.(*CommandExtraData).Pid = cmd.Process.Pid
})
doneChan := make(chan bool)
go func() {
Expand All @@ -226,8 +255,8 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {

// Start launches a job with given parameters.
func (cw *commandUnit) Start() error {
level := cw.w.nc.GetLogger().GetLogLevel()
levelName, _ := cw.w.nc.GetLogger().LogLevelToName(level)
level := cw.GetWorkceptor().nc.GetLogger().GetLogLevel()
levelName, _ := cw.GetWorkceptor().nc.GetLogger().LogLevelToName(level)
cw.UpdateBasicStatus(WorkStatePending, "Launching command runner", 0)

// TODO: This is another place where we rely on a pre-built binary for testing.
Expand All @@ -243,7 +272,7 @@ func (cw *commandUnit) Start() error {
"--log-level", levelName,
"--command-runner",
fmt.Sprintf("command=%s", cw.command),
fmt.Sprintf("params=%s", cw.Status().ExtraData.(*commandExtraData).Params),
fmt.Sprintf("params=%s", cw.Status().ExtraData.(*CommandExtraData).Params),
fmt.Sprintf("unitdir=%s", cw.UnitDir()))

return cw.runCommand(cmd)
Expand All @@ -270,9 +299,9 @@ func (cw *commandUnit) Restart() error {

// Cancel stops a running job.
func (cw *commandUnit) Cancel() error {
cw.cancel()
cw.CancelContext()
status := cw.Status()
ced, ok := status.ExtraData.(*commandExtraData)
ced, ok := status.ExtraData.(*CommandExtraData)
if !ok || ced.Pid <= 0 {
return nil
}
Expand Down Expand Up @@ -304,7 +333,7 @@ func (cw *commandUnit) Release(force bool) error {
return err
}

return cw.BaseWorkUnit.Release(force)
return cw.BaseWorkUnitForWorkUnit.Release(force)
}

// **************************************************************************
Expand All @@ -320,18 +349,22 @@ type CommandWorkerCfg struct {
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}

func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit {
cw := &commandUnit{
BaseWorkUnit: BaseWorkUnit{
func (cfg CommandWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit {
if bwu == nil {
bwu = &BaseWorkUnit{
status: StatusFileData{
ExtraData: &commandExtraData{},
ExtraData: &CommandExtraData{},
},
},
command: cfg.Command,
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
}

cw := &commandUnit{
BaseWorkUnitForWorkUnit: bwu,
command: cfg.Command,
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
cw.BaseWorkUnitForWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
Loading

0 comments on commit 2bae5f3

Please sign in to comment.