Skip to content

Commit

Permalink
fix: output log after log rotation
Browse files Browse the repository at this point in the history
Signed-off-by: xyz-li <[email protected]>
  • Loading branch information
xyz-li committed Jul 16, 2024
1 parent 31352aa commit 84a9837
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/fahedouch/go-logrotate v0.2.1
github.com/fatih/color v1.17.0
github.com/fluent/fluent-logger-golang v1.9.0
github.com/fsnotify/fsnotify v1.7.0
github.com/ipfs/go-cid v0.4.1
github.com/klauspost/compress v1.17.9
github.com/mattn/go-isatty v0.0.20
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/ml
github.com/fluent/fluent-logger-golang v1.9.0/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
github.com/go-jose/go-jose/v4 v4.0.3 h1:o8aphO8Hv6RPmH+GfzVuyf7YXSBibp+8YyHdOoDESGo=
github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA=
github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
42 changes: 40 additions & 2 deletions pkg/logging/cri_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package logging
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -35,6 +36,7 @@ import (

"github.com/containerd/log"
"github.com/containerd/nerdctl/v2/pkg/logging/tail"
"github.com/fsnotify/fsnotify"
)

// LogStreamType is the type of the stream in CRI container log.
Expand All @@ -45,6 +47,9 @@ const (
Stdout LogStreamType = "stdout"
// Stderr is the stream type for stderr.
Stderr LogStreamType = "stderr"

// logForceCheckPeriod is the period to check for a new read
logForceCheckPeriod = 1 * time.Second
)

// LogTag is the tag of a log line in CRI container log.
Expand Down Expand Up @@ -89,7 +94,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", logPath, err)
}
defer f.Close()
defer func() {
f.Close()
}()

// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(f, opts.Tail)
Expand All @@ -101,6 +108,8 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
return fmt.Errorf("failed to seek in log file %q: %v", logPath, err)
}

var watcher *fsnotify.Watcher

limitedMode := (opts.Tail > 0) && (!opts.Follow)
limitedNum := opts.Tail
// Start parsing the logs.
Expand All @@ -110,6 +119,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
isNewLine := true
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
baseName := filepath.Base(logPath)
dir := filepath.Dir(logPath)

for {
select {
case <-stopChannel:
Expand All @@ -126,13 +138,39 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
return fmt.Errorf("failed to read log file %q: %v", logPath, err)
}
if opts.Follow {

// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err)
}

if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = NewLogFileWatcher(dir); err != nil {
return err
}
defer watcher.Close()
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}

var recreated bool
// Wait until the next log change.
recreated, err = startTail(context.Background(), baseName, watcher)
if err != nil {
return err
}
if recreated {
newF, err := openFileShareDelete(logPath)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", logPath, err)
}
f.Close()
f = newF
r = bufio.NewReader(f)
}

// If the container exited consume data until the next EOF
continue
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/logging/cri_logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -230,3 +231,85 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) {
t.Errorf("should have two lines, lineCount= %d", lineCount)
}
}

func TestReadRotatedLog(t *testing.T) {
tmpDir := t.TempDir()
file, err := os.CreateTemp(tmpDir, "logfile")
if err != nil {
t.Errorf("unable to create temp file, error: %s", err.Error())
}
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
containerStoped := make(chan os.Signal)
// Start to follow the container's log.
fileName := file.Name()
go func() {
lvOpts := &LogViewOptions{
Follow: true,
LogPath: fileName,
}
_ = ReadLogs(lvOpts, stdoutBuf, stderrBuf, containerStoped)
}()

// log in stdout
expectedStdout := "line0line2line4line6line8"
// log in stderr
expectedStderr := "line1line3line5line7line9"

dir := filepath.Dir(file.Name())
baseName := filepath.Base(file.Name())

// Write 10 lines to log file.
// Let ReadLogs start.
time.Sleep(50 * time.Millisecond)

for line := 0; line < 10; line++ {
// Write the first three lines to log file
now := time.Now().Format(time.RFC3339Nano)
if line%2 == 0 {
file.WriteString(fmt.Sprintf(
"%s stdout P line%d\n", now, line))
} else {
file.WriteString(fmt.Sprintf(
"%s stderr P line%d\n", now, line))
}

time.Sleep(1 * time.Millisecond)

if line == 5 {
file.Close()
// Pretend to rotate the log.
rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405"))
rotatedName = filepath.Join(dir, rotatedName)
if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil {
t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error())
return
}

time.Sleep(20 * time.Millisecond)
newF := filepath.Join(dir, baseName)
if file, err = os.Create(newF); err != nil {
t.Errorf("unable to create new log file, error: %s", err.Error())
return
}
}
}

// Finished writing into the file, close it, so we can delete it later.
err = file.Close()
if err != nil {
t.Errorf("could not close file, error: %s", err.Error())
}

time.Sleep(2 * time.Second)
// Make the function ReadLogs end.
close(containerStoped)

if expectedStdout != stdoutBuf.String() {
t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String())
}

if expectedStderr != stderrBuf.String() {
t.Errorf("expected: %s, acoutal: %s", expectedStderr, stderrBuf.String())
}
}
44 changes: 44 additions & 0 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (
"path/filepath"
"sort"
"sync"
"time"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/containerd/log"
"github.com/fsnotify/fsnotify"
"github.com/muesli/cancelreader"
)

Expand Down Expand Up @@ -217,3 +219,45 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
return nil
}, nil
}

func NewLogFileWatcher(dir string) (*fsnotify.Watcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
if err = watcher.Add(dir); err != nil {
watcher.Close()
return nil, fmt.Errorf("failed to watch directory %q: %w", dir, err)
}
return watcher, nil
}

// startTail wait for the next log write.
// the boolean value indicates if the log file was recreated;
// the error is error happens during waiting new logs.
func startTail(ctx context.Context, logName string, w *fsnotify.Watcher) (bool, error) {
errRetry := 5
for {
select {
case <-ctx.Done():
return false, fmt.Errorf("context cancelled")
case e := <-w.Events:
switch {
case e.Has(fsnotify.Write):
return false, nil
case e.Has(fsnotify.Create):
return filepath.Base(e.Name) == logName, nil
default:
log.L.Debugf("Received unexpected fsnotify event: %v, retrying", e)
}
case err := <-w.Errors:
log.L.Debugf("Received fsnotify watch error, retrying unless no more retries left, retries: %d, error: %s", errRetry, err)
if errRetry == 0 {
return false, err
}
errRetry--
case <-time.After(logForceCheckPeriod):
return false, nil
}
}
}
34 changes: 34 additions & 0 deletions pkg/logging/logs_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build !windows
// +build !windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_other.go
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0
*/
package logging

import (
"os"
)

func openFileShareDelete(path string) (*os.File, error) {
// Noop. Only relevant for Windows.
return os.Open(path)
}
54 changes: 54 additions & 0 deletions pkg/logging/logs_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//go:build windows
// +build windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_windows.go
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0
*/
package logging

import (
"os"
"syscall"
)

// Based on Windows implementation of Windows' syscall.Open
// https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/syscall/syscall_windows.go;l=342
// In addition to syscall.Open, this function also adds the syscall.FILE_SHARE_DELETE flag to sharemode,
// which will allow us to read from the file without blocking the file from being deleted or renamed.
// This is essential for Log Rotation which is done by renaming the open file. Without this, the file rename would fail.
func openFileShareDelete(path string) (*os.File, error) {
pathp, err := syscall.UTF16PtrFromString(path)
if err != nil {
return nil, err
}

var access uint32 = syscall.GENERIC_READ
var sharemode uint32 = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE | syscall.FILE_SHARE_DELETE
var createmode uint32 = syscall.OPEN_EXISTING
var attrs uint32 = syscall.FILE_ATTRIBUTE_NORMAL

handle, err := syscall.CreateFile(pathp, access, sharemode, nil, createmode, attrs, 0)
if err != nil {
return nil, err
}

return os.NewFile(uintptr(handle), path), nil
}

0 comments on commit 84a9837

Please sign in to comment.