Skip to content

Commit be6c516

Browse files
authored
Merge pull request #41 from euank/kmsg-parser
logwatchers: add new kmsg-based kernel log watcher
2 parents 9f20903 + 73cba49 commit be6c516

File tree

9 files changed

+777
-10
lines changed

9 files changed

+777
-10
lines changed

Godeps/Godeps.json

+6-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node-problem-detector.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ spec:
2727
- name: log
2828
mountPath: /var/log
2929
readOnly: true
30+
- name: kmsg
31+
mountPath: /dev/kmsg
32+
readOnly: true
3033
# Make sure node problem detector is in the same timezone
3134
# with the host.
3235
- name: localtime
@@ -40,6 +43,9 @@ spec:
4043
# Config `log` to your system log directory
4144
hostPath:
4245
path: /var/log/
46+
- name: kmsg
47+
hostPath:
48+
path: /dev/kmsg
4349
- name: localtime
4450
hostPath:
4551
path: /etc/localtime

pkg/systemlogmonitor/README.md

+11-9
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ the configuration files. (
88
[`config/kernel-monitor.json`](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) as an example).
99
The rule list is extensible.
1010

11-
## Limitations
11+
## Supported sources
1212

13-
* System Log Monitor only supports file based log and journald now, but it is easy
14-
to extend it with [new log watcher](#new-log-watcher)
13+
* System Log Monitor currently supports file-based logs, journald, and kmsg.
14+
Additional sources can be added by implementing a [new log
15+
watcher](#new-log-watcher).
1516

1617
## Add New NodeConditions
1718

@@ -44,10 +45,10 @@ with new rule definition:
4445

4546
System log monitor supports different log management tools with different log
4647
watchers:
47-
* [filelog](https://github.com/kubernetes/node-problem-detector/blob/master/pkg/systemlogmonitor/logwatchers/filelog): Log watcher for
48+
* [filelog](./logwatchers/filelog): Log watcher for
4849
arbitrary file based log.
49-
* [journald](https://github.com/kubernetes/node-problem-detector/blob/master/pkg/systemlogmonitor/logwatchers/journald): Log watcher for
50-
journald.
50+
* [journald](.//logwatchers/journald): Log watcher for
51+
* [kmsg](./logwatchers/kmsg): Log watcher for the kernel ring buffer device, /dev/kmsg.
5152
Set `plugin` in the configuration file to specify log watcher.
5253

5354
### Plugin Configuration
@@ -66,6 +67,7 @@ Log watcher specific configurations are configured in `pluginConfig`.
6667
* timestampFormat: The format of the timestamp. The format string is the time
6768
`2006-01-02T15:04:05Z07:00` in the expected format. (See
6869
[golang timestamp format](https://golang.org/pkg/time/#pkg-constants))
70+
* **kmsg**
6971

7072
### Change Log Path
7173

@@ -78,6 +80,6 @@ field in the configurtion file is the log path. You can always configure
7880

7981
### New Log Watcher
8082

81-
System log monitor uses [Log
82-
Watcher](https://github.com/kubernetes/node-problem-detector/blob/master/pkg/systemlogmonitor/logwatchers/types/log_watcher.go) to support different log management tools.
83-
It is easy to implement a new log watcher.
83+
System log monitor uses [Log Watcher](./logwatchers/types/log_watcher.go) to
84+
support different log management tools. It is easy to implement a new log
85+
watcher.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kmsg
18+
19+
import (
20+
"bufio"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
utilclock "code.cloudfoundry.org/clock"
26+
"github.com/euank/go-kmsg-parser/kmsgparser"
27+
"github.com/golang/glog"
28+
29+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
30+
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
31+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/util"
32+
)
33+
34+
type kernelLogWatcher struct {
35+
cfg types.WatcherConfig
36+
logCh chan *logtypes.Log
37+
tomb *util.Tomb
38+
reader *bufio.Reader
39+
40+
kmsgParser kmsgparser.Parser
41+
clock utilclock.Clock
42+
}
43+
44+
// NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg
45+
func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher {
46+
kmsgparser.NewParser()
47+
return &kernelLogWatcher{
48+
cfg: cfg,
49+
tomb: util.NewTomb(),
50+
// Arbitrary capacity
51+
logCh: make(chan *logtypes.Log, 100),
52+
clock: utilclock.NewClock(),
53+
}
54+
}
55+
56+
var _ types.WatcherCreateFunc = NewKmsgWatcher
57+
58+
func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) {
59+
if k.kmsgParser == nil {
60+
// nil-check to make mocking easier
61+
parser, err := kmsgparser.NewParser()
62+
if err != nil {
63+
return nil, fmt.Errorf("failed to create kmsg parser: %v", err)
64+
}
65+
k.kmsgParser = parser
66+
}
67+
68+
lookback, err := time.ParseDuration(k.cfg.Lookback)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to parse lookback duration %q: %v", k.cfg.Lookback, err)
71+
}
72+
73+
go k.watchLoop(lookback)
74+
return k.logCh, nil
75+
}
76+
77+
// Stop closes the kmsgparser
78+
func (k *kernelLogWatcher) Stop() {
79+
k.kmsgParser.Close()
80+
k.tomb.Stop()
81+
}
82+
83+
// watchLoop is the main watch loop of kernel log watcher.
84+
func (k *kernelLogWatcher) watchLoop(lookback time.Duration) {
85+
defer func() {
86+
close(k.logCh)
87+
k.tomb.Done()
88+
}()
89+
kmsgs := k.kmsgParser.Parse()
90+
91+
for {
92+
select {
93+
case <-k.tomb.Stopping():
94+
glog.Infof("Stop watching kernel log")
95+
k.kmsgParser.Close()
96+
return
97+
case msg := <-kmsgs:
98+
glog.V(5).Infof("got kernel message: %+v", msg)
99+
if msg.Message == "" {
100+
continue
101+
}
102+
103+
// Discard too old messages
104+
if k.clock.Since(msg.Timestamp) > lookback {
105+
glog.V(5).Infof("throwing away msg %v for being too old: %v > %v", msg.Message, msg.Timestamp.String(), lookback.String())
106+
continue
107+
}
108+
109+
k.logCh <- &logtypes.Log{
110+
Message: strings.TrimSpace(msg.Message),
111+
Timestamp: msg.Timestamp,
112+
}
113+
}
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kmsg
18+
19+
import (
20+
"io"
21+
"testing"
22+
23+
"code.cloudfoundry.org/clock/fakeclock"
24+
"github.com/euank/go-kmsg-parser/kmsgparser"
25+
"github.com/stretchr/testify/assert"
26+
27+
"time"
28+
29+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
30+
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
31+
)
32+
33+
type mockKmsgParser struct {
34+
kmsgs []kmsgparser.Message
35+
}
36+
37+
func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {}
38+
func (m *mockKmsgParser) Close() error { return nil }
39+
func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
40+
c := make(chan kmsgparser.Message)
41+
go func() {
42+
for _, msg := range m.kmsgs {
43+
c <- msg
44+
}
45+
}()
46+
return c
47+
}
48+
func (m *mockKmsgParser) SeekEnd() error { return nil }
49+
50+
func TestWatch(t *testing.T) {
51+
now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local)
52+
fakeClock := fakeclock.NewFakeClock(now)
53+
testCases := []struct {
54+
log *mockKmsgParser
55+
logs []logtypes.Log
56+
lookback string
57+
}{
58+
{
59+
// The start point is at the head of the log file.
60+
log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
61+
{Message: "1", Timestamp: now.Add(0 * time.Second)},
62+
{Message: "2", Timestamp: now.Add(1 * time.Second)},
63+
{Message: "3", Timestamp: now.Add(2 * time.Second)},
64+
}},
65+
logs: []logtypes.Log{
66+
{
67+
Timestamp: now,
68+
Message: "1",
69+
},
70+
{
71+
Timestamp: now.Add(time.Second),
72+
Message: "2",
73+
},
74+
{
75+
Timestamp: now.Add(2 * time.Second),
76+
Message: "3",
77+
},
78+
},
79+
lookback: "0",
80+
},
81+
{
82+
// The start point is in the middle of the log file.
83+
log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
84+
{Message: "1", Timestamp: now.Add(-1 * time.Second)},
85+
{Message: "2", Timestamp: now.Add(0 * time.Second)},
86+
{Message: "3", Timestamp: now.Add(1 * time.Second)},
87+
}},
88+
logs: []logtypes.Log{
89+
{
90+
Timestamp: now,
91+
Message: "2",
92+
},
93+
{
94+
Timestamp: now.Add(time.Second),
95+
Message: "3",
96+
},
97+
},
98+
lookback: "0",
99+
},
100+
{
101+
// The start point is at the end of the log file, but we look back.
102+
log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
103+
{Message: "1", Timestamp: now.Add(-2 * time.Second)},
104+
{Message: "2", Timestamp: now.Add(-1 * time.Second)},
105+
{Message: "3", Timestamp: now.Add(0 * time.Second)},
106+
}},
107+
lookback: "1s",
108+
logs: []logtypes.Log{
109+
{
110+
Timestamp: now.Add(-time.Second),
111+
Message: "2",
112+
},
113+
{
114+
Timestamp: now,
115+
Message: "3",
116+
},
117+
},
118+
},
119+
}
120+
for _, test := range testCases {
121+
w := NewKmsgWatcher(types.WatcherConfig{Lookback: test.lookback})
122+
w.(*kernelLogWatcher).clock = fakeClock
123+
w.(*kernelLogWatcher).kmsgParser = test.log
124+
logCh, err := w.Watch()
125+
if err != nil {
126+
t.Fatal(err)
127+
}
128+
defer w.Stop()
129+
for _, expected := range test.logs {
130+
got := <-logCh
131+
assert.Equal(t, &expected, got)
132+
}
133+
// The log channel should have already been drained
134+
// There could stil be future messages sent into the channel, but the chance is really slim.
135+
timeout := time.After(100 * time.Millisecond)
136+
select {
137+
case log := <-logCh:
138+
t.Errorf("unexpected extra log: %+v", *log)
139+
case <-timeout:
140+
}
141+
}
142+
}
143+
144+
type fakeKmsgReader struct {
145+
logLines []string
146+
}
147+
148+
func (r *fakeKmsgReader) Read(data []byte) (int, error) {
149+
if len(r.logLines) == 0 {
150+
return 0, io.EOF
151+
}
152+
l := r.logLines[0]
153+
r.logLines = r.logLines[1:]
154+
copy(data, []byte(l))
155+
return len(l), nil
156+
}
157+
158+
func (r *fakeKmsgReader) Close() error {
159+
return nil
160+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package logwatchers
18+
19+
import "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg"
20+
21+
func init() {
22+
registerLogWatcher("kmsg", kmsg.NewKmsgWatcher)
23+
}

0 commit comments

Comments
 (0)