Skip to content

Commit 5d502c2

Browse files
committed
add CriType and NodeLabels in cache
1 parent 14e96f0 commit 5d502c2

File tree

5 files changed

+49
-31
lines changed

5 files changed

+49
-31
lines changed

plugin/input/k8s/k8s.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
189189
meta.EnableGatherer(p.logger)
190190
}
191191

192-
if meta.CriType == "docker" {
192+
if meta.MetaData.CriType == "docker" {
193193
p.params.Controller.SuggestDecoder(decoder.JSON)
194194
} else {
195195
p.params.Controller.SuggestDecoder(decoder.CRI)

plugin/input/k8s/k8s_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,5 +242,5 @@ func TestCleanUp(t *testing.T) {
242242

243243
meta.DisableGatherer()
244244
p.Stop()
245-
assert.Equal(t, 0, len(meta.MetaData))
245+
assert.Equal(t, 0, len(meta.MetaData.PodMeta))
246246
}

plugin/input/k8s/meta/gatherer.go

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ import (
2424
"k8s.io/client-go/tools/clientcmd"
2525
)
2626

27-
const metaFileTempSuffix = ".atomic"
27+
const (
28+
metaFileTempSuffix = ".atomic"
29+
30+
defaultCriType = "docker"
31+
)
2832

2933
type (
3034
PodName string
@@ -45,11 +49,21 @@ type (
4549
}
4650

4751
meta map[Namespace]map[PodName]map[ContainerID]*podMeta
52+
53+
metaCache struct {
54+
PodMeta meta
55+
CriType string
56+
NodeLabels map[string]string
57+
}
4858
)
4959

5060
var (
51-
client *kubernetes.Clientset
52-
MetaData = make(meta)
61+
client *kubernetes.Clientset
62+
MetaData = &metaCache{
63+
PodMeta: make(meta),
64+
CriType: "",
65+
NodeLabels: nil,
66+
}
5367
metaDataMu = &sync.RWMutex{}
5468
MetaFileSaver = &MetaSaver{}
5569

@@ -82,9 +96,6 @@ var (
8296
expiredItemsCounter atomic.Int64
8397
deletedPodsCounter atomic.Int64
8498

85-
CriType = "docker"
86-
NodeLabels = make(map[string]string)
87-
8899
SelfNodeName string
89100

90101
localLogger *zap.SugaredLogger
@@ -160,7 +171,7 @@ func initNodeInfo(ctx context.Context) {
160171
}
161172

162173
ns := getNamespace()
163-
SelfNodeName = getNodeName(Namespace(ns), PodName(podName))
174+
SelfNodeName = getNodeName(Namespace(ns))
164175
if SelfNodeName != "" {
165176
return
166177
}
@@ -220,6 +231,10 @@ func initInformer() {
220231
}
221232

222233
func initRuntime(ctx context.Context) {
234+
if MetaData.NodeLabels != nil && MetaData.CriType != "" {
235+
return
236+
}
237+
223238
node, err := client.CoreV1().Nodes().Get(ctx, SelfNodeName, metav1.GetOptions{})
224239
if err != nil || node == nil {
225240
localLogger.Fatalf("can't detect CRI runtime for node %s, api call is unsuccessful: %s", node, err.Error())
@@ -231,8 +246,8 @@ func initRuntime(ctx context.Context) {
231246
localLogger.Fatalf("can't detect CRI runtime for node %s, wrong runtime version: %s", node, runtimeVer)
232247
}
233248

234-
NodeLabels = node.Labels
235-
CriType = runtimeVer[:pos]
249+
MetaData.NodeLabels = node.Labels
250+
MetaData.CriType = runtimeVer[:pos]
236251
}
237252

238253
func removeExpired() {
@@ -280,7 +295,7 @@ func getTotalItems() int {
280295
defer metaDataMu.RUnlock()
281296

282297
totalItems := 0
283-
for _, podNames := range MetaData {
298+
for _, podNames := range MetaData.PodMeta {
284299
for _, containerIDs := range podNames {
285300
totalItems += len(containerIDs)
286301
}
@@ -296,7 +311,7 @@ func getExpiredItems(out []*MetaItem) []*MetaItem {
296311
defer metaDataMu.RUnlock()
297312

298313
// find pods which aren't in k8s pod list for some time and add them to the expiration list
299-
for ns, podNames := range MetaData {
314+
for ns, podNames := range MetaData.PodMeta {
300315
for pod, containerIDs := range podNames {
301316
for cid, podData := range containerIDs {
302317
if now.Sub(podData.updateTime) > MetaExpireDuration {
@@ -319,14 +334,14 @@ func cleanUpItems(items []*MetaItem) {
319334

320335
for _, item := range items {
321336
expiredItemsCounter.Inc()
322-
delete(MetaData[item.Namespace][item.PodName], item.ContainerID)
337+
delete(MetaData.PodMeta[item.Namespace][item.PodName], item.ContainerID)
323338

324-
if len(MetaData[item.Namespace][item.PodName]) == 0 {
325-
delete(MetaData[item.Namespace], item.PodName)
339+
if len(MetaData.PodMeta[item.Namespace][item.PodName]) == 0 {
340+
delete(MetaData.PodMeta[item.Namespace], item.PodName)
326341
}
327342

328-
if len(MetaData[item.Namespace]) == 0 {
329-
delete(MetaData, item.Namespace)
343+
if len(MetaData.PodMeta[item.Namespace]) == 0 {
344+
delete(MetaData.PodMeta, item.Namespace)
330345
}
331346
}
332347
}
@@ -339,7 +354,7 @@ func GetPodMeta(ns Namespace, pod PodName, cid ContainerID) (bool, *podMeta) {
339354
canUpdateMetaDataBeforeRetries := canUpdateMetaData.Load()
340355
for {
341356
metaDataMu.RLock()
342-
pm, has := MetaData[ns][pod][cid]
357+
pm, has := MetaData.PodMeta[ns][pod][cid]
343358
isDeleted := deletedPodsCache.Contains(pod)
344359
metaDataMu.RUnlock()
345360

@@ -391,11 +406,11 @@ func PutMeta(podData *corev1.Pod) {
391406
ns := Namespace(podCopy.Namespace)
392407

393408
metaDataMu.Lock()
394-
if MetaData[ns] == nil {
395-
MetaData[ns] = make(map[PodName]map[ContainerID]*podMeta)
409+
if MetaData.PodMeta[ns] == nil {
410+
MetaData.PodMeta[ns] = make(map[PodName]map[ContainerID]*podMeta)
396411
}
397-
if MetaData[ns][pod] == nil {
398-
MetaData[ns][pod] = make(map[ContainerID]*podMeta)
412+
if MetaData.PodMeta[ns][pod] == nil {
413+
MetaData.PodMeta[ns][pod] = make(map[ContainerID]*podMeta)
399414
}
400415
metaDataMu.Unlock()
401416

@@ -453,7 +468,7 @@ func putContainerMeta(ns Namespace, pod PodName, fullContainerID string, podInfo
453468
}
454469

455470
metaDataMu.Lock()
456-
MetaData[ns][pod][containerID] = meta
471+
MetaData.PodMeta[ns][pod][containerID] = meta
457472
metaDataMu.Unlock()
458473
}
459474

@@ -465,10 +480,12 @@ func getNamespace() string {
465480
return strings.TrimSpace(string(data))
466481
}
467482

468-
func getNodeName(ns Namespace, podName PodName) string {
469-
for _, podData := range MetaData[ns][podName] {
470-
if podData.Spec.NodeName != "" {
471-
return podData.Spec.NodeName
483+
func getNodeName(ns Namespace) string {
484+
for _, containerIDs := range MetaData.PodMeta[ns] {
485+
for _, podData := range containerIDs {
486+
if podData.Spec.NodeName != "" {
487+
return podData.Spec.NodeName
488+
}
472489
}
473490
}
474491

@@ -505,6 +522,7 @@ func (ms *MetaSaver) saveMetaFile() {
505522
metaDataMu.RLock()
506523
buf, err := json.MarshalIndent(MetaData, "", " ")
507524
metaDataMu.RUnlock()
525+
508526
if err != nil {
509527
localLogger.Errorf("can't marshall k8s meta map into json: %s", err.Error())
510528
}
@@ -548,7 +566,7 @@ func (ms *MetaSaver) loadMeta() error {
548566
return fmt.Errorf("can't unmarshal map: %w", err)
549567
}
550568

551-
for _, podNames := range MetaData {
569+
for _, podNames := range MetaData.PodMeta {
552570
for _, containerIDs := range podNames {
553571
for _, podData := range containerIDs {
554572
podData.updateTime = time.Now()

plugin/input/k8s/multiline_action.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (p *MultilineAction) Do(event *pipeline.Event) pipeline.ActionResult {
185185
event.Root.AddFieldNoAlloc(event.Root, pipeline.ByteToStringUnsafe(event.Buf[l:])).MutateToString(labelValue)
186186
}
187187

188-
for labelName, labelValue := range meta.NodeLabels {
188+
for labelName, labelValue := range meta.MetaData.NodeLabels {
189189
if len(p.allowedNodeLabels) != 0 {
190190
_, has := p.allowedNodeLabels[labelName]
191191

plugin/input/k8s/multiline_action_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestMultilineAction_Do(t *testing.T) {
3131
filename := getLogFilename("k8s", item)
3232
meta.PutMeta(getPodInfo(item, true))
3333
meta.SelfNodeName = "node_1"
34-
meta.NodeLabels = map[string]string{"zone": "z34"}
34+
meta.MetaData.NodeLabels = map[string]string{"zone": "z34"}
3535

3636
tcs := []struct {
3737
Name string

0 commit comments

Comments
 (0)