Skip to content

Commit 50c172f

Browse files
authored
feat(pipelineloop): Support logging to Object store. Built as an extension for zap. (#806)
* Object store logging as an extension for zap. * Do not add object store logger if it is not enabled.
1 parent ed7673c commit 50c172f

File tree

6 files changed

+281
-3
lines changed

6 files changed

+281
-3
lines changed

tekton-catalog/pipeline-loops/config/201-role.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ rules:
2828
- apiGroups: [""]
2929
resources: ["configmaps"]
3030
verbs: ["get"]
31-
resourceNames: ["config-leader-election", "config-logging", "config-observability"]
31+
resourceNames: ["config-leader-election", "config-logging", "config-observability", "object-store-config"]
3232
- apiGroups: ["policy"]
3333
resources: ["podsecuritypolicies"]
3434
resourceNames: ["tekton-pipelines"]
@@ -51,7 +51,7 @@ rules:
5151
- apiGroups: [""]
5252
resources: ["configmaps"]
5353
verbs: ["get"]
54-
resourceNames: ["config-logging", "config-observability", "config-leader-election"]
54+
resourceNames: ["config-logging", "config-observability", "config-leader-election", "object-store-config"]
5555
- apiGroups: [""]
5656
resources: ["secrets"]
5757
verbs: ["list", "watch"]
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright 2020 The Knative Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: v1
16+
kind: ConfigMap
17+
metadata:
18+
name: object-store-config
19+
namespace: tekton-pipelines
20+
labels:
21+
app.kubernetes.io/instance: default
22+
app.kubernetes.io/part-of: tekton-pipelines-loops
23+
data:
24+
enable: "false"
25+
defaultBucketName: "pipelineloop-default"
26+
ibmStyleCredentials: "false"
27+
region: "us-south"
28+
accessKey: "<access key>"
29+
secretKey: "<secret key>"
30+
# Below are IBM cloud specific credentials, available if the flag ibmStyleCredentials is true.
31+
apiKey: "<APIKEY-dummy-1231231231-123abcdefgh>"
32+
serviceInstanceID: "crn:v1:bluemix:public:cloud-object-storage:global:a/ID-dummy-1231231231-123abcdefgh:dummy-values::"
33+
serviceEndpoint: "https://s3.us-south.cloud-object-storage.appdomain.cloud"
34+
authEndpoint: "https://iam.cloud.ibm.com/identity/token"

tekton-catalog/pipeline-loops/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops
33
go 1.13
44

55
require (
6+
github.com/IBM/ibm-cos-sdk-go v1.8.0
67
github.com/google/go-cmp v0.5.6
78
github.com/hashicorp/go-multierror v1.1.1
89
github.com/tektoncd/pipeline v0.30.0

tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/controller.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ package pipelinelooprun
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"os"
23+
"os/signal"
24+
"syscall"
2125

2226
"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop"
2327
pipelineloopv1alpha1 "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1"
@@ -28,7 +32,10 @@ import (
2832
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun"
2933
runreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1alpha1/run"
3034
pipelinecontroller "github.com/tektoncd/pipeline/pkg/controller"
35+
"go.uber.org/zap"
36+
"go.uber.org/zap/zapcore"
3137
"k8s.io/client-go/tools/cache"
38+
kubeclient "knative.dev/pkg/client/injection/kube/client"
3239
"knative.dev/pkg/configmap"
3340
"knative.dev/pkg/controller"
3441
"knative.dev/pkg/logging"
@@ -37,7 +44,7 @@ import (
3744
// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
3845
func NewController(namespace string) func(context.Context, configmap.Watcher) *controller.Impl {
3946
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
40-
47+
kubeclientset := kubeclient.Get(ctx)
4148
logger := logging.FromContext(ctx)
4249
pipelineclientset := pipelineclient.Get(ctx)
4350
pipelineloopclientset := pipelineloopclient.Get(ctx)
@@ -46,12 +53,50 @@ func NewController(namespace string) func(context.Context, configmap.Watcher) *c
4653
pipelineRunInformer := pipelineruninformer.Get(ctx)
4754

4855
c := &Reconciler{
56+
KubeClientSet: kubeclientset,
4957
pipelineClientSet: pipelineclientset,
5058
pipelineloopClientSet: pipelineloopclientset,
5159
runLister: runInformer.Lister(),
5260
pipelineLoopLister: pipelineLoopInformer.Lister(),
5361
pipelineRunLister: pipelineRunInformer.Lister(),
5462
}
63+
objectStoreLogger := Logger{
64+
MaxSize: 1024 * 100, // TODO make it configurable via a configmap.
65+
}
66+
err := objectStoreLogger.LoadDefaults(ctx, kubeclientset)
67+
if err == nil && objectStoreLogger.LogConfig.Enable {
68+
logger.Info("Loading object store logger...")
69+
w := zapcore.NewMultiWriteSyncer(
70+
zapcore.AddSync(os.Stdout),
71+
zapcore.AddSync(&objectStoreLogger),
72+
)
73+
core := zapcore.NewCore(
74+
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
75+
w,
76+
zap.InfoLevel,
77+
)
78+
logger := zap.New(core)
79+
logger.Info("First log msg with object store logger.")
80+
ctx = logging.WithLogger(ctx, logger.Sugar())
81+
82+
// set up SIGHUP to send logs to object store before shutdown.
83+
signal.Ignore(syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
84+
c := make(chan os.Signal, 3)
85+
signal.Notify(c, syscall.SIGTERM)
86+
signal.Notify(c, syscall.SIGINT)
87+
signal.Notify(c, syscall.SIGHUP)
88+
89+
go func() {
90+
for {
91+
<-c
92+
err = objectStoreLogger.Close()
93+
fmt.Printf("Synced with object store... %v", err)
94+
os.Exit(0)
95+
}
96+
}()
97+
} else {
98+
logger.Errorf("Object store logging unavailable, %v ", err)
99+
}
55100

56101
impl := runreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
57102
return controller.Options{
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
Copyright 2020 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package pipelinelooprun
18+
19+
import (
20+
"bytes"
21+
"context"
22+
"fmt"
23+
"io"
24+
"strconv"
25+
"sync"
26+
"time"
27+
28+
"github.com/IBM/ibm-cos-sdk-go/aws"
29+
"github.com/IBM/ibm-cos-sdk-go/aws/credentials"
30+
"github.com/IBM/ibm-cos-sdk-go/aws/credentials/ibmiam"
31+
"github.com/IBM/ibm-cos-sdk-go/aws/session"
32+
"github.com/IBM/ibm-cos-sdk-go/service/s3"
33+
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/client-go/kubernetes"
35+
"knative.dev/pkg/system"
36+
)
37+
38+
type ObjectStoreLogConfig struct {
39+
Enable bool
40+
defaultBucketName string
41+
accessKey string
42+
secretKey string
43+
ibmStyleCredentials bool
44+
apiKey string
45+
serviceInstanceID string
46+
region string
47+
serviceEndpoint string
48+
authEndpoint string
49+
client *s3.S3
50+
}
51+
52+
type Logger struct {
53+
buffer *bytes.Buffer
54+
// When buffer reaches the size of MaxSize, it tries to sync with object store.
55+
MaxSize int64
56+
// Whether to compress before syncing the buffer.
57+
Compress bool
58+
// Current size of the buffer.
59+
size int64
60+
// Sync irrespective of buffer size after elapsing this interval.
61+
SyncInterval time.Duration
62+
mu sync.Mutex
63+
LogConfig *ObjectStoreLogConfig
64+
}
65+
66+
// ensure we always implement io.WriteCloser
67+
var _ io.WriteCloser = (*Logger)(nil)
68+
69+
func (l *Logger) Write(p []byte) (n int, err error) {
70+
l.mu.Lock()
71+
defer l.mu.Unlock()
72+
writeLen := int64(len(p))
73+
if l.size+writeLen >= l.MaxSize {
74+
if err := l.syncBuffer(); err != nil {
75+
return 0, err
76+
}
77+
}
78+
if n, err = l.buffer.Write(p); err != nil {
79+
return n, err
80+
}
81+
l.size = l.size + int64(n)
82+
return n, nil
83+
}
84+
85+
func (l *Logger) syncBuffer() error {
86+
fmt.Printf("Syncing buffer size : %d, MaxSize: %d \n", l.size, l.MaxSize)
87+
err := l.LogConfig.writeToObjectStore(l.LogConfig.defaultBucketName,
88+
time.Now().Format(time.RFC3339Nano), l.buffer.Bytes())
89+
if err != nil {
90+
return err
91+
}
92+
l.buffer.Reset()
93+
l.size = 0
94+
return nil
95+
}
96+
97+
func (l *Logger) Close() error {
98+
l.mu.Lock()
99+
defer l.mu.Unlock()
100+
return l.syncBuffer()
101+
}
102+
103+
func (o *ObjectStoreLogConfig) load(ctx context.Context, kubeClientSet kubernetes.Interface) error {
104+
configMap, err := kubeClientSet.CoreV1().ConfigMaps(system.Namespace()).
105+
Get(ctx, "object-store-config", metaV1.GetOptions{})
106+
if err != nil {
107+
return err
108+
}
109+
if o.Enable, err = strconv.ParseBool(configMap.Data["enable"]); err != nil || !o.Enable {
110+
return err
111+
}
112+
113+
if o.ibmStyleCredentials, err = strconv.ParseBool(configMap.Data["ibmStyleCredentials"]); err != nil {
114+
return err
115+
}
116+
117+
o.apiKey = configMap.Data["apiKey"]
118+
o.accessKey = configMap.Data["accessKey"]
119+
o.secretKey = configMap.Data["secretKey"]
120+
o.serviceInstanceID = configMap.Data["serviceInstanceID"]
121+
o.region = configMap.Data["region"]
122+
o.serviceEndpoint = configMap.Data["serviceEndpoint"]
123+
o.authEndpoint = configMap.Data["authEndpoint"]
124+
o.defaultBucketName = configMap.Data["defaultBucketName"]
125+
ibmCredentials := ibmiam.NewStaticCredentials(aws.NewConfig(), o.authEndpoint, o.apiKey, o.serviceInstanceID)
126+
s3Credentials := credentials.NewStaticCredentials(o.accessKey, o.secretKey, "")
127+
var creds *credentials.Credentials
128+
if o.ibmStyleCredentials {
129+
creds = ibmCredentials
130+
} else {
131+
creds = s3Credentials
132+
}
133+
// Create client config
134+
var conf = aws.NewConfig().
135+
WithRegion(o.region).
136+
WithEndpoint(o.serviceEndpoint).
137+
WithCredentials(creds).
138+
WithS3ForcePathStyle(true)
139+
140+
var sess = session.Must(session.NewSession())
141+
o.client = s3.New(sess, conf)
142+
input := &s3.CreateBucketInput{
143+
Bucket: aws.String(o.defaultBucketName),
144+
}
145+
_, err = o.client.CreateBucket(input)
146+
if err != nil {
147+
fmt.Printf("This error might be harmless, as the default bucket may already exist, %v\n",
148+
err.Error())
149+
}
150+
return nil
151+
}
152+
153+
func (o *ObjectStoreLogConfig) CreateNewBucket(bucketName string) error {
154+
if !o.Enable || bucketName == o.defaultBucketName {
155+
return nil
156+
}
157+
input := &s3.CreateBucketInput{
158+
Bucket: aws.String(bucketName),
159+
}
160+
_, err := o.client.CreateBucket(input)
161+
return err
162+
}
163+
164+
func (o *ObjectStoreLogConfig) writeToObjectStore(bucketName string, key string, content []byte) error {
165+
if !o.Enable {
166+
return nil
167+
}
168+
input := s3.PutObjectInput{
169+
Bucket: aws.String(bucketName),
170+
Key: aws.String(key),
171+
Body: bytes.NewReader(content),
172+
}
173+
174+
_, err := o.client.PutObject(&input)
175+
// fmt.Printf("Response from object store: %v\n", obj)
176+
return err
177+
}
178+
179+
func (l *Logger) LoadDefaults(ctx context.Context, kubeClientSet kubernetes.Interface) error {
180+
181+
if l.LogConfig == nil {
182+
l.LogConfig = &ObjectStoreLogConfig{}
183+
err := l.LogConfig.load(ctx, kubeClientSet)
184+
if err != nil {
185+
return err
186+
}
187+
if !l.LogConfig.Enable {
188+
return fmt.Errorf("Object store logging is disabled. " +
189+
"Please edit `object-store-config` configMap to setup logging.\n")
190+
}
191+
}
192+
if l.buffer == nil {
193+
l.buffer = new(bytes.Buffer)
194+
}
195+
return nil
196+
}

tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"k8s.io/apimachinery/pkg/labels"
5353
"k8s.io/apimachinery/pkg/runtime/schema"
5454
"k8s.io/apimachinery/pkg/types"
55+
"k8s.io/client-go/kubernetes"
5556
"knative.dev/pkg/apis"
5657
"knative.dev/pkg/logging"
5758
pkgreconciler "knative.dev/pkg/reconciler"
@@ -86,6 +87,7 @@ const (
8687

8788
// Reconciler implements controller.Reconciler for Configuration resources.
8889
type Reconciler struct {
90+
KubeClientSet kubernetes.Interface
8991
pipelineClientSet clientset.Interface
9092
pipelineloopClientSet pipelineloopclientset.Interface
9193
runLister listersalpha.RunLister

0 commit comments

Comments
 (0)