Skip to content

Commit

Permalink
feat: tail JSON log
Browse files Browse the repository at this point in the history
Signed-off-by: xyz-li <[email protected]>
  • Loading branch information
xyz-li committed Jul 16, 2024
1 parent b7a96ea commit 278e20e
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 143 deletions.
178 changes: 74 additions & 104 deletions pkg/logging/json_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@
package logging

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/containerd/log"
"github.com/containerd/nerdctl/v2/pkg/logging/jsonfile"
"github.com/containerd/nerdctl/v2/pkg/logging/tail"
"github.com/containerd/nerdctl/v2/pkg/strutil"
"github.com/docker/go-units"
"github.com/fahedouch/go-logrotate"
"github.com/fsnotify/fsnotify"
)

var JSONDriverLogOpts = []string{
Expand Down Expand Up @@ -142,9 +142,6 @@ func viewLogsJSONFile(lvopts LogViewOptions, stdout, stderr io.Writer, stopChann
}
}

if checkExecutableAvailableInPath("tail") {
return viewLogsJSONFileThroughTailExec(lvopts, logFilePath, stdout, stderr, stopChannel)
}
return viewLogsJSONFileDirect(lvopts, logFilePath, stdout, stderr, stopChannel)
}

Expand All @@ -156,118 +153,91 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou
if err != nil {
return err
}
defer fin.Close()
err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail)
if err != nil {
return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err)
}
defer func() { fin.Close() }()

if lvopts.Follow {
// Get the current file handler's seek.
lastPos, err := fin.Seek(0, io.SeekCurrent)
if err != nil {
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err)
}
fin.Close()
for {
select {
case <-stopChannel:
log.L.Debugf("received stop signal while re-reading JSON logfile, returning")
// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(fin, lvopts.Tail)
if err != nil {
return fmt.Errorf("failed to tail %d lines of JSON logfile %q: %v", lvopts.Tail, jsonLogFilePath, err)
}

if _, err := fin.Seek(start, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek in log file %q: %v", jsonLogFilePath, err)
}

limitedMode := (lvopts.Tail > 0) && (!lvopts.Follow)
limitedNum := lvopts.Tail
var stop bool
var watcher *fsnotify.Watcher
baseName := filepath.Base(jsonLogFilePath)
dir := filepath.Dir(jsonLogFilePath)
retryTimes := 2
backBytes := 0

for {
select {
case <-stopChannel:
log.L.Debugf("received stop signal while re-reading JSON logfile, returning")
return nil
default:
if stop || (limitedMode && limitedNum == 0) {
log.L.Debugf("finished parsing log JSON filefile, path: %s", jsonLogFilePath)
return nil
default:
// Re-open the file and seek to the last-consumed offset.
fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err)
}

if line, err := jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until); err != nil {
if len(line) > 0 {
time.Sleep(5 * time.Millisecond)
if retryTimes == 0 {
log.L.Infof("finished parsing log JSON filefile, path: %s, line: %s", jsonLogFilePath, string(line))
return fmt.Errorf("error occurred while doing read of JSON logfile %q: %s, retryTimes: %d", jsonLogFilePath, err, retryTimes)
}
retryTimes--
backBytes = len(line)
} else {
return fmt.Errorf("error occurred while doing read of JSON logfile %q: %s", jsonLogFilePath, err)
}
_, err = fin.Seek(lastPos, 0)
} else {
retryTimes = 2
backBytes = 0
}

if lvopts.Follow {
// Get the current file handler's seek.
lastPos, err := fin.Seek(int64(-backBytes), io.SeekCurrent)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err)
}

err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err)
if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = NewLogFileWatcher(dir); err != nil {
return err
}
defer watcher.Close()
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}

// Record current file seek position before looping again.
lastPos, err = fin.Seek(0, io.SeekCurrent)
var recreated bool
// Wait until the next log change.
recreated, err = startTail(context.Background(), baseName, watcher)
if err != nil {
return err
}
if recreated {
newF, err := openFileShareDelete(jsonLogFilePath)
if err != nil {
return fmt.Errorf("failed to open JSON logfile %q: %v", jsonLogFilePath, err)
}
fin.Close()
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err)
fin = newF
}
fin.Close()
continue
}
stop = true
// Give the OS a second to breathe before re-opening the file:
time.Sleep(time.Second)
}
}
return nil
}

// Loads logs through the `tail` executable.
func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath string, stdout, stderr io.Writer, stopChannel chan os.Signal) error {
var args []string

args = append(args, "-n")
if lvopts.Tail == 0 {
args = append(args, "+0")
} else {
args = append(args, strconv.FormatUint(uint64(lvopts.Tail), 10))
}

if lvopts.Follow {
// using the `-F` to follow the file name instead of descriptor and retry if inaccessible
args = append(args, "-F")
}
args = append(args, jsonLogFilePath)
cmd := exec.Command("tail", args...)

cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return err
}

cmdStderr, err := cmd.StderrPipe()
if err != nil {
return err
}

if err := cmd.Start(); err != nil {
return err
}

// filter the unwanted error message of the tail
go filterTailStderr(cmdStderr)

// Setup killing goroutine:
go func() {
<-stopChannel
log.L.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid)
cmd.Process.Kill()
}()

return jsonfile.Decode(stdout, stderr, cmdStdout, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0)
}

func filterTailStderr(reader io.Reader) error {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
if strings.HasSuffix(line, "has appeared; following new file") ||
strings.HasSuffix(line, "has become inaccessible: No such file or directory") ||
strings.HasSuffix(line, "has been replaced; following new file") ||
strings.HasSuffix(line, ": No such file or directory") {
continue
}
fmt.Fprintln(os.Stderr, line)
}

if err := scanner.Err(); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 278e20e

Please sign in to comment.