Skip to content

Commit 9a3050e

Browse files
authored
fix: firehose log filters and log options (#105)
* fix: firehose log filters * refactor: remove log options from labels in log response
1 parent 70a6339 commit 9a3050e

File tree

2 files changed

+98
-60
lines changed

2 files changed

+98
-60
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ require (
121121
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
122122
github.com/mitchellh/copystructure v1.2.0 // indirect
123123
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
124-
github.com/mitchellh/mapstructure v1.5.0 // indirect
124+
github.com/mitchellh/mapstructure v1.5.0
125125
github.com/mitchellh/reflectwalk v1.0.2 // indirect
126126
github.com/moby/locker v1.0.1 // indirect
127127
github.com/moby/spdystream v0.2.0 // indirect

pkg/kube/client.go

+97-59
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@ import (
1111
"time"
1212

1313
"github.com/mcuadros/go-defaults"
14+
"github.com/mitchellh/mapstructure"
1415
batchv1 "k8s.io/api/batch/v1"
1516
corev1 "k8s.io/api/core/v1"
1617
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/apimachinery/pkg/fields"
19+
"k8s.io/apimachinery/pkg/labels"
20+
"k8s.io/apimachinery/pkg/selection"
1721
"k8s.io/client-go/kubernetes"
1822
typedbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
1923
"k8s.io/client-go/rest"
@@ -25,6 +29,7 @@ const (
2529
bufferSize = 4096
2630
sleepTime = 500
2731
defaultTTLSecondsAfterFinished = 60
32+
trueString = "true"
2833
)
2934

3035
var (
@@ -48,6 +53,72 @@ type Pod struct {
4853
Containers []string `json:"containers"`
4954
}
5055

56+
type LogOptions struct {
57+
App string `mapstructure:"app"`
58+
Pod string `mapstructure:"pod"`
59+
Container string `mapstructure:"container"`
60+
Follow string `mapstructure:"follow"`
61+
Previous string `mapstructure:"previous"`
62+
SinceSeconds string `mapstructure:"since_seconds"`
63+
Timestamps string `mapstructure:"timestamps"`
64+
TailLines string `mapstructure:"tail_lines"`
65+
}
66+
67+
func (l LogOptions) getPodListOptions() (metav1.ListOptions, error) {
68+
labelSelector := labels.NewSelector()
69+
fieldSelector := fields.Everything()
70+
r, err := labels.NewRequirement("app", selection.Equals, []string{l.App})
71+
if err != nil {
72+
return metav1.ListOptions{}, err
73+
}
74+
labelSelector = labelSelector.Add(*r)
75+
76+
if l.Pod != "" {
77+
fieldSelector = fields.AndSelectors(fieldSelector, fields.OneTermEqualSelector("metadata.name", l.Pod))
78+
}
79+
80+
return metav1.ListOptions{
81+
LabelSelector: labelSelector.String(),
82+
FieldSelector: fieldSelector.String(),
83+
}, nil
84+
}
85+
86+
func (l LogOptions) getPodLogOptions() (*corev1.PodLogOptions, error) {
87+
podLogOpts := &corev1.PodLogOptions{
88+
Container: l.Container,
89+
}
90+
91+
if l.Follow == trueString {
92+
podLogOpts.Follow = true
93+
}
94+
95+
if l.Previous == trueString {
96+
podLogOpts.Previous = true
97+
}
98+
99+
if l.Timestamps == trueString {
100+
podLogOpts.Timestamps = true
101+
}
102+
103+
if l.SinceSeconds != "" {
104+
ss, err := strconv.ParseInt(l.SinceSeconds, 10, 64)
105+
if err != nil {
106+
return nil, err
107+
}
108+
podLogOpts.SinceSeconds = &ss
109+
}
110+
111+
if l.TailLines != "" {
112+
tl, err := strconv.ParseInt(l.TailLines, 10, 64)
113+
if err != nil {
114+
return nil, err
115+
}
116+
podLogOpts.TailLines = &tl
117+
}
118+
119+
return podLogOpts, nil
120+
}
121+
51122
func DefaultClientConfig() Config {
52123
var defaultProviderConfig Config
53124
defaults.SetDefaults(&defaultProviderConfig)
@@ -62,44 +133,14 @@ func NewClient(config Config) *Client {
62133
}
63134

64135
func (c Client) StreamLogs(ctx context.Context, namespace string, filter map[string]string) (<-chan LogChunk, error) {
65-
var selectors []string
66-
var podName, containerName, labelSelector, filedSelector string
67-
var sinceSeconds, tailLines int64
68-
var opts metav1.ListOptions
136+
var logOptions LogOptions
69137

70-
for k, v := range filter {
71-
switch k {
72-
case "pod":
73-
podName = v
74-
case "container":
75-
containerName = v
76-
case "sinceSeconds":
77-
i, err := strconv.ParseInt(v, 10, 64)
78-
if err != nil {
79-
return nil, errors.ErrInvalid.WithMsgf("invalid sinceSeconds filter value: %v", err)
80-
}
81-
sinceSeconds = i
82-
case "tailLine":
83-
i, err := strconv.ParseInt(v, 10, 64)
84-
if err != nil {
85-
return nil, errors.ErrInvalid.WithMsgf("invalid tailLine filter value: %v", err)
86-
}
87-
tailLines = i
88-
default:
89-
s := fmt.Sprintf("%s=%s", k, v)
90-
selectors = append(selectors, s)
91-
}
92-
}
93-
94-
if podName == "" {
95-
labelSelector = strings.Join(selectors, ",")
96-
opts = metav1.ListOptions{LabelSelector: labelSelector}
97-
} else {
98-
filedSelector = fmt.Sprintf("metadata.name=%s", podName)
99-
opts = metav1.ListOptions{FieldSelector: filedSelector}
138+
err := mapstructure.Decode(filter, &logOptions)
139+
if err != nil {
140+
return nil, errors.ErrInvalid.WithMsgf(err.Error())
100141
}
101142

102-
return c.streamFromPods(ctx, namespace, containerName, opts, tailLines, sinceSeconds, filter)
143+
return c.streamFromPods(ctx, namespace, logOptions)
103144
}
104145

105146
func (c Client) RunJob(ctx context.Context, namespace, name string, image string, cmd []string, retries int32) error {
@@ -170,13 +211,18 @@ func waitForJob(ctx context.Context, jobName string, jobs typedbatchv1.JobInterf
170211
}
171212
}
172213

173-
func (c Client) streamFromPods(ctx context.Context, namespace, containerName string, opts metav1.ListOptions, tailLines, sinceSeconds int64, filter map[string]string) (<-chan LogChunk, error) {
214+
func (c Client) streamFromPods(ctx context.Context, namespace string, logOptions LogOptions) (<-chan LogChunk, error) {
174215
clientSet, err := kubernetes.NewForConfig(&c.restConfig)
175216
if err != nil {
176217
return nil, err
177218
}
178219

179-
pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, opts)
220+
listOpts, err := logOptions.getPodListOptions()
221+
if err != nil {
222+
return nil, err
223+
}
224+
225+
pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, listOpts)
180226
if err != nil {
181227
return nil, err
182228
}
@@ -191,16 +237,21 @@ func (c Client) streamFromPods(ctx context.Context, namespace, containerName str
191237
wg := &sync.WaitGroup{}
192238
for _, pod := range pods.Items {
193239
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
194-
if containerName != "" && container.Name != containerName {
240+
if logOptions.Container != "" && container.Name != logOptions.Container {
195241
continue
196242
}
243+
plo, err := logOptions.getPodLogOptions()
244+
if err != nil {
245+
return nil, err
246+
}
247+
plo.Container = container.Name
197248
wg.Add(1)
198-
go func(podName string, c corev1.Container) {
249+
go func(podName string, plo corev1.PodLogOptions) {
199250
defer wg.Done()
200-
if err := streamContainerLogs(ctx, namespace, podName, logCh, streamingClientSet, c, tailLines, sinceSeconds, filter); err != nil {
201-
log.Printf("[WARN] failed to stream from container '%s':%s", c.Name, err)
251+
if err := streamContainerLogs(ctx, namespace, podName, logCh, streamingClientSet, plo); err != nil {
252+
log.Printf("[WARN] failed to stream from container '%s':%s", plo.Container, err)
202253
}
203-
}(pod.Name, container)
254+
}(pod.Name, *plo)
204255
}
205256
}
206257

@@ -249,27 +300,14 @@ func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelect
249300
return podDetails, nil
250301
}
251302

252-
func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- LogChunk, clientSet *kubernetes.Clientset, container corev1.Container, tailLines, sinceSeconds int64, filter map[string]string) error {
253-
podLogOpts := corev1.PodLogOptions{}
254-
podLogOpts.Follow = true
255-
podLogOpts.Container = container.Name
256-
257-
if sinceSeconds > 0 {
258-
podLogOpts.SinceSeconds = &sinceSeconds
259-
}
260-
261-
if tailLines > 0 {
262-
podLogOpts.TailLines = &tailLines
263-
}
264-
303+
func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- LogChunk, clientSet *kubernetes.Clientset,
304+
podLogOpts corev1.PodLogOptions,
305+
) error {
265306
podLogs, err := clientSet.CoreV1().Pods(ns).GetLogs(podName, &podLogOpts).Stream(ctx)
266307
if err != nil {
267308
return err
268309
}
269310

270-
filter["pod"] = podName
271-
filter["container"] = container.Name
272-
273311
buf := make([]byte, bufferSize)
274312
for {
275313
numBytes, err := podLogs.Read(buf)
@@ -285,7 +323,7 @@ func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- L
285323

286324
logChunk := LogChunk{
287325
Data: []byte(string(buf[:numBytes])),
288-
Labels: filter,
326+
Labels: map[string]string{"pod": podName, "container": podLogOpts.Container},
289327
}
290328

291329
select {

0 commit comments

Comments
 (0)