diff --git a/go.mod b/go.mod index aaf2291ec97..26a574b8255 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/fahedouch/go-logrotate v0.2.1 github.com/fatih/color v1.17.0 github.com/fluent/fluent-logger-golang v1.9.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/ipfs/go-cid v0.4.1 github.com/klauspost/compress v1.17.9 github.com/mattn/go-isatty v0.0.20 diff --git a/go.sum b/go.sum index 4c3172ebf39..8070c0ca46c 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/mldQDA7czhWXEg= github.com/fluent/fluent-logger-golang v1.9.0/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-jose/go-jose/v4 v4.0.3 h1:o8aphO8Hv6RPmH+GfzVuyf7YXSBibp+8YyHdOoDESGo= github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/pkg/logging/cri_logger.go b/pkg/logging/cri_logger.go index c3c3b35efc3..eada94c6add 100644 --- a/pkg/logging/cri_logger.go +++ b/pkg/logging/cri_logger.go @@ -25,6 +25,7 @@ package logging import ( "bufio" "bytes" + "context" "errors" "fmt" "io" @@ -35,6 +36,7 @@ import ( "github.com/containerd/log" "github.com/containerd/nerdctl/v2/pkg/logging/tail" + "github.com/fsnotify/fsnotify" ) // LogStreamType is the type of the stream in CRI container log. @@ -45,6 +47,9 @@ const ( Stdout LogStreamType = "stdout" // Stderr is the stream type for stderr. Stderr LogStreamType = "stderr" + + // logForceCheckPeriod is the period to check for a new read + logForceCheckPeriod = 1 * time.Second ) // LogTag is the tag of a log line in CRI container log. @@ -89,7 +94,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o if err != nil { return fmt.Errorf("failed to open log file %q: %v", logPath, err) } - defer f.Close() + defer func() { + f.Close() + }() // Search start point based on tail line. start, err := tail.FindTailLineStartIndex(f, opts.Tail) @@ -101,6 +108,8 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o return fmt.Errorf("failed to seek in log file %q: %v", logPath, err) } + var watcher *fsnotify.Watcher + limitedMode := (opts.Tail > 0) && (!opts.Follow) limitedNum := opts.Tail // Start parsing the logs. @@ -110,6 +119,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o isNewLine := true writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} + baseName := filepath.Base(logPath) + dir := filepath.Dir(logPath) + for { select { case <-stopChannel: @@ -126,13 +138,48 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o return fmt.Errorf("failed to read log file %q: %v", logPath, err) } if opts.Follow { - // Reset seek so that if this is an incomplete line, // it will be read again. if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err) } + if watcher == nil { + // Initialize the watcher if it has not been initialized yet. + if watcher, err = NewLogFileWatcher(dir); err != nil { + return err + } + defer watcher.Close() + // If we just created the watcher, try again to read as we might have missed + // the event. + continue + } + + var recreated bool + // Wait until the next log change. + recreated, err = startTail(context.Background(), baseName, watcher) + if err != nil { + return err + } + if recreated { + newF, err := openFileShareDelete(logPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + //If the user application outputs logs too quickly, + //There is a slight possibility that nerdctl has just rotated the log file, + //try opening it once more. + time.Sleep(10 * time.Millisecond) + } + newF, err = openFileShareDelete(logPath) + if err != nil { + return fmt.Errorf("failed to open cri logfile %q: %w", logPath, err) + } + } + f.Close() + f = newF + r = bufio.NewReader(f) + } + // If the container exited consume data until the next EOF continue } diff --git a/pkg/logging/cri_logger_test.go b/pkg/logging/cri_logger_test.go index 3788a3c706b..c8207d3c341 100644 --- a/pkg/logging/cri_logger_test.go +++ b/pkg/logging/cri_logger_test.go @@ -28,6 +28,7 @@ import ( "fmt" "io" "os" + "path/filepath" "reflect" "testing" "time" @@ -230,3 +231,85 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) { t.Errorf("should have two lines, lineCount= %d", lineCount) } } + +func TestReadRotatedLog(t *testing.T) { + tmpDir := t.TempDir() + file, err := os.CreateTemp(tmpDir, "logfile") + if err != nil { + t.Errorf("unable to create temp file, error: %s", err.Error()) + } + stdoutBuf := &bytes.Buffer{} + stderrBuf := &bytes.Buffer{} + containerStoped := make(chan os.Signal) + // Start to follow the container's log. + fileName := file.Name() + go func() { + lvOpts := &LogViewOptions{ + Follow: true, + LogPath: fileName, + } + _ = ReadLogs(lvOpts, stdoutBuf, stderrBuf, containerStoped) + }() + + // log in stdout + expectedStdout := "line0line2line4line6line8" + // log in stderr + expectedStderr := "line1line3line5line7line9" + + dir := filepath.Dir(file.Name()) + baseName := filepath.Base(file.Name()) + + // Write 10 lines to log file. + // Let ReadLogs start. + time.Sleep(50 * time.Millisecond) + + for line := 0; line < 10; line++ { + // Write the first three lines to log file + now := time.Now().Format(time.RFC3339Nano) + if line%2 == 0 { + file.WriteString(fmt.Sprintf( + "%s stdout P line%d\n", now, line)) + } else { + file.WriteString(fmt.Sprintf( + "%s stderr P line%d\n", now, line)) + } + + time.Sleep(1 * time.Millisecond) + + if line == 5 { + file.Close() + // Pretend to rotate the log. + rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405")) + rotatedName = filepath.Join(dir, rotatedName) + if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil { + t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error()) + return + } + + time.Sleep(20 * time.Millisecond) + newF := filepath.Join(dir, baseName) + if file, err = os.Create(newF); err != nil { + t.Errorf("unable to create new log file, error: %s", err.Error()) + return + } + } + } + + // Finished writing into the file, close it, so we can delete it later. + err = file.Close() + if err != nil { + t.Errorf("could not close file, error: %s", err.Error()) + } + + time.Sleep(2 * time.Second) + // Make the function ReadLogs end. + close(containerStoped) + + if expectedStdout != stdoutBuf.String() { + t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String()) + } + + if expectedStderr != stderrBuf.String() { + t.Errorf("expected: %s, acoutal: %s", expectedStderr, stderrBuf.String()) + } +} diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index defd6e8174d..f76c053bca4 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -17,24 +17,24 @@ package logging import ( - "bufio" + "context" "errors" "fmt" "io" "os" - "os/exec" "path/filepath" "strconv" - "strings" "time" "github.com/containerd/containerd/runtime/v2/logging" "github.com/containerd/errdefs" "github.com/containerd/log" "github.com/containerd/nerdctl/v2/pkg/logging/jsonfile" + "github.com/containerd/nerdctl/v2/pkg/logging/tail" "github.com/containerd/nerdctl/v2/pkg/strutil" "github.com/docker/go-units" "github.com/fahedouch/go-logrotate" + "github.com/fsnotify/fsnotify" ) var JSONDriverLogOpts = []string{ @@ -142,9 +142,6 @@ func viewLogsJSONFile(lvopts LogViewOptions, stdout, stderr io.Writer, stopChann } } - if checkExecutableAvailableInPath("tail") { - return viewLogsJSONFileThroughTailExec(lvopts, logFilePath, stdout, stderr, stopChannel) - } return viewLogsJSONFileDirect(lvopts, logFilePath, stdout, stderr, stopChannel) } @@ -156,118 +153,100 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou if err != nil { return err } - defer fin.Close() - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail) - if err != nil { - return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err) - } + defer func() { fin.Close() }() - if lvopts.Follow { - // Get the current file handler's seek. - lastPos, err := fin.Seek(0, io.SeekCurrent) - if err != nil { - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) - } - fin.Close() - for { - select { - case <-stopChannel: - log.L.Debugf("received stop signal while re-reading JSON logfile, returning") + // Search start point based on tail line. + start, err := tail.FindTailLineStartIndex(fin, lvopts.Tail) + if err != nil { + return fmt.Errorf("failed to tail %d lines of JSON logfile %q: %w", lvopts.Tail, jsonLogFilePath, err) + } + + if _, err := fin.Seek(start, io.SeekStart); err != nil { + return fmt.Errorf("failed to seek in log file %q from %d position: %w", jsonLogFilePath, start, err) + } + + limitedMode := (lvopts.Tail > 0) && (!lvopts.Follow) + limitedNum := lvopts.Tail + var stop bool + var watcher *fsnotify.Watcher + baseName := filepath.Base(jsonLogFilePath) + dir := filepath.Dir(jsonLogFilePath) + retryTimes := 2 + backBytes := 0 + + for { + select { + case <-stopChannel: + log.L.Debug("received stop signal while re-reading JSON logfile, returning") + return nil + default: + if stop || (limitedMode && limitedNum == 0) { + log.L.Debugf("finished parsing log JSON filefile, path: %s", jsonLogFilePath) return nil - default: - // Re-open the file and seek to the last-consumed offset. - fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) + } + + if line, err := jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until); err != nil { + if len(line) > 0 { + time.Sleep(5 * time.Millisecond) + if retryTimes == 0 { + log.L.Infof("finished parsing log JSON filefile, path: %s, line: %s", jsonLogFilePath, string(line)) + return fmt.Errorf("error occurred while doing read of JSON logfile %q: %s, retryTimes: %d", jsonLogFilePath, err, retryTimes) + } + retryTimes-- + backBytes = len(line) + } else { + return fmt.Errorf("error occurred while doing read of JSON logfile %q: %s", jsonLogFilePath, err) } - _, err = fin.Seek(lastPos, 0) + } else { + retryTimes = 2 + backBytes = 0 + } + + if lvopts.Follow { + // Get the current file handler's seek. + lastPos, err := fin.Seek(int64(-backBytes), io.SeekCurrent) if err != nil { - fin.Close() return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) } - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) + if watcher == nil { + // Initialize the watcher if it has not been initialized yet. + if watcher, err = NewLogFileWatcher(dir); err != nil { + return err + } + defer watcher.Close() + // If we just created the watcher, try again to read as we might have missed + // the event. + continue } - // Record current file seek position before looping again. - lastPos, err = fin.Seek(0, io.SeekCurrent) + var recreated bool + // Wait until the next log change. + recreated, err = startTail(context.Background(), baseName, watcher) if err != nil { + return err + } + if recreated { + newF, err := openFileShareDelete(jsonLogFilePath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + //If the user application outputs logs too quickly, + //There is a slight possibility that nerdctl has just rotated the log file, + //try opening it once more. + time.Sleep(10 * time.Millisecond) + } + newF, err = openFileShareDelete(jsonLogFilePath) + if err != nil { + return fmt.Errorf("failed to open JSON logfile %q: %w", jsonLogFilePath, err) + } + } fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) + fin = newF } - fin.Close() + continue } + stop = true // Give the OS a second to breathe before re-opening the file: - time.Sleep(time.Second) - } - } - return nil -} - -// Loads logs through the `tail` executable. -func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath string, stdout, stderr io.Writer, stopChannel chan os.Signal) error { - var args []string - - args = append(args, "-n") - if lvopts.Tail == 0 { - args = append(args, "+0") - } else { - args = append(args, strconv.FormatUint(uint64(lvopts.Tail), 10)) - } - - if lvopts.Follow { - // using the `-F` to follow the file name instead of descriptor and retry if inaccessible - args = append(args, "-F") - } - args = append(args, jsonLogFilePath) - cmd := exec.Command("tail", args...) - - cmdStdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - - cmdStderr, err := cmd.StderrPipe() - if err != nil { - return err - } - - if err := cmd.Start(); err != nil { - return err - } - - // filter the unwanted error message of the tail - go filterTailStderr(cmdStderr) - - // Setup killing goroutine: - go func() { - <-stopChannel - log.L.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid) - cmd.Process.Kill() - }() - - return jsonfile.Decode(stdout, stderr, cmdStdout, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) -} - -func filterTailStderr(reader io.Reader) error { - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - line := scanner.Text() - if strings.HasSuffix(line, "has appeared; following new file") || - strings.HasSuffix(line, "has become inaccessible: No such file or directory") || - strings.HasSuffix(line, "has been replaced; following new file") || - strings.HasSuffix(line, ": No such file or directory") { - continue } - fmt.Fprintln(os.Stderr, line) } - - if err := scanner.Err(); err != nil { - return err - } - return nil } diff --git a/pkg/logging/json_logger_test.go b/pkg/logging/json_logger_test.go new file mode 100644 index 00000000000..76014d761df --- /dev/null +++ b/pkg/logging/json_logger_test.go @@ -0,0 +1,173 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package logging + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func TestReadRotatedJSONLog(t *testing.T) { + tmpDir := t.TempDir() + file, err := os.CreateTemp(tmpDir, "logfile") + if err != nil { + t.Errorf("unable to create temp file, error: %s", err.Error()) + } + stdoutBuf := &bytes.Buffer{} + stderrBuf := &bytes.Buffer{} + containerStopped := make(chan os.Signal) + // Start to follow the container's log. + fileName := file.Name() + go func() { + lvOpts := LogViewOptions{ + Follow: true, + LogPath: fileName, + } + viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, containerStopped) + }() + + // log in stdout + expectedStdout := "line0\nline1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\n" + dir := filepath.Dir(file.Name()) + baseName := filepath.Base(file.Name()) + + // Write 10 lines to log file. + // Let ReadLogs start. + time.Sleep(50 * time.Millisecond) + + type logContent struct { + Log string `json:"log"` + Stream string `json:"stream"` + Time string `json:"time"` + } + + for line := 0; line < 10; line++ { + // Write the first three lines to log file + log := logContent{} + log.Log = fmt.Sprintf("line%d\n", line) + log.Stream = "stdout" + log.Time = time.Now().Format(time.RFC3339Nano) + time.Sleep(1 * time.Millisecond) + logData, _ := json.Marshal(log) + file.Write(logData) + + if line == 5 { + file.Close() + // Pretend to rotate the log. + rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("20060102-150405")) + rotatedName = filepath.Join(dir, rotatedName) + if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil { + t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error()) + return + } + + time.Sleep(20 * time.Millisecond) + newF := filepath.Join(dir, baseName) + if file, err = os.Create(newF); err != nil { + t.Errorf("unable to create new log file, error: %s", err.Error()) + return + } + } + } + + // Finished writing into the file, close it, so we can delete it later. + err = file.Close() + if err != nil { + t.Errorf("could not close file, error: %s", err.Error()) + } + + time.Sleep(2 * time.Second) + // Make the function ReadLogs end. + close(containerStopped) + + if expectedStdout != stdoutBuf.String() { + t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String()) + } +} + +func TestReadJSONLogs(t *testing.T) { + file, err := os.CreateTemp("", "TestFollowLogs") + if err != nil { + t.Fatalf("unable to create temp file") + } + defer os.Remove(file.Name()) + file.WriteString(`{"log":"line1\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + file.WriteString(`{"log":"line2\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + file.WriteString(`{"log":"line3\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + + stopChan := make(chan os.Signal) + testCases := []struct { + name string + logViewOptions LogViewOptions + expected string + }{ + { + name: "default log options should output all lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 0, + }, + expected: "line1\nline2\nline3\n", + }, + { + name: "using Tail 2 should output last 2 lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 2, + }, + expected: "line2\nline3\n", + }, + { + name: "using Tail 4 should output all lines when the log has less than 4 lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 4, + }, + expected: "line1\nline2\nline3\n", + }, + { + name: "using Tail 0 should output all", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 0, + }, + expected: "line1\nline2\nline3\n", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + err = viewLogsJSONFileDirect(tc.logViewOptions, file.Name(), stdoutBuf, stderrBuf, stopChan) + + if err != nil { + t.Fatalf(err.Error()) + } + if stderrBuf.Len() > 0 { + t.Fatalf("Stderr: %v", stderrBuf.String()) + } + if actual := stdoutBuf.String(); tc.expected != actual { + t.Fatalf("Actual output does not match expected.\nActual: %v\nExpected: %v\n", actual, tc.expected) + } + }) + } +} diff --git a/pkg/logging/jsonfile/jsonfile.go b/pkg/logging/jsonfile/jsonfile.go index cb951534281..eebcff7cd96 100644 --- a/pkg/logging/jsonfile/jsonfile.go +++ b/pkg/logging/jsonfile/jsonfile.go @@ -17,7 +17,6 @@ package jsonfile import ( - "container/ring" "encoding/json" "fmt" "io" @@ -128,12 +127,7 @@ func writeEntry(e *Entry, stdout, stderr io.Writer, refTime time.Time, timestamp return nil } -func Decode(stdout, stderr io.Writer, r io.Reader, timestamps bool, since string, until string, tail uint) error { - var buff *ring.Ring - if tail != 0 { - buff = ring.New(int(tail)) - } - +func Decode(stdout, stderr io.Writer, r io.Reader, timestamps bool, since string, until string) ([]byte, error) { dec := json.NewDecoder(r) now := time.Now() for { @@ -141,41 +135,19 @@ func Decode(stdout, stderr io.Writer, r io.Reader, timestamps bool, since string if err := dec.Decode(&e); err == io.EOF { break } else if err != nil { - return err - } - - if buff == nil { - // Write out the entry directly - err := writeEntry(&e, stdout, stderr, now, timestamps, since, until) + line, err := io.ReadAll(dec.Buffered()) if err != nil { - log.L.Errorf("error while writing log entry to output stream: %s", err) + return nil, err } - } else { - // Else place the entry in a ring buffer - buff.Value = &e - buff = buff.Next() + return line, err } - } - if buff != nil { - // The ring should now contain up to `tail` elements and be set to - // internally point to the oldest element in the ring. - buff.Do(func(e interface{}) { - if e == nil { - // unallocated ring element - return - } - cast, ok := e.(*Entry) - if !ok { - log.L.Errorf("failed to cast Entry struct: %#v", e) - return - } - - err := writeEntry(cast, stdout, stderr, now, timestamps, since, until) - if err != nil { - log.L.Errorf("error while writing log entry to output stream: %s", err) - } - }) + // Write out the entry directly + err := writeEntry(&e, stdout, stderr, now, timestamps, since, until) + if err != nil { + log.L.Errorf("error while writing log entry to output stream: %s", err) + } } - return nil + + return nil, nil } diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index c89dbd6085c..3644d8b79a6 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -27,10 +27,12 @@ import ( "path/filepath" "sort" "sync" + "time" "github.com/containerd/containerd/runtime/v2/logging" "github.com/containerd/errdefs" "github.com/containerd/log" + "github.com/fsnotify/fsnotify" "github.com/muesli/cancelreader" ) @@ -217,3 +219,45 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { return nil }, nil } + +func NewLogFileWatcher(dir string) (*fsnotify.Watcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("failed to create fsnotify watcher: %v", err) + } + if err = watcher.Add(dir); err != nil { + watcher.Close() + return nil, fmt.Errorf("failed to watch directory %q: %w", dir, err) + } + return watcher, nil +} + +// startTail wait for the next log write. +// the boolean value indicates if the log file was recreated; +// the error is error happens during waiting new logs. +func startTail(ctx context.Context, logName string, w *fsnotify.Watcher) (bool, error) { + errRetry := 5 + for { + select { + case <-ctx.Done(): + return false, fmt.Errorf("context cancelled") + case e := <-w.Events: + switch { + case e.Has(fsnotify.Write): + return false, nil + case e.Has(fsnotify.Create): + return filepath.Base(e.Name) == logName, nil + default: + log.L.Debugf("Received unexpected fsnotify event: %v, retrying", e) + } + case err := <-w.Errors: + log.L.Debugf("Received fsnotify watch error, retrying unless no more retries left, retries: %d, error: %s", errRetry, err) + if errRetry == 0 { + return false, err + } + errRetry-- + case <-time.After(logForceCheckPeriod): + return false, nil + } + } +} diff --git a/pkg/logging/logs_other.go b/pkg/logging/logs_other.go new file mode 100644 index 00000000000..6270efbe0fe --- /dev/null +++ b/pkg/logging/logs_other.go @@ -0,0 +1,34 @@ +//go:build !windows +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* +Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_other.go +Copyright The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 +*/ +package logging + +import ( + "os" +) + +func openFileShareDelete(path string) (*os.File, error) { + // Noop. Only relevant for Windows. + return os.Open(path) +} diff --git a/pkg/logging/logs_windows.go b/pkg/logging/logs_windows.go new file mode 100644 index 00000000000..262ac0c2d42 --- /dev/null +++ b/pkg/logging/logs_windows.go @@ -0,0 +1,54 @@ +//go:build windows +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* +Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_windows.go +Copyright The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 +*/ +package logging + +import ( + "os" + "syscall" +) + +// Based on Windows implementation of Windows' syscall.Open +// https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/syscall/syscall_windows.go;l=342 +// In addition to syscall.Open, this function also adds the syscall.FILE_SHARE_DELETE flag to sharemode, +// which will allow us to read from the file without blocking the file from being deleted or renamed. +// This is essential for Log Rotation which is done by renaming the open file. Without this, the file rename would fail. +func openFileShareDelete(path string) (*os.File, error) { + pathp, err := syscall.UTF16PtrFromString(path) + if err != nil { + return nil, err + } + + var access uint32 = syscall.GENERIC_READ + var sharemode uint32 = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE | syscall.FILE_SHARE_DELETE + var createmode uint32 = syscall.OPEN_EXISTING + var attrs uint32 = syscall.FILE_ATTRIBUTE_NORMAL + + handle, err := syscall.CreateFile(pathp, access, sharemode, nil, createmode, attrs, 0) + if err != nil { + return nil, err + } + + return os.NewFile(uintptr(handle), path), nil +}