Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[carry #2337] ensure logger completion after container exit #3995

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions cmd/nerdctl/container/container_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,55 @@ func TestLogsWithFailingContainer(t *testing.T) {
base.Cmd("rm", "-f", containerName).AssertOK()
}

func TestLogsWithRunningContainer(t *testing.T) {
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", "-f", containerName).Run()
expected := make([]string, 10)
for i := 0; i < 10; i++ {
expected[i] = fmt.Sprint(i + 1)
}

base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK()
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
}

func TestLogsWithoutNewlineOrEOF(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows")
}
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", "-f", containerName).Run()
expected := []string{"Hello World!", "There is no newline"}
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"printf", "'Hello World!\nThere is no newline'").AssertOK()
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
}

func TestLogsAfterRestartingContainer(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem <id>: The requested operation for attach namespace failed.: unknown")
}
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", "-f", containerName).Run()
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"printf", "'Hello World!\nThere is no newline'").AssertOK()
expected := []string{"Hello World!", "There is no newline"}
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
// restart and check logs again
base.Cmd("start", containerName)
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
}

func TestLogsWithForegroundContainers(t *testing.T) {
testCase := nerdtest.Setup()
// dual logging is not supported on Windows
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/container/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti
// Setup goroutine to send stop event if container task finishes:
go func() {
<-waitCh
// Wait for logger to process remaining logs after container exit
if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil {
log.G(ctx).WithError(err).Error("failed to wait for logger shutdown")
}
log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer")
stopChannel <- os.Interrupt
}()
Expand Down
104 changes: 98 additions & 6 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
"sync"
"time"

containerd "github.com/containerd/containerd/v2/client"
"github.com/fsnotify/fsnotify"
"github.com/muesli/cancelreader"

"github.com/containerd/containerd/v2/core/runtime/v2/logging"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/nerdctl/v2/pkg/lockutil"
)

const (
Expand Down Expand Up @@ -149,7 +151,59 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) {
return logConfig, nil
}

func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error {
func getLockPath(dataStore, ns, id string) string {
return filepath.Join(dataStore, "containers", ns, id, "logger-lock")
}

// WaitForLogger waits until the logger has finished executing and processing container logs
func WaitForLogger(dataStore, ns, id string) error {
return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error {
return nil
})
}

// getContainerWait loads the container from ID and returns its wait channel
func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
client, err := containerd.New(address, containerd.WithDefaultNamespace(config.Namespace))
if err != nil {
return nil, err
}
con, err := client.LoadContainer(ctx, config.ID)
if err != nil {
return nil, err
}

task, err := con.Task(ctx, nil)
if err == nil {
return task.Wait(ctx)
}
if !errdefs.IsNotFound(err) {
return nil, err
}

// If task was not found, it's possible that the container runtime is still being created.
// Retry every 100ms.
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, errors.New("timed out waiting for container task to start")
case <-ticker.C:
task, err = con.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return nil, err
}
return task.Wait(ctx)
}
}
}

func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, config *logging.Config) error {
if err := driver.PreProcess(ctx, dataStore, config); err != nil {
return err
}
Expand All @@ -168,6 +222,20 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
stderrR.Cancel()
}()

// initialize goroutines to copy stdout and stderr streams to a closable pipe
pipeStdoutR, pipeStdoutW := io.Pipe()
pipeStderrR, pipeStderrW := io.Pipe()
copyStream := func(reader io.Reader, writer *io.PipeWriter) {
// copy using a buffer of size 32K
buf := make([]byte, 32<<10)
_, err := io.CopyBuffer(writer, reader, buf)
if err != nil {
log.G(ctx).Errorf("failed to copy stream: %s", err)
}
}
go copyStream(stdoutR, pipeStdoutW)
go copyStream(stderrR, pipeStderrW)

var wg sync.WaitGroup
wg.Add(3)
stdout := make(chan string, 10000)
Expand All @@ -192,12 +260,24 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
}
}
}
go processLogFunc(stdoutR, stdout)
go processLogFunc(stderrR, stderr)
go processLogFunc(pipeStdoutR, stdout)
go processLogFunc(pipeStderrR, stderr)
go func() {
defer wg.Done()
driver.Process(stdout, stderr)
}()
go func() {
// close stdout and stderr upon container exit
defer pipeStdoutW.Close()
defer pipeStdoutW.Close()

exitCh, err := getContainerWait(ctx, address, config)
if err != nil {
log.G(ctx).Errorf("failed to get container task wait channel: %v", err)
return
}
<-exitCh
}()
wg.Wait()
return driver.PostProcess()
}
Expand All @@ -220,11 +300,23 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
if err != nil {
return err
}
if err := ready(); err != nil {

loggerLock := getLockPath(dataStore, config.Namespace, config.ID)
f, err := os.Create(loggerLock)
if err != nil {
return err
}

return loggingProcessAdapter(ctx, driver, dataStore, config)
defer f.Close()

// the logger will obtain an exclusive lock on a file until the container is
// stopped and the driver has finished processing all output,
// so that waiting log viewers can be signalled when the process is complete.
return lockutil.WithDirLock(loggerLock, func() error {
if err := ready(); err != nil {
return err
}
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, config)
})
} else if !errors.Is(err, os.ErrNotExist) {
// the file does not exist if the container was created with nerdctl < 0.20
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/logging/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestLoggingProcessAdapter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := loggingProcessAdapter(ctx, driver, "testDataStore", config)
err := loggingProcessAdapter(ctx, driver, "testDataStore", "", config)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading