Skip to content

Commit 9c23921

Browse files
committed
logwatchers/kmsg: add initial kmsg watcher impl
This adds a logwatcher which is able to parse kernel messages directly from the /dev/kmsg interface. This supports any modern linux distro, while also avoiding any dependency on libraries (e.g. as journald needs).
1 parent 5422f63 commit 9c23921

File tree

3 files changed

+298
-0
lines changed

3 files changed

+298
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kmsg
18+
19+
import (
20+
"bufio"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
utilclock "code.cloudfoundry.org/clock"
26+
"github.com/euank/go-kmsg-parser/kmsgparser"
27+
"github.com/golang/glog"
28+
29+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
30+
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
31+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/util"
32+
)
33+
34+
type kernelLogWatcher struct {
35+
cfg types.WatcherConfig
36+
logCh chan *logtypes.Log
37+
tomb *util.Tomb
38+
reader *bufio.Reader
39+
40+
kmsgParser kmsgparser.Parser
41+
clock utilclock.Clock
42+
}
43+
44+
// NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg
45+
func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher {
46+
kmsgparser.NewParser()
47+
return &kernelLogWatcher{
48+
cfg: cfg,
49+
tomb: util.NewTomb(),
50+
// Arbitrary capacity
51+
logCh: make(chan *logtypes.Log, 100),
52+
clock: utilclock.NewClock(),
53+
}
54+
}
55+
56+
var _ types.WatcherCreateFunc = NewKmsgWatcher
57+
58+
func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) {
59+
if k.kmsgParser == nil {
60+
// nil-check to make mocking easier
61+
parser, err := kmsgparser.NewParser()
62+
if err != nil {
63+
return nil, fmt.Errorf("failed to create kmsg parser: %v", err)
64+
}
65+
k.kmsgParser = parser
66+
}
67+
68+
lookback, err := time.ParseDuration(k.cfg.Lookback)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to parse lookback duration %q: %v", k.cfg.Lookback, err)
71+
}
72+
73+
go k.watchLoop(lookback)
74+
return k.logCh, nil
75+
}
76+
77+
// Stop closes the kmsgparser
78+
func (k *kernelLogWatcher) Stop() {
79+
k.kmsgParser.Close()
80+
k.tomb.Stop()
81+
}
82+
83+
// watchLoop is the main watch loop of kernel log watcher.
84+
func (k *kernelLogWatcher) watchLoop(lookback time.Duration) {
85+
defer func() {
86+
close(k.logCh)
87+
k.tomb.Done()
88+
}()
89+
kmsgs := k.kmsgParser.Parse()
90+
91+
for {
92+
select {
93+
case <-k.tomb.Stopping():
94+
glog.Infof("Stop watching kernel log")
95+
k.kmsgParser.Close()
96+
return
97+
case msg := <-kmsgs:
98+
glog.V(5).Infof("got kernel message: %+v", msg)
99+
if msg.Message == "" {
100+
continue
101+
}
102+
103+
// Discard too old messages
104+
if k.clock.Since(msg.Timestamp) > lookback {
105+
glog.V(5).Infof("throwing away msg %v for being too old: %v > %v", msg.Message, msg.Timestamp.String(), lookback.String())
106+
continue
107+
}
108+
109+
k.logCh <- &logtypes.Log{
110+
Message: strings.TrimSpace(msg.Message),
111+
Timestamp: msg.Timestamp,
112+
}
113+
}
114+
}
115+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kmsg
18+
19+
import (
20+
"io"
21+
"testing"
22+
23+
"code.cloudfoundry.org/clock/fakeclock"
24+
"github.com/euank/go-kmsg-parser/kmsgparser"
25+
"github.com/stretchr/testify/assert"
26+
27+
"time"
28+
29+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
30+
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
31+
)
32+
33+
type mockKmsgParser struct {
34+
kmsgs []kmsgparser.Message
35+
}
36+
37+
func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {}
38+
func (m *mockKmsgParser) Close() error { return nil }
39+
func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
40+
c := make(chan kmsgparser.Message)
41+
go func() {
42+
for _, msg := range m.kmsgs {
43+
c <- msg
44+
}
45+
}()
46+
return c
47+
}
48+
func (m *mockKmsgParser) SeekEnd() error { return nil }
49+
50+
func TestWatch(t *testing.T) {
51+
now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local)
52+
fakeClock := fakeclock.NewFakeClock(now)
53+
testCases := []struct {
54+
log *mockKmsgParser
55+
logs []logtypes.Log
56+
lookback string
57+
}{
58+
{
59+
// The start point is at the head of the log file.
60+
log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
61+
{Message: "1", Timestamp: now.Add(0 * time.Second)},
62+
{Message: "2", Timestamp: now.Add(1 * time.Second)},
63+
{Message: "3", Timestamp: now.Add(2 * time.Second)},
64+
}},
65+
logs: []logtypes.Log{
66+
{
67+
Timestamp: now,
68+
Message: "1",
69+
},
70+
{
71+
Timestamp: now.Add(time.Second),
72+
Message: "2",
73+
},
74+
{
75+
Timestamp: now.Add(2 * time.Second),
76+
Message: "3",
77+
},
78+
},
79+
lookback: "0",
80+
},
81+
{
82+
// The start point is in the middle of the log file.
83+
log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
84+
{Message: "1", Timestamp: now.Add(-1 * time.Second)},
85+
{Message: "2", Timestamp: now.Add(0 * time.Second)},
86+
{Message: "3", Timestamp: now.Add(1 * time.Second)},
87+
}},
88+
logs: []logtypes.Log{
89+
{
90+
Timestamp: now,
91+
Message: "2",
92+
},
93+
{
94+
Timestamp: now.Add(time.Second),
95+
Message: "3",
96+
},
97+
},
98+
lookback: "0",
99+
},
100+
{
101+
// The start point is at the end of the log file, but we look back.
102+
log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
103+
{Message: "1", Timestamp: now.Add(-2 * time.Second)},
104+
{Message: "2", Timestamp: now.Add(-1 * time.Second)},
105+
{Message: "3", Timestamp: now.Add(0 * time.Second)},
106+
}},
107+
lookback: "1s",
108+
logs: []logtypes.Log{
109+
{
110+
Timestamp: now.Add(-time.Second),
111+
Message: "2",
112+
},
113+
{
114+
Timestamp: now,
115+
Message: "3",
116+
},
117+
},
118+
},
119+
}
120+
for _, test := range testCases {
121+
w := NewKmsgWatcher(types.WatcherConfig{Lookback: test.lookback})
122+
w.(*kernelLogWatcher).clock = fakeClock
123+
w.(*kernelLogWatcher).kmsgParser = test.log
124+
logCh, err := w.Watch()
125+
if err != nil {
126+
t.Fatal(err)
127+
}
128+
defer w.Stop()
129+
for _, expected := range test.logs {
130+
got := <-logCh
131+
assert.Equal(t, &expected, got)
132+
}
133+
// The log channel should have already been drained
134+
// There could stil be future messages sent into the channel, but the chance is really slim.
135+
timeout := time.After(100 * time.Millisecond)
136+
select {
137+
case log := <-logCh:
138+
t.Errorf("unexpected extra log: %+v", *log)
139+
case <-timeout:
140+
}
141+
}
142+
}
143+
144+
type fakeKmsgReader struct {
145+
logLines []string
146+
}
147+
148+
func (r *fakeKmsgReader) Read(data []byte) (int, error) {
149+
if len(r.logLines) == 0 {
150+
return 0, io.EOF
151+
}
152+
l := r.logLines[0]
153+
r.logLines = r.logLines[1:]
154+
copy(data, []byte(l))
155+
return len(l), nil
156+
}
157+
158+
func (r *fakeKmsgReader) Close() error {
159+
return nil
160+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package logwatchers
18+
19+
import "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg"
20+
21+
func init() {
22+
registerLogWatcher("kmsg", kmsg.NewKmsgWatcher)
23+
}

0 commit comments

Comments
 (0)