Skip to content

Commit 90a9a50

Browse files
authored
feat: make streaming buffer size configurable (#696)
* feat: make streaming buffer size configurable Signed-off-by: Eitan Yarmush <eitan.yarmush@solo.io> * switch to resource quantities for buffer size Signed-off-by: Eitan Yarmush <eitan.yarmush@solo.io> --------- Signed-off-by: Eitan Yarmush <eitan.yarmush@solo.io>
1 parent 4e010d4 commit 90a9a50

File tree

7 files changed

+107
-77
lines changed

7 files changed

+107
-77
lines changed

go/controller/cmd/main.go

Lines changed: 82 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/kagent-dev/kagent/go/internal/version"
3030

31+
"k8s.io/apimachinery/pkg/api/resource"
3132
"k8s.io/apimachinery/pkg/types"
3233

3334
"github.com/kagent-dev/kagent/go/controller/translator"
@@ -85,51 +86,71 @@ func init() {
8586
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
8687
}
8788

89+
type Config struct {
90+
Metrics struct {
91+
Addr string
92+
CertPath string
93+
CertName string
94+
CertKey string
95+
}
96+
Webhook struct {
97+
CertPath string
98+
CertName string
99+
CertKey string
100+
}
101+
Streaming struct {
102+
MaxBufSize resource.QuantityValue `default:"1Mi"`
103+
InitialBufSize resource.QuantityValue `default:"4Ki"`
104+
}
105+
LeaderElection bool
106+
ProbeAddr string
107+
SecureMetrics bool
108+
EnableHTTP2 bool
109+
DefaultModelConfig types.NamespacedName
110+
HttpServerAddr string
111+
WatchNamespaces string
112+
A2ABaseUrl string
113+
Database struct {
114+
Type string
115+
Path string
116+
Url string
117+
}
118+
}
119+
88120
// nolint:gocyclo
89121
func main() {
90-
var metricsAddr string
91-
var metricsCertPath, metricsCertName, metricsCertKey string
92-
var webhookCertPath, webhookCertName, webhookCertKey string
93-
var enableLeaderElection bool
94-
var probeAddr string
95-
var secureMetrics bool
96-
var enableHTTP2 bool
97-
var defaultModelConfig types.NamespacedName
122+
cfg := Config{}
98123
var tlsOpts []func(*tls.Config)
99-
var httpServerAddr string
100-
var watchNamespaces string
101-
var a2aBaseUrl string
102-
var databasePath string
103-
var databaseType string
104-
var databaseURL string
105-
106-
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
124+
flag.StringVar(&cfg.Metrics.Addr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
107125
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
108-
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8082", "The address the probe endpoint binds to.")
109-
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
126+
flag.StringVar(&cfg.ProbeAddr, "health-probe-bind-address", ":8082", "The address the probe endpoint binds to.")
127+
flag.BoolVar(&cfg.LeaderElection, "leader-elect", false,
110128
"Enable leader election for controller manager. "+
111129
"Enabling this will ensure there is only one active controller manager.")
112-
flag.BoolVar(&secureMetrics, "metrics-secure", true,
130+
flag.BoolVar(&cfg.SecureMetrics, "metrics-secure", true,
113131
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
114-
flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
115-
flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
116-
flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
117-
flag.StringVar(&metricsCertPath, "metrics-cert-path", "",
132+
flag.StringVar(&cfg.Webhook.CertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
133+
flag.StringVar(&cfg.Webhook.CertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
134+
flag.StringVar(&cfg.Webhook.CertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
135+
flag.StringVar(&cfg.Metrics.CertPath, "metrics-cert-path", "",
118136
"The directory that contains the metrics server certificate.")
119-
flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
120-
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
121-
flag.BoolVar(&enableHTTP2, "enable-http2", false,
137+
flag.StringVar(&cfg.Metrics.CertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
138+
flag.StringVar(&cfg.Metrics.CertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
139+
flag.BoolVar(&cfg.EnableHTTP2, "enable-http2", false,
122140
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
123141

124-
flag.StringVar(&defaultModelConfig.Name, "default-model-config-name", "default-model-config", "The name of the default model config.")
125-
flag.StringVar(&defaultModelConfig.Namespace, "default-model-config-namespace", kagentNamespace, "The namespace of the default model config.")
126-
flag.StringVar(&httpServerAddr, "http-server-address", ":8083", "The address the HTTP server binds to.")
127-
flag.StringVar(&a2aBaseUrl, "a2a-base-url", "http://127.0.0.1:8083", "The base URL of the A2A Server endpoint, as advertised to clients.")
128-
flag.StringVar(&databaseType, "database-type", "sqlite", "The type of the database to use. Supported values: sqlite, postgres.")
129-
flag.StringVar(&databasePath, "sqlite-database-path", "./kagent.db", "The path to the SQLite database file.")
130-
flag.StringVar(&databaseURL, "postgres-database-url", "postgres://postgres:kagent@db.kagent.svc.cluster.local:5432/crud", "The URL of the PostgreSQL database.")
142+
flag.StringVar(&cfg.DefaultModelConfig.Name, "default-model-config-name", "default-model-config", "The name of the default model config.")
143+
flag.StringVar(&cfg.DefaultModelConfig.Namespace, "default-model-config-namespace", kagentNamespace, "The namespace of the default model config.")
144+
flag.StringVar(&cfg.HttpServerAddr, "http-server-address", ":8083", "The address the HTTP server binds to.")
145+
flag.StringVar(&cfg.A2ABaseUrl, "a2a-base-url", "http://127.0.0.1:8083", "The base URL of the A2A Server endpoint, as advertised to clients.")
146+
flag.StringVar(&cfg.Database.Type, "database-type", "sqlite", "The type of the database to use. Supported values: sqlite, postgres.")
147+
flag.StringVar(&cfg.Database.Path, "sqlite-database-path", "./kagent.db", "The path to the SQLite database file.")
148+
flag.StringVar(&cfg.Database.Url, "postgres-database-url", "postgres://postgres:kagent@db.kagent.svc.cluster.local:5432/crud", "The URL of the PostgreSQL database.")
131149

132-
flag.StringVar(&watchNamespaces, "watch-namespaces", "", "The namespaces to watch for .")
150+
flag.StringVar(&cfg.WatchNamespaces, "watch-namespaces", "", "The namespaces to watch for .")
151+
152+
flag.Var(&cfg.Streaming.MaxBufSize, "streaming-max-buf-size", "The maximum size of the streaming buffer.")
153+
flag.Var(&cfg.Streaming.InitialBufSize, "streaming-initial-buf-size", "The initial size of the streaming buffer.")
133154

134155
opts := zap.Options{
135156
Development: true,
@@ -139,6 +160,9 @@ func main() {
139160

140161
logger := zap.New(zap.UseFlagOptions(&opts))
141162

163+
logger.Info("Starting KAgent Controller", "version", Version, "git_commit", GitCommit, "build_date", BuildDate)
164+
logger.Info("Config", "config", cfg)
165+
142166
ctrl.SetLogger(logger)
143167

144168
goruntime.SetMaxProcs(logger)
@@ -156,7 +180,7 @@ func main() {
156180
c.NextProtos = []string{"http/1.1"}
157181
}
158182

159-
if !enableHTTP2 {
183+
if !cfg.EnableHTTP2 {
160184
tlsOpts = append(tlsOpts, disableHTTP2)
161185
}
162186

@@ -166,14 +190,14 @@ func main() {
166190
// Initial webhook TLS options
167191
webhookTLSOpts := tlsOpts
168192

169-
if len(webhookCertPath) > 0 {
193+
if len(cfg.Webhook.CertPath) > 0 {
170194
setupLog.Info("Initializing webhook certificate watcher using provided certificates",
171-
"webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey)
195+
"webhook-cert-path", cfg.Webhook.CertPath, "webhook-cert-name", cfg.Webhook.CertName, "webhook-cert-key", cfg.Webhook.CertKey)
172196

173197
var err error
174198
webhookCertWatcher, err = certwatcher.New(
175-
filepath.Join(webhookCertPath, webhookCertName),
176-
filepath.Join(webhookCertPath, webhookCertKey),
199+
filepath.Join(cfg.Webhook.CertPath, cfg.Webhook.CertName),
200+
filepath.Join(cfg.Webhook.CertPath, cfg.Webhook.CertKey),
177201
)
178202
if err != nil {
179203
setupLog.Error(err, "Failed to initialize webhook certificate watcher")
@@ -196,12 +220,12 @@ func main() {
196220
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.0/pkg/metrics/server
197221
// - https://book.kubebuilder.io/reference/metrics.html
198222
metricsServerOptions := metricsserver.Options{
199-
BindAddress: metricsAddr,
200-
SecureServing: secureMetrics,
223+
BindAddress: cfg.Metrics.Addr,
224+
SecureServing: cfg.SecureMetrics,
201225
TLSOpts: tlsOpts,
202226
}
203227

204-
if secureMetrics {
228+
if cfg.SecureMetrics {
205229
// FilterProvider is used to protect the metrics endpoint with authn/authz.
206230
// These configurations ensure that only authorized users and service accounts
207231
// can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
@@ -217,14 +241,14 @@ func main() {
217241
// - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates
218242
// managed by cert-manager for the metrics server.
219243
// - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification.
220-
if len(metricsCertPath) > 0 {
244+
if len(cfg.Metrics.CertPath) > 0 {
221245
setupLog.Info("Initializing metrics certificate watcher using provided certificates",
222-
"metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey)
246+
"metrics-cert-path", cfg.Metrics.CertPath, "metrics-cert-name", cfg.Metrics.CertName, "metrics-cert-key", cfg.Metrics.CertKey)
223247

224248
var err error
225249
metricsCertWatcher, err = certwatcher.New(
226-
filepath.Join(metricsCertPath, metricsCertName),
227-
filepath.Join(metricsCertPath, metricsCertKey),
250+
filepath.Join(cfg.Metrics.CertPath, cfg.Metrics.CertName),
251+
filepath.Join(cfg.Metrics.CertPath, cfg.Metrics.CertKey),
228252
)
229253
if err != nil {
230254
setupLog.Error(err, "to initialize metrics certificate watcher", "error", err)
@@ -237,14 +261,14 @@ func main() {
237261
}
238262

239263
// filter out invalid namespaces from the watchNamespaces flag (comma separated list)
240-
watchNamespacesList := filterValidNamespaces(strings.Split(watchNamespaces, ","))
264+
watchNamespacesList := filterValidNamespaces(strings.Split(cfg.WatchNamespaces, ","))
241265

242266
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
243267
Scheme: scheme,
244268
Metrics: metricsServerOptions,
245269
WebhookServer: webhookServer,
246-
HealthProbeBindAddress: probeAddr,
247-
LeaderElection: enableLeaderElection,
270+
HealthProbeBindAddress: cfg.ProbeAddr,
271+
LeaderElection: cfg.LeaderElection,
248272
LeaderElectionID: "0e9f6799.kagent.dev",
249273
Cache: cache.Options{
250274
DefaultNamespaces: configureNamespaceWatching(watchNamespacesList),
@@ -268,12 +292,12 @@ func main() {
268292

269293
// Initialize database
270294
dbManager, err := database.NewManager(&database.Config{
271-
DatabaseType: database.DatabaseType(databaseType),
295+
DatabaseType: database.DatabaseType(cfg.Database.Type),
272296
SqliteConfig: &database.SqliteConfig{
273-
DatabasePath: databasePath,
297+
DatabasePath: cfg.Database.Path,
274298
},
275299
PostgresConfig: &database.PostgresConfig{
276-
URL: databaseURL,
300+
URL: cfg.Database.Url,
277301
},
278302
})
279303
if err != nil {
@@ -293,21 +317,23 @@ func main() {
293317

294318
apiTranslator := translator.NewAdkApiTranslator(
295319
kubeClient,
296-
defaultModelConfig,
320+
cfg.DefaultModelConfig,
297321
)
298322

299323
a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A)
300324

301325
a2aReconciler := a2a_reconciler.NewReconciler(
302326
a2aHandler,
303-
a2aBaseUrl+httpserver.APIPathA2A,
327+
cfg.A2ABaseUrl+httpserver.APIPathA2A,
328+
int(cfg.Streaming.MaxBufSize.Value()),
329+
int(cfg.Streaming.InitialBufSize.Value()),
304330
)
305331

306332
rcnclr := reconciler.NewKagentReconciler(
307333
apiTranslator,
308334
kubeClient,
309335
dbClient,
310-
defaultModelConfig,
336+
cfg.DefaultModelConfig,
311337
a2aReconciler,
312338
)
313339

@@ -383,7 +409,7 @@ func main() {
383409
}
384410

385411
httpServer, err := httpserver.NewHTTPServer(httpserver.ServerConfig{
386-
BindAddr: httpServerAddr,
412+
BindAddr: cfg.HttpServerAddr,
387413
KubeClient: kubeClient,
388414
A2AHandler: a2aHandler,
389415
WatchedNamespaces: watchNamespacesList,

go/controller/internal/a2a/a2a_reconciler.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,22 @@ type A2AReconciler interface {
3131
type a2aReconciler struct {
3232
a2aHandler a2a.A2AHandlerMux
3333
a2aBaseUrl string
34+
35+
streamingMaxBufSize int
36+
streamingInitialBufSize int
3437
}
3538

3639
func NewReconciler(
3740
a2aHandler a2a.A2AHandlerMux,
3841
a2aBaseUrl string,
42+
streamingMaxBufSize int,
43+
streamingInitialBufSize int,
3944
) A2AReconciler {
4045
return &a2aReconciler{
41-
a2aHandler: a2aHandler,
42-
a2aBaseUrl: a2aBaseUrl,
46+
a2aHandler: a2aHandler,
47+
a2aBaseUrl: a2aBaseUrl,
48+
streamingMaxBufSize: streamingMaxBufSize,
49+
streamingInitialBufSize: streamingInitialBufSize,
4350
}
4451
}
4552

@@ -53,7 +60,7 @@ func (a *a2aReconciler) ReconcileAgent(
5360
agentRef := common.GetObjectRef(agent)
5461
cardCopy.URL = fmt.Sprintf("%s/%s/", a.a2aBaseUrl, agentRef)
5562

56-
client, err := a2aclient.NewA2AClient(adkConfig.AgentCard.URL)
63+
client, err := a2aclient.NewA2AClient(adkConfig.AgentCard.URL, a2aclient.WithBuffer(a.streamingInitialBufSize, a.streamingMaxBufSize))
5764
if err != nil {
5865
return err
5966
}

go/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,5 @@ require (
150150
sigs.k8s.io/randfill v1.0.0 // indirect
151151
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect
152152
)
153+
154+
replace trpc.group/trpc-go/trpc-a2a-go => github.com/kagent-dev/a2a-go v0.0.0-20250806145931-0fab01f644c3

go/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
126126
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
127127
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
128128
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
129+
github.com/kagent-dev/a2a-go v0.0.0-20250806145931-0fab01f644c3 h1:NrQCHyIOsQTtq9u7fpfQuDOMQAGVOa+n5Km/8csdqOM=
130+
github.com/kagent-dev/a2a-go v0.0.0-20250806145931-0fab01f644c3/go.mod h1:lu052zH/pTlTBwWMU/E3UckR0U9ajL1NlhiRkIF9R6A=
129131
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
130132
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
131133
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
@@ -381,5 +383,3 @@ sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vt
381383
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
382384
sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=
383385
sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4=
384-
trpc.group/trpc-go/trpc-a2a-go v0.2.1 h1:v2U4e74wAmgD019+Xw8GTSn7ZJaz+enBfWbLshyhZQs=
385-
trpc.group/trpc-go/trpc-a2a-go v0.2.1/go.mod h1:lu052zH/pTlTBwWMU/E3UckR0U9ajL1NlhiRkIF9R6A=

go/internal/a2a/manager.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package a2a
22

33
import (
44
"context"
5-
"errors"
65

76
"trpc.group/trpc-go/trpc-a2a-go/client"
87
"trpc.group/trpc-go/trpc-a2a-go/protocol"
@@ -56,16 +55,5 @@ func (m *PassthroughManager) OnPushNotificationGet(ctx context.Context, params p
5655
}
5756

5857
func (m *PassthroughManager) OnResubscribe(ctx context.Context, params protocol.TaskIDParams) (<-chan protocol.StreamingMessageEvent, error) {
59-
// TODO: Implement
60-
return nil, nil
61-
}
62-
63-
// Deprecated: OnSendTask is deprecated and will be removed in the future.
64-
func (m *PassthroughManager) OnSendTask(ctx context.Context, request protocol.SendTaskParams) (*protocol.Task, error) {
65-
return nil, errors.New("OnSendTask is deprecated and will be removed in the future")
66-
}
67-
68-
// Deprecated: OnSendTaskSubscribe is deprecated and will be removed in the future.
69-
func (m *PassthroughManager) OnSendTaskSubscribe(ctx context.Context, request protocol.SendTaskParams) (<-chan protocol.TaskEvent, error) {
70-
return nil, errors.New("OnSendTaskSubscribe is deprecated and will be removed in the future")
58+
return m.client.ResubscribeTask(ctx, params)
7159
}

helm/kagent/templates/controller-deployment.yaml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ spec:
4242
- {{ .Values.controller.loglevel }}
4343
- -watch-namespaces
4444
- "{{ include "kagent.watchNamespaces" . }}"
45+
- -streaming-max-buf-size
46+
- {{ .Values.controller.streaming.maxBufSize | quote }}
47+
- -streaming-initial-buf-size
48+
- {{ .Values.controller.streaming.initialBufSize | quote }}
4549
- -database-type
4650
- {{ .Values.database.type }}
4751
{{- if eq .Values.database.type "sqlite" }}
@@ -128,15 +132,15 @@ spec:
128132
- name: OTEL_EXPORTER_OTLP_TRACES_INSECURE
129133
value: {{ .Values.otel.tracing.exporter.otlp.insecure | quote }}
130134
ports:
131-
- name: http
135+
- name: http-grafana
132136
containerPort: 8000
133137
protocol: TCP
134138
startupProbe:
135139
tcpSocket:
136-
port: http
140+
port: http-grafana
137141
periodSeconds: 1
138142
initialDelaySeconds: 1
139143
readinessProbe:
140144
tcpSocket:
141-
port: http
145+
port: http-grafana
142146
periodSeconds: 30

helm/kagent/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ database:
5858
controller:
5959
replicas: 1
6060
loglevel: "info"
61+
streaming: # Streaming buffer size for A2A communication
62+
maxBufSize: 1Mi # 1024 * 1024
63+
initialBufSize: 4Ki # 4 * 1024
6164

6265
# -- Namespaces the controller should watch.
6366
# If empty, the controller will watch ALL available namespaces.

0 commit comments

Comments
 (0)