Skip to content

Commit b7a96ea

Browse files
committed
fix: output log after log rotation
Signed-off-by: xyz-li <[email protected]>
1 parent 31352aa commit b7a96ea

File tree

7 files changed

+258
-2
lines changed

7 files changed

+258
-2
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ require (
3737
github.com/fahedouch/go-logrotate v0.2.1
3838
github.com/fatih/color v1.17.0
3939
github.com/fluent/fluent-logger-golang v1.9.0
40+
github.com/fsnotify/fsnotify v1.7.0
4041
github.com/ipfs/go-cid v0.4.1
4142
github.com/klauspost/compress v1.17.9
4243
github.com/mattn/go-isatty v0.0.20

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
111111
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
112112
github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/mldQDA7czhWXEg=
113113
github.com/fluent/fluent-logger-golang v1.9.0/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
114+
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
115+
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
114116
github.com/go-jose/go-jose/v4 v4.0.3 h1:o8aphO8Hv6RPmH+GfzVuyf7YXSBibp+8YyHdOoDESGo=
115117
github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
116118
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=

pkg/logging/cri_logger.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package logging
2525
import (
2626
"bufio"
2727
"bytes"
28+
"context"
2829
"errors"
2930
"fmt"
3031
"io"
@@ -35,6 +36,7 @@ import (
3536

3637
"github.com/containerd/log"
3738
"github.com/containerd/nerdctl/v2/pkg/logging/tail"
39+
"github.com/fsnotify/fsnotify"
3840
)
3941

4042
// LogStreamType is the type of the stream in CRI container log.
@@ -45,6 +47,9 @@ const (
4547
Stdout LogStreamType = "stdout"
4648
// Stderr is the stream type for stderr.
4749
Stderr LogStreamType = "stderr"
50+
51+
// logForceCheckPeriod is the period to check for a new read
52+
logForceCheckPeriod = 1 * time.Second
4853
)
4954

5055
// LogTag is the tag of a log line in CRI container log.
@@ -89,7 +94,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
8994
if err != nil {
9095
return fmt.Errorf("failed to open log file %q: %v", logPath, err)
9196
}
92-
defer f.Close()
97+
defer func() {
98+
f.Close()
99+
}()
93100

94101
// Search start point based on tail line.
95102
start, err := tail.FindTailLineStartIndex(f, opts.Tail)
@@ -101,6 +108,8 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
101108
return fmt.Errorf("failed to seek in log file %q: %v", logPath, err)
102109
}
103110

111+
var watcher *fsnotify.Watcher
112+
104113
limitedMode := (opts.Tail > 0) && (!opts.Follow)
105114
limitedNum := opts.Tail
106115
// Start parsing the logs.
@@ -110,6 +119,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
110119
isNewLine := true
111120
writer := newLogWriter(stdout, stderr, opts)
112121
msg := &logMessage{}
122+
baseName := filepath.Base(logPath)
123+
dir := filepath.Dir(logPath)
124+
113125
for {
114126
select {
115127
case <-stopChannel:
@@ -126,13 +138,39 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
126138
return fmt.Errorf("failed to read log file %q: %v", logPath, err)
127139
}
128140
if opts.Follow {
129-
130141
// Reset seek so that if this is an incomplete line,
131142
// it will be read again.
132143
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
133144
return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err)
134145
}
135146

147+
if watcher == nil {
148+
// Initialize the watcher if it has not been initialized yet.
149+
if watcher, err = NewLogFileWatcher(dir); err != nil {
150+
return err
151+
}
152+
defer watcher.Close()
153+
// If we just created the watcher, try again to read as we might have missed
154+
// the event.
155+
continue
156+
}
157+
158+
var recreated bool
159+
// Wait until the next log change.
160+
recreated, err = startTail(context.Background(), baseName, watcher)
161+
if err != nil {
162+
return err
163+
}
164+
if recreated {
165+
newF, err := openFileShareDelete(logPath)
166+
if err != nil {
167+
return fmt.Errorf("failed to open log file %q: %v", logPath, err)
168+
}
169+
f.Close()
170+
f = newF
171+
r = bufio.NewReader(f)
172+
}
173+
136174
// If the container exited consume data until the next EOF
137175
continue
138176
}

pkg/logging/cri_logger_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"fmt"
2929
"io"
3030
"os"
31+
"path/filepath"
3132
"reflect"
3233
"testing"
3334
"time"
@@ -230,3 +231,85 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) {
230231
t.Errorf("should have two lines, lineCount= %d", lineCount)
231232
}
232233
}
234+
235+
func TestReadRotatedLog(t *testing.T) {
236+
tmpDir := t.TempDir()
237+
file, err := os.CreateTemp(tmpDir, "logfile")
238+
if err != nil {
239+
t.Errorf("unable to create temp file, error: %s", err.Error())
240+
}
241+
stdoutBuf := &bytes.Buffer{}
242+
stderrBuf := &bytes.Buffer{}
243+
containerStoped := make(chan os.Signal)
244+
// Start to follow the container's log.
245+
fileName := file.Name()
246+
go func() {
247+
lvOpts := &LogViewOptions{
248+
Follow: true,
249+
LogPath: fileName,
250+
}
251+
_ = ReadLogs(lvOpts, stdoutBuf, stderrBuf, containerStoped)
252+
}()
253+
254+
// log in stdout
255+
expectedStdout := "line0line2line4line6line8"
256+
// log in stderr
257+
expectedStderr := "line1line3line5line7line9"
258+
259+
dir := filepath.Dir(file.Name())
260+
baseName := filepath.Base(file.Name())
261+
262+
// Write 10 lines to log file.
263+
// Let ReadLogs start.
264+
time.Sleep(50 * time.Millisecond)
265+
266+
for line := 0; line < 10; line++ {
267+
// Write the first three lines to log file
268+
now := time.Now().Format(time.RFC3339Nano)
269+
if line%2 == 0 {
270+
file.WriteString(fmt.Sprintf(
271+
"%s stdout P line%d\n", now, line))
272+
} else {
273+
file.WriteString(fmt.Sprintf(
274+
"%s stderr P line%d\n", now, line))
275+
}
276+
277+
time.Sleep(1 * time.Millisecond)
278+
279+
if line == 5 {
280+
file.Close()
281+
// Pretend to rotate the log.
282+
rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405"))
283+
rotatedName = filepath.Join(dir, rotatedName)
284+
if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil {
285+
t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error())
286+
return
287+
}
288+
289+
time.Sleep(20 * time.Millisecond)
290+
newF := filepath.Join(dir, baseName)
291+
if file, err = os.Create(newF); err != nil {
292+
t.Errorf("unable to create new log file, error: %s", err.Error())
293+
return
294+
}
295+
}
296+
}
297+
298+
// Finished writing into the file, close it, so we can delete it later.
299+
err = file.Close()
300+
if err != nil {
301+
t.Errorf("could not close file, error: %s", err.Error())
302+
}
303+
304+
time.Sleep(2 * time.Second)
305+
// Make the function ReadLogs end.
306+
close(containerStoped)
307+
308+
if expectedStdout != stdoutBuf.String() {
309+
t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String())
310+
}
311+
312+
if expectedStderr != stderrBuf.String() {
313+
t.Errorf("expected: %s, acoutal: %s", expectedStderr, stderrBuf.String())
314+
}
315+
}

pkg/logging/logging.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
"path/filepath"
2828
"sort"
2929
"sync"
30+
"time"
3031

3132
"github.com/containerd/containerd/errdefs"
3233
"github.com/containerd/containerd/runtime/v2/logging"
3334
"github.com/containerd/log"
35+
"github.com/fsnotify/fsnotify"
3436
"github.com/muesli/cancelreader"
3537
)
3638

@@ -217,3 +219,45 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
217219
return nil
218220
}, nil
219221
}
222+
223+
func NewLogFileWatcher(dir string) (*fsnotify.Watcher, error) {
224+
watcher, err := fsnotify.NewWatcher()
225+
if err != nil {
226+
return nil, fmt.Errorf("failed to create fsnotify watcher: %v", err)
227+
}
228+
if err = watcher.Add(dir); err != nil {
229+
watcher.Close()
230+
return nil, fmt.Errorf("failed to watch directory %q: %w", dir, err)
231+
}
232+
return watcher, nil
233+
}
234+
235+
// startTail wait for the next log write.
236+
// the boolean value indicates if the log file was recreated;
237+
// the error is error happens during waiting new logs.
238+
func startTail(ctx context.Context, logName string, w *fsnotify.Watcher) (bool, error) {
239+
errRetry := 5
240+
for {
241+
select {
242+
case <-ctx.Done():
243+
return false, fmt.Errorf("context cancelled")
244+
case e := <-w.Events:
245+
switch {
246+
case e.Has(fsnotify.Write):
247+
return false, nil
248+
case e.Has(fsnotify.Create):
249+
return filepath.Base(e.Name) == logName, nil
250+
default:
251+
log.L.Debugf("Received unexpected fsnotify event: %v, retrying", e)
252+
}
253+
case err := <-w.Errors:
254+
log.L.Debugf("Received fsnotify watch error, retrying unless no more retries left, retries: %d, error: %s", errRetry, err)
255+
if errRetry == 0 {
256+
return false, err
257+
}
258+
errRetry--
259+
case <-time.After(logForceCheckPeriod):
260+
return false, nil
261+
}
262+
}
263+
}

pkg/logging/logs_other.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//go:build !windows
2+
// +build !windows
3+
4+
/*
5+
Copyright The containerd Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
/*
21+
Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_other.go
22+
Copyright The Kubernetes Authors.
23+
Licensed under the Apache License, Version 2.0
24+
*/
25+
package logging
26+
27+
import (
28+
"os"
29+
)
30+
31+
func openFileShareDelete(path string) (*os.File, error) {
32+
// Noop. Only relevant for Windows.
33+
return os.Open(path)
34+
}

pkg/logging/logs_windows.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//go:build windows
2+
// +build windows
3+
4+
/*
5+
Copyright The containerd Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
/*
21+
Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_windows.go
22+
Copyright The Kubernetes Authors.
23+
Licensed under the Apache License, Version 2.0
24+
*/
25+
package logging
26+
27+
import (
28+
"os"
29+
"syscall"
30+
)
31+
32+
// Based on Windows implementation of Windows' syscall.Open
33+
// https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/syscall/syscall_windows.go;l=342
34+
// In addition to syscall.Open, this function also adds the syscall.FILE_SHARE_DELETE flag to sharemode,
35+
// which will allow us to read from the file without blocking the file from being deleted or renamed.
36+
// This is essential for Log Rotation which is done by renaming the open file. Without this, the file rename would fail.
37+
func openFileShareDelete(path string) (*os.File, error) {
38+
pathp, err := syscall.UTF16PtrFromString(path)
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
var access uint32 = syscall.GENERIC_READ
44+
var sharemode uint32 = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE | syscall.FILE_SHARE_DELETE
45+
var createmode uint32 = syscall.OPEN_EXISTING
46+
var attrs uint32 = syscall.FILE_ATTRIBUTE_NORMAL
47+
48+
handle, err := syscall.CreateFile(pathp, access, sharemode, nil, createmode, attrs, 0)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
return os.NewFile(uintptr(handle), path), nil
54+
}

0 commit comments

Comments
 (0)