Skip to content

Commit f602215

Browse files
authored
improve performance (#171)
improve performance with global vector params --------- Signed-off-by: Aleksandr Aleksandrov <[email protected]>
1 parent 30abc56 commit f602215

20 files changed

+118
-239
lines changed

api/v1alpha1/vector_common_types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package v1alpha1
33
import v1 "k8s.io/api/core/v1"
44

55
type VectorCommonStatus struct {
6-
ConfigCheckResult *bool `json:"configCheckResult,omitempty"`
7-
Reason *string `json:"reason,omitempty"`
8-
LastAppliedConfigHash *uint32 `json:"LastAppliedConfigHash,omitempty"`
6+
ConfigCheckResult *bool `json:"configCheckResult,omitempty"`
7+
Reason *string `json:"reason,omitempty"`
8+
LastAppliedConfigHash *uint32 `json:"LastAppliedConfigHash,omitempty"`
9+
LastAppliedGlobalConfigHash *uint32 `json:"LastAppliedGlobalConfigHash,omitempty"`
910
}
1011

1112
type VectorCommon struct {

config/crd/bases/observability.kaasops.io_clustervectoraggregators.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4973,6 +4973,9 @@ spec:
49734973
LastAppliedConfigHash:
49744974
format: int32
49754975
type: integer
4976+
LastAppliedGlobalConfigHash:
4977+
format: int32
4978+
type: integer
49764979
configCheckResult:
49774980
type: boolean
49784981
reason:

config/crd/bases/observability.kaasops.io_vectoraggregators.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4966,6 +4966,9 @@ spec:
49664966
LastAppliedConfigHash:
49674967
format: int32
49684968
type: integer
4969+
LastAppliedGlobalConfigHash:
4970+
format: int32
4971+
type: integer
49694972
configCheckResult:
49704973
type: boolean
49714974
reason:

config/crd/bases/observability.kaasops.io_vectors.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4985,6 +4985,9 @@ spec:
49854985
LastAppliedConfigHash:
49864986
format: int32
49874987
type: integer
4988+
LastAppliedGlobalConfigHash:
4989+
format: int32
4990+
type: integer
49884991
configCheckResult:
49894992
type: boolean
49904993
reason:

internal/config/agent.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,24 @@ import (
1010
goyaml "sigs.k8s.io/yaml"
1111
)
1212

13-
func BuildAgentConfig(p VectorConfigParams, pipelines ...pipeline.Pipeline) ([]byte, error) {
13+
const (
14+
AgentApiPort = 8686
15+
)
16+
17+
func BuildAgentConfig(p VectorConfigParams, pipelines ...pipeline.Pipeline) (*VectorConfig, []byte, error) {
1418
cfg, err := buildAgentConfig(p, pipelines...)
1519
if err != nil {
16-
return nil, err
20+
return nil, nil, err
1721
}
1822
yamlBytes, err := yaml.Marshal(cfg)
1923
if err != nil {
20-
return nil, err
24+
return nil, nil, err
2125
}
2226
jsonBytes, err := goyaml.YAMLToJSON(yamlBytes)
2327
if err != nil {
24-
return nil, err
28+
return nil, nil, err
2529
}
26-
return jsonBytes, nil
30+
return cfg, jsonBytes, nil
2731
}
2832

2933
func buildAgentConfig(params VectorConfigParams, pipelines ...pipeline.Pipeline) (*VectorConfig, error) {

internal/config/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1"
2424
"github.com/kaasops/vector-operator/internal/evcollector"
25-
"github.com/kaasops/vector-operator/internal/vector/vectoragent"
2625
"github.com/mitchellh/mapstructure"
2726
"gopkg.in/yaml.v2"
2827
"net"
@@ -50,7 +49,7 @@ func newVectorConfig(p VectorConfigParams) *VectorConfig {
5049
sinks := make(map[string]*Sink)
5150

5251
api := &ApiSpec{
53-
Address: net.JoinHostPort("0.0.0.0", strconv.Itoa(vectoragent.ApiPort)),
52+
Address: net.JoinHostPort("0.0.0.0", strconv.Itoa(AgentApiPort)),
5453
Enabled: p.ApiEnabled,
5554
Playground: p.PlaygroundEnabled,
5655
}
@@ -63,7 +62,9 @@ func newVectorConfig(p VectorConfigParams) *VectorConfig {
6362
Transforms: transforms,
6463
Sinks: sinks,
6564
},
66-
ExpireMetricsSecs: p.ExpireMetricsSecs,
65+
globalOptions: globalOptions{
66+
ExpireMetricsSecs: p.ExpireMetricsSecs,
67+
},
6768
}
6869
}
6970

internal/config/types.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,23 @@ limitations under the License.
1717
package config
1818

1919
import (
20+
"encoding/json"
2021
"fmt"
22+
"github.com/kaasops/vector-operator/internal/utils/hash"
2123

2224
corev1 "k8s.io/api/core/v1"
2325
)
2426

27+
type globalOptions struct {
28+
ExpireMetricsSecs *int `yaml:"expire_metrics_secs,omitempty"`
29+
}
30+
2531
type VectorConfig struct {
26-
DataDir string `yaml:"data_dir"`
27-
ExpireMetricsSecs *int `yaml:"expire_metrics_secs,omitempty"`
28-
Api *ApiSpec `yaml:"api"`
29-
PipelineConfig `yaml:",inline"`
30-
internal internalConfig `yaml:"-"`
32+
DataDir string `yaml:"data_dir"`
33+
globalOptions `yaml:",inline"`
34+
Api *ApiSpec `yaml:"api"`
35+
PipelineConfig `yaml:",inline"`
36+
internal internalConfig `yaml:"-"`
3137
}
3238

3339
type PipelineConfig struct {
@@ -96,3 +102,9 @@ func (c *internalConfig) addServicePort(port *ServicePort) error {
96102
}
97103
return nil
98104
}
105+
106+
func (c *VectorConfig) GetGlobalConfigHash() *uint32 {
107+
bytes, _ := json.Marshal(c.globalOptions)
108+
gHash := hash.Get(bytes)
109+
return &gHash
110+
}

internal/controller/clustervectoraggregator_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (r *ClusterVectorAggregatorReconciler) createOrUpdateVectorAggregator(ctx c
167167
return ctrl.Result{}, err
168168
}
169169

170-
if err := vaCtrl.SetSuccessStatus(ctx, &cfgHash); err != nil {
170+
if err := vaCtrl.SetSuccessStatus(ctx, &cfgHash, vaCtrl.Config.GetGlobalConfigHash()); err != nil {
171171
return ctrl.Result{}, err
172172
}
173173

internal/controller/pipeline_controller.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
140140
for _, vector := range vectorAgents {
141141
eg.Go(func() error {
142142
vaCtrl := vectoragent.NewController(vector, r.Client, r.Clientset)
143-
byteConfig, err := config.BuildAgentConfig(config.VectorConfigParams{
143+
cfg, byteConfig, err := config.BuildAgentConfig(config.VectorConfigParams{
144144
ApiEnabled: vaCtrl.Vector.Spec.Agent.Api.Enabled,
145145
PlaygroundEnabled: vaCtrl.Vector.Spec.Agent.Api.Playground,
146146
UseApiServerCache: vaCtrl.Vector.Spec.UseApiServerCache,
@@ -151,9 +151,11 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
151151
return fmt.Errorf("agent %s/%s build config failed: %w: %w", vector.Namespace, vector.Name, ErrBuildConfigFailed, err)
152152
}
153153

154-
vaCtrl.Config = byteConfig
154+
vaCtrl.Config = cfg
155+
vaCtrl.ByteConfig = byteConfig
156+
155157
configCheck := configcheck.New(
156-
vaCtrl.Config,
158+
vaCtrl.ByteConfig,
157159
vaCtrl.Client,
158160
vaCtrl.ClientSet,
159161
&vaCtrl.Vector.Spec.Agent.VectorCommon,

internal/controller/vector_controller.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (r *VectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
8686
log.Error(err, "Failed to list vector instances")
8787
return ctrl.Result{}, err
8888
}
89-
return r.reconcileVectors(ctx, r.Client, r.Clientset, false, vectors...)
89+
return r.reconcileVectors(ctx, r.Client, r.Clientset, vectors...)
9090
}
9191

9292
vectorCR, err := r.findVectorCustomResourceInstance(ctx, req)
@@ -98,7 +98,7 @@ func (r *VectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
9898
log.Info("Vector CR not found. Ignoring since object must be deleted")
9999
return ctrl.Result{}, nil
100100
}
101-
return r.createOrUpdateVector(ctx, r.Client, r.Clientset, vectorCR, false)
101+
return r.createOrUpdateVector(ctx, r.Client, r.Clientset, vectorCR)
102102
}
103103

104104
// SetupWithManager sets up the controller with the Manager.
@@ -156,7 +156,7 @@ func (r *VectorReconciler) findVectorCustomResourceInstance(ctx context.Context,
156156
return vectorCR, nil
157157
}
158158

159-
func (r *VectorReconciler) reconcileVectors(ctx context.Context, client client.Client, clientset *kubernetes.Clientset, configOnly bool, vectors ...*v1alpha1.Vector) (ctrl.Result, error) {
159+
func (r *VectorReconciler) reconcileVectors(ctx context.Context, client client.Client, clientset *kubernetes.Clientset, vectors ...*v1alpha1.Vector) (ctrl.Result, error) {
160160
if len(vectors) == 0 {
161161
return ctrl.Result{}, nil
162162
}
@@ -166,14 +166,14 @@ func (r *VectorReconciler) reconcileVectors(ctx context.Context, client client.C
166166
continue
167167
}
168168
setAgentTypeMetaIfNeeded(vector)
169-
if _, err := r.createOrUpdateVector(ctx, client, clientset, vector, configOnly); err != nil {
169+
if _, err := r.createOrUpdateVector(ctx, client, clientset, vector); err != nil {
170170
return ctrl.Result{}, err
171171
}
172172
}
173173
return ctrl.Result{}, nil
174174
}
175175

176-
func (r *VectorReconciler) createOrUpdateVector(ctx context.Context, client client.Client, clientset *kubernetes.Clientset, v *v1alpha1.Vector, configOnly bool) (ctrl.Result, error) {
176+
func (r *VectorReconciler) createOrUpdateVector(ctx context.Context, client client.Client, clientset *kubernetes.Clientset, v *v1alpha1.Vector) (ctrl.Result, error) {
177177
log := log.FromContext(ctx).WithValues("Vector", v.Name)
178178
// Init Controller for Vector Agent
179179
vaCtrl := vectoragent.NewController(v, client, clientset)
@@ -189,7 +189,7 @@ func (r *VectorReconciler) createOrUpdateVector(ctx context.Context, client clie
189189
}
190190

191191
// Get Config in Json ([]byte)
192-
byteConfig, err := config.BuildAgentConfig(config.VectorConfigParams{
192+
cfg, byteConfig, err := config.BuildAgentConfig(config.VectorConfigParams{
193193
ApiEnabled: vaCtrl.Vector.Spec.Agent.Api.Enabled,
194194
PlaygroundEnabled: vaCtrl.Vector.Spec.Agent.Api.Playground,
195195
UseApiServerCache: vaCtrl.Vector.Spec.UseApiServerCache,
@@ -231,14 +231,15 @@ func (r *VectorReconciler) createOrUpdateVector(ctx context.Context, client clie
231231
}
232232
}
233233

234-
vaCtrl.Config = byteConfig
234+
vaCtrl.ByteConfig = byteConfig
235+
vaCtrl.Config = cfg
235236

236237
// Start Reconcile Vector Agent
237-
if err := vaCtrl.EnsureVectorAgent(ctx, configOnly); err != nil {
238+
if err := vaCtrl.EnsureVectorAgent(ctx); err != nil {
238239
return ctrl.Result{}, err
239240
}
240241

241-
if err := vaCtrl.SetSuccessStatus(ctx, &cfgHash); err != nil {
242+
if err := vaCtrl.SetSuccessStatus(ctx, &cfgHash, cfg.GetGlobalConfigHash()); err != nil {
242243
// TODO: Handle err: Operation cannot be fulfilled on vectors.observability.kaasops.io \"vector-sample\": the object has been modified; please apply your changes to the latest version and try again
243244
if api_errors.IsConflict(err) {
244245
return ctrl.Result{}, err

internal/controller/vectoraggregator_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func (r *VectorAggregatorReconciler) createOrUpdateVectorAggregator(ctx context.
244244
return ctrl.Result{}, err
245245
}
246246

247-
if err := vaCtrl.SetSuccessStatus(ctx, &cfgHash); err != nil {
247+
if err := vaCtrl.SetSuccessStatus(ctx, &cfgHash, vaCtrl.Config.GetGlobalConfigHash()); err != nil {
248248
return ctrl.Result{}, err
249249
}
250250

internal/utils/compression/compression.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,3 @@ func Compress(data []byte, log logr.Logger) []byte {
2020

2121
return b.Bytes()
2222
}
23-
24-
func Decompress(data []byte, log logr.Logger) []byte {
25-
if len(data) == 0 {
26-
return []byte{}
27-
}
28-
29-
reader := bytes.NewReader(data)
30-
gz, err := gzip.NewReader(reader)
31-
if err != nil {
32-
log.Error(err, "Failed to create gzip reader for decompress")
33-
return nil
34-
}
35-
defer func() {
36-
if err := gz.Close(); err != nil {
37-
log.Error(err, "Failed to close reader for decompress")
38-
}
39-
}()
40-
41-
var result bytes.Buffer
42-
if _, err := result.ReadFrom(gz); err != nil {
43-
log.Error(err, "Failed to read decompressed data")
44-
return nil
45-
}
46-
return result.Bytes()
47-
}

internal/utils/compression/compression_test.go

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)