From 96a90b4282bc6f884e0806a00740b29003ac4f1d Mon Sep 17 00:00:00 2001 From: npolshakova Date: Fri, 14 Feb 2025 11:29:39 -0500 Subject: [PATCH] initial plugins --- go.mod | 2 +- hack/utils/oss_compliance/osa_provided.md | 1 + .../directresponse/direct_response_plugin.go | 5 + .../listenerpolicy/listener_policy_plugin.go | 5 + .../extensions2/plugins/routepolicy/ai.go | 435 +++++++++++++++++- .../routepolicy/route_policy_plugin.go | 54 ++- .../extensions2/plugins/upstream/ai.go | 5 +- .../extensions2/plugins/upstream/aws.go | 6 +- .../extensions2/plugins/upstream/plugin.go | 114 ++++- internal/kgateway/ir/iface.go | 30 +- internal/kgateway/ir/upstream.go | 5 +- internal/kgateway/krtcollections/builtin.go | 5 + .../ai-anthropic-passthrough-out.yaml | 150 ------ .../testdata/ai-anthropic-passthrough.yaml | 64 +++ .../setup/testdata/ai-prompt-guard-out.yaml | 150 ------ .../setup/testdata/ai-prompt-guard.yaml | 10 +- .../testdata/ai-vertex-ai-streaming-out.yaml | 155 ------- .../testdata/ai-vertex-ai-streaming.yaml | 10 +- .../kgateway/translator/irtranslator/route.go | 61 ++- internal/kgateway/wellknown/constants.go | 6 + 20 files changed, 756 insertions(+), 517 deletions(-) delete mode 100644 internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml create mode 100644 internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml delete mode 100644 internal/kgateway/setup/testdata/ai-prompt-guard-out.yaml delete mode 100644 internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml diff --git a/go.mod b/go.mod index 6ee3cd33727..ebe3096abc1 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/kelseyhightower/envconfig v1.4.0 + github.com/mitchellh/hashstructure v1.0.0 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.35.0 github.com/pkg/errors v0.9.1 @@ -157,7 +158,6 @@ require ( github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect - github.com/mitchellh/hashstructure v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/locker v1.0.1 // indirect diff --git a/hack/utils/oss_compliance/osa_provided.md b/hack/utils/oss_compliance/osa_provided.md index 3cf409f9772..d28766e1969 100644 --- a/hack/utils/oss_compliance/osa_provided.md +++ b/hack/utils/oss_compliance/osa_provided.md @@ -15,6 +15,7 @@ Name|Version|License [google/go-cmp](https://github.com/google/go-cmp)|v0.6.0|BSD 3-clause "New" or "Revised" License [grpc-ecosystem/go-grpc-middleware](https://github.com/grpc-ecosystem/go-grpc-middleware)|v1.4.0|Apache License 2.0 [kelseyhightower/envconfig](https://github.com/kelseyhightower/envconfig)|v1.4.0|MIT License +[mitchellh/hashstructure](https://github.com/mitchellh/hashstructure)|v1.0.0|MIT License [ginkgo/v2](https://github.com/onsi/ginkgo)|v2.20.2|MIT License [onsi/gomega](https://github.com/onsi/gomega)|v1.35.0|MIT License [pkg/errors](https://github.com/pkg/errors)|v0.9.1|BSD 2-clause "Simplified" License diff --git a/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go b/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go index b33954416bb..c71c2ec4050 100644 --- a/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go +++ b/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go @@ -47,6 +47,11 @@ func (d *directResponse) Equals(in any) bool { type directResponseGwPass struct { } +func (p *directResponseGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func registerTypes(ourCli versioned.Interface) { skubeclient.Register[*v1alpha1.DirectResponse]( v1alpha1.DirectResponseGVK.GroupVersion().WithResource("directresponses"), diff --git a/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go b/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go index 726812acef0..7d14e5b545e 100644 --- a/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go +++ b/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go @@ -40,6 +40,11 @@ func (d *listenerOptsPlugin) Equals(in any) bool { type listenerOptsPluginGwPass struct { } +func (p *listenerOptsPluginGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensionplug.Plugin { col := krtutil.SetupCollectionDynamic[v1alpha1.ListenerPolicy]( diff --git a/internal/kgateway/extensions2/plugins/routepolicy/ai.go b/internal/kgateway/extensions2/plugins/routepolicy/ai.go index 3e30430fa9c..4aca74b0dc4 100644 --- a/internal/kgateway/extensions2/plugins/routepolicy/ai.go +++ b/internal/kgateway/extensions2/plugins/routepolicy/ai.go @@ -2,44 +2,439 @@ package routepolicy import ( "context" + "encoding/binary" + "encoding/json" + "fmt" + "hash" + "hash/fnv" + "reflect" + "strings" - envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - "google.golang.org/protobuf/types/known/anypb" + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + "github.com/mitchellh/hashstructure" + envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2" + "github.com/solo-io/go-utils/contextutils" + "google.golang.org/protobuf/proto" + v1 "k8s.io/api/core/v1" "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" ) const ( - aiMetadataNamespace = "envoy.filters.ai.solo.io" - transformationFilterName = "ai.transformation.solo.io" - transformationEEFilterName = "ai.transformation_ee.solo.io" - extProcFilterName = "ai.extproc.solo.io" - modSecurityFilterName = "ai.modsecurity.solo.io" - setMetadataFilterName = "envoy.filters.http.set_filter_state" - extProcUDSClusterName = "ai_ext_proc_uds_cluster" - extProcUDSSocketPath = "@gloo-ai-sock" + contextString = `{"content":"%s","role":"%s"}` ) -func processAIRoutePolicy(ctx context.Context, aiConfig *v1alpha1.AIRoutePolicy, outputRoute *envoy_config_route_v3.Route, pCtx *ir.RouteContext) error { - if outputRoute.GetTypedPerFilterConfig() == nil { - outputRoute.TypedPerFilterConfig = make(map[string]*anypb.Any) +type transformationWithOutput struct { + transformation *envoytransformation.TransformationTemplate + perFilterConfig map[string]proto.Message +} + +func processAIRoutePolicy( + ctx context.Context, + aiConfig *v1alpha1.AIRoutePolicy, + pCtx *ir.RouteBackendContext, + extprocSettings *envoy_ext_proc_v3.ExtProcPerRoute, +) error { + + if extprocSettings == nil { + // If it's not an AI route we want to disable our ext-proc filter just in case. + // This will have no effect if we don't add the listener filter + disabledExtprocSettings := &envoy_ext_proc_v3.ExtProcPerRoute{ + Override: &envoy_ext_proc_v3.ExtProcPerRoute_Disabled{ + Disabled: true, + }, + } + pCtx.AddTypedConfig(wellknown.ExtProcFilterName, disabledExtprocSettings) + } else { + // If the route options specify this as a chat streaming route, add a header to the ext-proc request + if aiConfig.RouteType == v1alpha1.CHAT_STREAMING { + // append streaming header if it's a streaming route + extprocSettings.GetOverrides().GrpcInitialMetadata = append(extprocSettings.GetOverrides().GetGrpcInitialMetadata(), &envoy_config_core_v3.HeaderValue{ + Key: "x-chat-streaming", + Value: "true", + }) + } + + // TODO: calculate this in upstream, then apply here + transformations := []*transformationWithOutput{ + { + // It's safe to use the first as they will all be of the same type at this point in the code + transformation: getTransformationTemplateForUpstream(ctx, nil, aiConfig), + perFilterConfig: *pCtx.TypedFilterConfig, + }, + } + err := handleAIRoutePolicy(aiConfig, transformations, extprocSettings) + if err != nil { + return err + } + + pCtx.AddTypedConfig(wellknown.ExtProcFilterName, extprocSettings) + } + return nil +} +func handleAIRoutePolicy( + aiConfig *v1alpha1.AIRoutePolicy, + transformations []*transformationWithOutput, + extProcRouteSettings *envoy_ext_proc_v3.ExtProcPerRoute, +) error { + if err := applyDefaults(aiConfig.Defaults, transformations); err != nil { + return err + } - upstreams := getAiUpstreams(pCtx.In.Backends) - if len(upstreams) != 0 { + if err := applyPromptEnrichment(aiConfig.PromptEnrichment, transformations); err != nil { + return err + } + if err := applyPromptGuard(aiConfig.PromptGuard, extProcRouteSettings); err != nil { + return err } return nil } -func getAiUpstreams(backends []ir.HttpBackend) []*ir.Upstream { - var upstreams []*ir.Upstream - for _, backend := range backends { - if backend.Backend.Upstream.ObjIr +func applyDefaults( + defaults []v1alpha1.FieldDefault, + transformations []*transformationWithOutput, +) error { + if len(defaults) == 0 { + return nil + } + for _, field := range defaults { + marshalled, err := json.Marshal(field.Value) + if err != nil { + return err + } + var tmpl string + if field.Override { + // Inja default function will use the default value if the field provided is falsey + tmpl = fmt.Sprintf("{{ default(%s, %s) }}", field.Value, string(marshalled)) + } else { + tmpl = string(marshalled) + } + for _, val := range transformations { + val.transformation.GetMergeJsonKeys().GetJsonKeys()[field.Field] = &envoytransformation.MergeJsonKeys_OverridableTemplate{ + Tmpl: &envoytransformation.InjaTemplate{Text: tmpl}, + } + } + } + return nil +} + +func applyPromptEnrichment( + pe *v1alpha1.AIPromptEnrichment, + transformations []*transformationWithOutput, +) error { + if pe == nil { + return nil + } + // This function does some slightly complex json string work because we're instructing the transformation filter + // to take the existing `messages` field and potentially prepend and append to it. + // JSON is insensitive to new lines, so we don't need to worry about them. We simply need to join the + // user added messages with the request messages + // For example: + // messages = [{"content": "welcopme ", "role": "user"}] + // prepend = [{"content": "hi", "role": "user"}] + // append = [{"content": "bye", "role": "user"}] + // Would result in: + // [{"content": "hi", "role": "user"}, {"content": "welcopme ", "role": "user"}, {"content": "bye", "role": "user"}] + bodyChunk1 := `[` + bodyChunk2 := `{{ join(messages, ", ") }}` + bodyChunk3 := `]` + + prependString := make([]string, 0, len(pe.Prepend)) + for _, toPrepend := range pe.Prepend { + prependString = append( + prependString, + fmt.Sprintf( + contextString, + toPrepend.Content, + strings.ToLower(strings.ToLower(toPrepend.Role)), + )+",", + ) + } + appendString := make([]string, 0, len(pe.Append)) + for idx, toAppend := range pe.Append { + formatted := fmt.Sprintf( + contextString, + toAppend.Content, + strings.ToLower(strings.ToLower(toAppend.Role)), + ) + if idx != len(pe.Append)-1 { + formatted += "," + } + appendString = append(appendString, formatted) + } + builder := &strings.Builder{} + builder.WriteString(bodyChunk1) + builder.WriteString(strings.Join(prependString, "")) + builder.WriteString(bodyChunk2) + if len(appendString) > 0 { + builder.WriteString(",") + builder.WriteString(strings.Join(appendString, "")) + } + builder.WriteString(bodyChunk3) + finalBody := builder.String() + // Overwrite the user messages body key with the templated version + for _, val := range transformations { + val.transformation.GetMergeJsonKeys().GetJsonKeys()["messages"] = &envoytransformation.MergeJsonKeys_OverridableTemplate{ + Tmpl: &envoytransformation.InjaTemplate{Text: finalBody}, + } + } + return nil +} + +func applyPromptGuard(pg *v1alpha1.AIPromptGuard, extProcRouteSettings *envoy_ext_proc_v3.ExtProcPerRoute) error { + if pg == nil { + return nil + } + if req := pg.Request; req != nil { + if mod := req.Moderation; mod != nil { + if mod.OpenAIModeration != nil { + token, err := getAuthToken(mod.OpenAIModeration.AuthToken) + if err != nil { + return err + } + mod.OpenAIModeration.AuthToken = &v1alpha1.SingleAuthToken{ + Inline: token, + } + } else { + // TODO: error, not supported + } + pg.Request.Moderation = mod + } + bin, err := json.Marshal(req) + if err != nil { + return err + } + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-req-guardrails-config", + Value: string(bin), + }, + ) + // Use this in the server to key per-route-config + // Better to do it here because we have generated functions + reqHash, _ := hashUnique(req, nil) + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-req-guardrails-config-hash", + Value: fmt.Sprint(reqHash), + }, + ) + } + + if resp := pg.Response; resp != nil { + // Resp needs to be defined in python ai extensions in the same format + bin, err := json.Marshal(resp) + if err != nil { + return err + } + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-resp-guardrails-config", + Value: string(bin), + }, + ) + // Use this in the server to key per-route-config + // Better to do it here because we have generated functions + respHash, _ := hashUnique(resp, nil) + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-resp-guardrails-config-hash", + Value: fmt.Sprint(respHash), + }, + ) + + } + return nil +} + +func getAuthToken(in *v1alpha1.SingleAuthToken) (token string, err error) { + switch in.Kind { + case v1alpha1.Inline: + token = in.Inline + case v1alpha1.SecretRef: + token, err = getTokenFromHeaderSecret(in.SecretRef) + } + return token, err +} + +// `getTokenFromHeaderSecret` retrieves the auth token from the secret reference. +// Currently, this function will return an error if there are more than one header in the secret +// as we do not know which one to select. +// In addition, this function will strip the "Bearer " prefix from the token as it will get conditionally +// added later depending on the provider. +func getTokenFromHeaderSecret(secretRef *v1.LocalObjectReference) (token string, err error) { + // TODO: get seret from resolved secrets + return "", err +} + +// hashUnique generates a hash of the struct that is unique to the object by +// hashing field name and value pairs +func hashUnique(obj interface{}, hasher hash.Hash64) (uint64, error) { + if obj == nil { + return 0, nil + } + if hasher == nil { + hasher = fnv.New64() + } + + val := reflect.ValueOf(obj) + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + typ := val.Type() + + // Write type name for consistency with proto implementation + _, err := hasher.Write([]byte(typ.PkgPath() + "/" + typ.Name())) + if err != nil { + return 0, err + } + + // Iterate through fields + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Write field name + if _, err := hasher.Write([]byte(fieldType.Name)); err != nil { + return 0, err + } + + // Handle nil pointer fields + if field.Kind() == reflect.Ptr && field.IsNil() { + continue + } + + // Get the actual value if it's a pointer + if field.Kind() == reflect.Ptr { + field = field.Elem() + } + + // Hash the field value + fieldValue, err := hashstructure.Hash(field.Interface(), nil) + if err != nil { + return 0, err + } + + // Write the hash to our hasher + if err := binary.Write(hasher, binary.LittleEndian, fieldValue); err != nil { + return 0, err + } + } + + return hasher.Sum64(), nil +} + +func getTransformationTemplateForUpstream(ctx context.Context, us *v1alpha1.Upstream, routePolicy *v1alpha1.AIRoutePolicy) *envoytransformation.TransformationTemplate { + // Setup initial transformation template. This may be modified by further + transformationTemplate := &envoytransformation.TransformationTemplate{ + // We will add the auth token later + Headers: map[string]*envoytransformation.InjaTemplate{}, + } + + var headerName, prefix, path string + var bodyTransformation *envoytransformation.TransformationTemplate_MergeJsonKeys + if us.Spec.AI.LLM != nil { + headerName, prefix, path, bodyTransformation = getTransformation(ctx, us.Spec.AI.LLM, routePolicy) + } else if us.Spec.AI.MultiPool != nil { + // We already know that all the backends are the same type so we can ust take the first one + llm := us.Spec.AI.MultiPool.Priorities[0].Pool[0] + headerName, prefix, path, bodyTransformation = getTransformation(ctx, &llm, routePolicy) + } + transformationTemplate.GetHeaders()[headerName] = &envoytransformation.InjaTemplate{ + Text: prefix + `{% if host_metadata("auth_token") != "" %}{{host_metadata("auth_token")}}{% else %}{{dynamic_metadata("auth_token","ai.gloo.solo.io")}}{% endif %}`, + } + transformationTemplate.GetHeaders()[":path"] = &envoytransformation.InjaTemplate{ + Text: path, + } + transformationTemplate.BodyTransformation = bodyTransformation + return transformationTemplate + +} + +func getTransformation(ctx context.Context, llm *v1alpha1.LLMProviders, routePolicy *v1alpha1.AIRoutePolicy) (string, string, string, *envoytransformation.TransformationTemplate_MergeJsonKeys) { + headerName := "Authorization" + var prefix, path string + var bodyTransformation *envoytransformation.TransformationTemplate_MergeJsonKeys + if llm.OpenAI != nil { + prefix = "Bearer " + path = "/v1/chat/completions" + bodyTransformation = defaultBodyTransformation() + } else if llm.Mistral != nil { + prefix = "Bearer " + path = "/v1/chat/completions" + bodyTransformation = defaultBodyTransformation() + } else if llm.Anthropic != nil { + headerName = "x-api-key" + path = "/v1/messages" + bodyTransformation = defaultBodyTransformation() + } else if llm.AzureOpenAI != nil { + headerName = "api-key" + path = `/openai/deployments/{{ host_metadata("model") }}/chat/completions?api-version={{ host_metadata("api_version" )}}` + } else if llm.Gemini != nil { + headerName = "key" + path = getGeminiPath(routePolicy) + } else if llm.VertexAI != nil { + prefix = "Bearer " + var modelPath string + modelCall := llm.VertexAI.ModelPath + if modelCall == "" { + switch llm.VertexAI.Publisher { + case v1alpha1.GOOGLE: + modelPath = getVertexAIGeminiModelPath(routePolicy) + default: + // TODO(npolshak): add support for other publishers + contextutils.LoggerFrom(ctx).Warnf("Unsupported Vertex AI publisher: %v. Defaulting to Google", llm.VertexAI.Publisher) + modelPath = getVertexAIGeminiModelPath(routePolicy) + } + } else { + // Use user provided model path + modelPath = fmt.Sprintf(`models/{{host_metadata("model")}}:%s`, modelCall) + } + // https://${LOCATION}-aiplatform.googleapis.com/{VERSION}/projects/${PROJECT_ID}/locations/${LOCATION}/ + path = fmt.Sprintf(`/{{host_metadata("api_version")}}/projects/{{host_metadata("project")}}/locations/{{host_metadata("location")}}/publishers/{{host_metadata("publisher")}}/%s`, modelPath) + } + return headerName, prefix, path, bodyTransformation +} + +func getGeminiPath(rtPolicy *v1alpha1.AIRoutePolicy) string { + generateContentPath := "generateContent" + streamParams := "" + if rtPolicy.RouteType == v1alpha1.CHAT_STREAMING { + generateContentPath = "streamGenerateContent" + streamParams = "&alt=sse" + } + return fmt.Sprintf(`/{{host_metadata("api_version")}}/models/{{host_metadata("model")}}:%s?key={{host_metadata("auth_token")}}%s`, generateContentPath, streamParams) +} + +func getVertexAIGeminiModelPath(rtPolicy *v1alpha1.AIRoutePolicy) string { + generateContentPath := "generateContent" + streamParams := "" + if rtPolicy.RouteType == v1alpha1.CHAT_STREAMING { + generateContentPath = "streamGenerateContent" + streamParams = "?alt=sse" + } + return fmt.Sprintf(`models/{{host_metadata("model")}}:%s%s`, generateContentPath, streamParams) +} + +func defaultBodyTransformation() *envoytransformation.TransformationTemplate_MergeJsonKeys { + return &envoytransformation.TransformationTemplate_MergeJsonKeys{ + MergeJsonKeys: &envoytransformation.MergeJsonKeys{ + JsonKeys: map[string]*envoytransformation.MergeJsonKeys_OverridableTemplate{ + "model": { + Tmpl: &envoytransformation.InjaTemplate{ + // Merge the model into the body + Text: `{% if host_metadata("model") != "" %}"{{host_metadata("model")}}"{% else %}"{{model}}"{% endif %}`, + }, + }, + }, + }, } - return upstreams } diff --git a/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go b/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go index bdef6e319b1..ea6c061ce8c 100644 --- a/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go +++ b/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go @@ -4,10 +4,13 @@ import ( "context" "time" - "github.com/solo-io/go-utils/contextutils" + envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/wrapperspb" "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" + envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" "istio.io/istio/pkg/kube/krt" @@ -38,8 +41,7 @@ func (d *routeOptsPlugin) Equals(in any) bool { return d.spec == d2.spec } -type routeOptsPluginGwPass struct { -} +type routeOptsPluginGwPass struct{} func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensionplug.Plugin { col := krtutil.SetupCollectionDynamic[v1alpha1.RoutePolicy]( @@ -102,6 +104,7 @@ func (p *routeOptsPluginGwPass) ApplyVhostPlugin(ctx context.Context, pCtx *ir.V } // called 0 or more times +// targetRef route -> apply to all routes func (p *routeOptsPluginGwPass) ApplyForRoute(ctx context.Context, pCtx *ir.RouteContext, outputRoute *envoy_config_route_v3.Route) error { policy, ok := pCtx.Policy.(*routeOptsPlugin) if !ok { @@ -113,21 +116,58 @@ func (p *routeOptsPluginGwPass) ApplyForRoute(ctx context.Context, pCtx *ir.Rout } if policy.spec.AI != nil { - err := processAIRoutePolicy(ctx, policy.spec.AI, outputRoute, pCtx) - if err != nil { - // TODO: report error on status - contextutils.LoggerFrom(ctx).Error(err) + return ir.ErrNotAttachable + } + + // TODO: err/warn/ignore if targetRef is set with AI options + + return nil +} + +// ApplyForBackend applies regardless if policy is attached +func (p *routeOptsPluginGwPass) ApplyForBackend( + ctx context.Context, + pCtx *ir.RouteBackendContext, + in ir.HttpBackend, + out *envoy_config_route_v3.Route, +) error { + if pCtx.AutoHostRewrite { + out.GetRoute().HostRewriteSpecifier = &envoy_config_route_v3.RouteAction_AutoHostRewrite{ + AutoHostRewrite: wrapperspb.Bool(true), } } return nil } +// A policy on weighted destination -> output is perFilterConfig +// Only runs with policy attached func (p *routeOptsPluginGwPass) ApplyForRouteBackend( ctx context.Context, policy ir.PolicyIR, pCtx *ir.RouteBackendContext, ) error { + extprocSettingsProto := pCtx.GetConfig(wellknown.ExtProcFilterName) + if extprocSettingsProto != nil { + return nil + } + extprocSettings, ok := extprocSettingsProto.(*envoy_ext_proc_v3.ExtProcPerRoute) + if !ok { + // TODO: internal error + return nil + } + routePolicy, ok := policy.(*routeOptsPlugin) + if !ok { + return nil + } + + err := processAIRoutePolicy(ctx, routePolicy.spec.AI, pCtx, extprocSettings) + if err != nil { + // TODO: report error on status + return err + } + pCtx.AddTypedConfig(wellknown.ExtProcFilterName, extprocSettings) + return nil } diff --git a/internal/kgateway/extensions2/plugins/upstream/ai.go b/internal/kgateway/extensions2/plugins/upstream/ai.go index f1ebc311ee6..a43272cb4de 100644 --- a/internal/kgateway/extensions2/plugins/upstream/ai.go +++ b/internal/kgateway/extensions2/plugins/upstream/ai.go @@ -14,13 +14,14 @@ import ( envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" "github.com/envoyproxy/go-control-plane/pkg/wellknown" - "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" - "github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils" "github.com/rotisserie/eris" "github.com/solo-io/go-utils/contextutils" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils" + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" ) diff --git a/internal/kgateway/extensions2/plugins/upstream/aws.go b/internal/kgateway/extensions2/plugins/upstream/aws.go index dab431742e9..f5e27d020ab 100644 --- a/internal/kgateway/extensions2/plugins/upstream/aws.go +++ b/internal/kgateway/extensions2/plugins/upstream/aws.go @@ -124,11 +124,7 @@ func (p *plugin2) processBackendAws( //UnwrapAsAlb: destination.GetUnwrapAsAlb(), //TransformerConfig: transformerConfig, } - lambdaRouteFuncAny, err := anypb.New(lambdaRouteFunc) - if err != nil { - return err - } - pCtx.AddTypedConfig(FilterName, lambdaRouteFuncAny) + pCtx.AddTypedConfig(FilterName, lambdaRouteFunc) return nil } diff --git a/internal/kgateway/extensions2/plugins/upstream/plugin.go b/internal/kgateway/extensions2/plugins/upstream/plugin.go index 605b1add5bf..c497bb14d62 100644 --- a/internal/kgateway/extensions2/plugins/upstream/plugin.go +++ b/internal/kgateway/extensions2/plugins/upstream/plugin.go @@ -7,16 +7,21 @@ import ( "maps" "time" + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/rotisserie/eris" "github.com/solo-io/go-utils/contextutils" + "google.golang.org/protobuf/types/known/wrapperspb" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" + envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" awspb "github.com/solo-io/envoy-gloo/go/config/filter/http/aws_lambda/v2" skubeclient "istio.io/istio/pkg/config/schema/kubeclient" "istio.io/istio/pkg/kube/kclient" @@ -300,7 +305,114 @@ func (p *plugin2) ApplyForRoute(ctx context.Context, pCtx *ir.RouteContext, outp // Run on upstream, regardless of policy (based on upstream gvk) // share route proto message -//func ApplyForBackend() +func (p *plugin2) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + upstream := pCtx.Upstream.Obj.(*v1alpha1.Upstream) + if upstream.Spec.AI != nil { + + // Setup ext-proc route filter config, we will conditionally modify it based on certain route options. + // A heavily used part of this config is the `GrpcInitialMetadata`. + // This is used to add headers to the ext-proc request. + // These headers are used to configure the AI server on a per-request basis. + // This was the best available way to pass per-route configuration to the AI server. + extProcRouteSettings := &envoy_ext_proc_v3.ExtProcPerRoute{ + Override: &envoy_ext_proc_v3.ExtProcPerRoute_Overrides{ + Overrides: &envoy_ext_proc_v3.ExtProcOverrides{}, + }, + } + + var llmModel string + byType := map[string]struct{}{} + aiUpstream := upstream.Spec.AI + if aiUpstream.LLM != nil { + llmModel = getUpstreamModel(aiUpstream.LLM, byType) + } else if aiUpstream.MultiPool != nil { + for _, priority := range aiUpstream.MultiPool.Priorities { + for _, pool := range priority.Pool { + llmModel = getUpstreamModel(&pool, byType) + } + } + } + + if len(byType) != 1 { + return eris.Errorf("multiple AI backend types found for single ai route %+v", byType) + } + + // This is only len(1) + var llmProvider string + for k := range byType { + llmProvider = k + } + + // Add things which require basic AI upstream. + pCtx.AddTypedConfig("AutoHostRewrite", wrapperspb.Bool(true)) + + // We only want to add the transformation filter if we have a single AI backend + // Otherwise we already have the transformation filter added by the weighted destination + // Setup initial transformation template. This may be modified by further AI RoutePolicy config. + //if _, ok := p.transformationsByRoute[in]; !ok { + // p.transformationsByRoute[in] = []*transformationWithOutput{ + // { + // // It's safe to use the first as they will all be of the same type at this point in the code + // transformation: getTransformationTemplateForUpstream(params.Ctx, aiUpstreams[0], in.GetOptions()), + // perFilterConfig: out.TypedPerFilterConfig, + // }, + // } + //} + + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-llm-provider", + Value: llmProvider, + }, + ) + // If the Upstream specifies a model, add a header to the ext-proc request + if llmModel != "" { + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-llm-model", + Value: llmModel, + }) + } + + // Add the x-request-id header to the ext-proc request. + // This is an optimization to allow us to not have to wait for the headers request to + // Initialize our logger/handler classes. + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-request-id", + Value: "%REQ(X-REQUEST-ID)%", + }, + ) + + pCtx.AddTypedConfig(wellknown.ExtProcFilterName, extProcRouteSettings) + } + + return nil +} + +func getUpstreamModel(llm *v1alpha1.LLMProviders, byType map[string]struct{}) string { + llmModel := "" + if llm.OpenAI != nil { + byType["openai"] = struct{}{} + llmModel = llm.OpenAI.Model + } else if llm.Mistral != nil { + byType["mistral"] = struct{}{} + llmModel = llm.Mistral.Model + } else if llm.Anthropic != nil { + byType["anthropic"] = struct{}{} + llmModel = llm.Anthropic.Model + } else if llm.AzureOpenAI != nil { + byType["azure_openai"] = struct{}{} + llmModel = llm.AzureOpenAI.DeploymentName + } else if llm.Gemini != nil { + byType["gemini"] = struct{}{} + llmModel = llm.Gemini.Model + } else if llm.VertexAI != nil { + byType["vertex-ai"] = struct{}{} + llmModel = llm.VertexAI.Model + } + return llmModel +} // Only called if policy attatched (extension ref) // Can implement in route policy for ai (prompt guard, etc.) diff --git a/internal/kgateway/ir/iface.go b/internal/kgateway/ir/iface.go index 121f28bfc87..f271394d33c 100644 --- a/internal/kgateway/ir/iface.go +++ b/internal/kgateway/ir/iface.go @@ -8,7 +8,7 @@ import ( envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - anypb "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/proto" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/plugins" @@ -24,15 +24,22 @@ type RouteBackendContext struct { FilterChainName string Upstream *Upstream // todo: make this not public - // TODO: or change this to be proto.mesage (do the any conversion -> get the extproc) - TypedFiledConfig *map[string]*anypb.Any + TypedFilterConfig *map[string]proto.Message + AutoHostRewrite bool } -func (r *RouteBackendContext) AddTypedConfig(key string, v *anypb.Any) { - if *r.TypedFiledConfig == nil { - *r.TypedFiledConfig = make(map[string]*anypb.Any) +func (r *RouteBackendContext) AddTypedConfig(key string, v proto.Message) { + if *r.TypedFilterConfig == nil { + *r.TypedFilterConfig = make(map[string]proto.Message) } - (*r.TypedFiledConfig)[key] = v + (*r.TypedFilterConfig)[key] = v +} + +func (r *RouteBackendContext) GetConfig(key string) proto.Message { + if *r.TypedFilterConfig == nil { + return nil + } + return (*r.TypedFilterConfig)[key] } type RouteContext struct { @@ -59,11 +66,20 @@ type ProxyTranslationPass interface { ctx context.Context, pCtx *RouteContext, out *envoy_config_route_v3.Route) error + // runs for policy applied ApplyForRouteBackend( ctx context.Context, policy PolicyIR, pCtx *RouteBackendContext, ) error + // no policy applied + ApplyForBackend( + ctx context.Context, + pCtx *RouteBackendContext, + in HttpBackend, + out *envoy_config_route_v3.Route, + ) error + // called 1 time per listener // if a plugin emits new filters, they must be with a plugin unique name. // any filter returned from route config must be disabled, so it doesnt impact other routes. diff --git a/internal/kgateway/ir/upstream.go b/internal/kgateway/ir/upstream.go index ab6bac04df4..e68e49ef1b4 100644 --- a/internal/kgateway/ir/upstream.go +++ b/internal/kgateway/ir/upstream.go @@ -54,7 +54,7 @@ type Upstream struct { // optional port for if ObjectSource is a service that can have multiple ports. Port int32 - // prefix the cluster name with this string to distringuish it from other GVKs. + // prefix the cluster name with this string to distinguish it from other GVKs. // here explicitly as it shows up in stats. each (group, kind) pair should have a unique prefix. GvPrefix string // for things that integrate with destination rule, we need to know what hostname to use. @@ -66,9 +66,6 @@ type Upstream struct { // i think so, assuming obj -> objir is a 1:1 mapping. ObjIr interface{ Equals(any) bool } - // AI config - LLM string - AttachedPolicies AttachedPolicies } diff --git a/internal/kgateway/krtcollections/builtin.go b/internal/kgateway/krtcollections/builtin.go index f82ac351a64..e07fb2194ef 100644 --- a/internal/kgateway/krtcollections/builtin.go +++ b/internal/kgateway/krtcollections/builtin.go @@ -57,6 +57,11 @@ func (d *builtinPlugin) Equals(in any) bool { type builtinPluginGwPass struct { } +func (p *builtinPluginGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func NewBuiltInIr(kctx krt.HandlerContext, f gwv1.HTTPRouteFilter, fromgk schema.GroupKind, fromns string, refgrants *RefGrantIndex, ups *UpstreamIndex) ir.PolicyIR { return &builtinPlugin{ spec: f, diff --git a/internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml b/internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml deleted file mode 100644 index 8835d7aeaac..00000000000 --- a/internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml +++ /dev/null @@ -1,150 +0,0 @@ -clusters: -- connectTimeout: 5s - edsClusterConfig: - edsConfig: - ads: {} - resourceApiVersion: V3 - metadata: {} - name: kube_default_kubernetes_443 - transportSocketMatches: - - match: - tlsMode: istio - name: tlsMode-istio - transportSocket: - name: envoy.transport_sockets.tls - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - commonTlsContext: - alpnProtocols: - - istio - tlsCertificateSdsSecretConfigs: - - name: istio_server_cert - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - tlsParams: {} - validationContextSdsSecretConfig: - name: istio_validation_context - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - sni: outbound_.443_._.kubernetes.default.svc.cluster.local - - match: {} - name: tlsMode-disabled - transportSocket: - name: envoy.transport_sockets.raw_buffer - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer - type: EDS -- connectTimeout: 5s - dnsLookupFamily: V4_ONLY - loadAssignment: - clusterName: upstream_gwtest_anthropic_0 - endpoints: - - lbEndpoints: - - endpoint: - address: - socketAddress: - address: api.anthropic.com - portValue: 443 - hostname: api.anthropic.com - metadata: - filterMetadata: - envoy.transport_socket_match: - tls: api.anthropic.com - io.solo.transformation: - auth_token: "" - metadata: {} - name: upstream_gwtest_anthropic_0 - transportSocketMatches: - - match: - tlsMode: istio - name: tlsMode-istio - transportSocket: - name: envoy.transport_sockets.tls - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - commonTlsContext: - alpnProtocols: - - istio - tlsCertificateSdsSecretConfigs: - - name: istio_server_cert - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - tlsParams: {} - validationContextSdsSecretConfig: - name: istio_validation_context - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - - match: {} - name: tlsMode-disabled - transportSocket: - name: envoy.transport_sockets.raw_buffer - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer - type: STRICT_DNS -listeners: -- address: - socketAddress: - address: '::' - ipv4Compat: true - portValue: 8080 - filterChains: - - filters: - - name: envoy.filters.network.http_connection_manager - typedConfig: - '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - httpFilters: - - name: envoy.filters.http.router - typedConfig: - '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router - mergeSlashes: true - normalizePath: true - rds: - configSource: - ads: {} - resourceApiVersion: V3 - routeConfigName: http - statPrefix: http - name: http - name: http -routes: -- ignorePortInHostMatching: true - name: http - virtualHosts: - - domains: - - test - name: http~test - routes: - - match: - prefix: / - name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 - route: - cluster: upstream_gwtest_anthropic_0 - clusterNotFoundResponseCode: INTERNAL_SERVER_ERROR diff --git a/internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml b/internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml new file mode 100644 index 00000000000..162d5d955ed --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml @@ -0,0 +1,64 @@ +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: http-gw-for-test + namespace: gwtest +spec: + gatewayClassName: kgateway + listeners: + - protocol: HTTP + port: 8080 + name: http + allowedRoutes: + namespaces: + from: All +--- +apiVersion: gloo.solo.io/v1 +kind: Upstream +metadata: + labels: + app: kgateway + name: anthropic + namespace: gwtest +spec: + ai: + anthropic: + authToken: + kind: "Passthrough" +--- +apiVersion: gateway.networking.k8s.io/v1beta1 +kind: HTTPRoute +metadata: + name: route-to-upstream + namespace: gwtest +spec: + parentRefs: + - name: http-gw-for-test + hostnames: + - "test" + rules: + - matches: + - path: + type: Exact + value: /v1/chat/completions + - backendRefs: + - name: anthropic + kind: Upstream + group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev/v1alpha1 + kind: RoutePolicy + name: route-test +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: RoutePolicy +metadata: + name: route-test + namespace: gwtest +spec: + ai: + defaults: + - field: "temperature" + value: "0.5" \ No newline at end of file diff --git a/internal/kgateway/setup/testdata/ai-prompt-guard-out.yaml b/internal/kgateway/setup/testdata/ai-prompt-guard-out.yaml deleted file mode 100644 index e4a640c7b76..00000000000 --- a/internal/kgateway/setup/testdata/ai-prompt-guard-out.yaml +++ /dev/null @@ -1,150 +0,0 @@ -clusters: -- connectTimeout: 5s - edsClusterConfig: - edsConfig: - ads: {} - resourceApiVersion: V3 - metadata: {} - name: kube_default_kubernetes_443 - transportSocketMatches: - - match: - tlsMode: istio - name: tlsMode-istio - transportSocket: - name: envoy.transport_sockets.tls - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - commonTlsContext: - alpnProtocols: - - istio - tlsCertificateSdsSecretConfigs: - - name: istio_server_cert - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - tlsParams: {} - validationContextSdsSecretConfig: - name: istio_validation_context - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - sni: outbound_.443_._.kubernetes.default.svc.cluster.local - - match: {} - name: tlsMode-disabled - transportSocket: - name: envoy.transport_sockets.raw_buffer - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer - type: EDS -- connectTimeout: 5s - dnsLookupFamily: V4_ONLY - loadAssignment: - clusterName: upstream_gwtest_openai_0 - endpoints: - - lbEndpoints: - - endpoint: - address: - socketAddress: - address: api.openai.com - portValue: 443 - hostname: api.openai.com - metadata: - filterMetadata: - envoy.transport_socket_match: - tls: api.openai.com - io.solo.transformation: - auth_token: mysecretkey - metadata: {} - name: upstream_gwtest_openai_0 - transportSocketMatches: - - match: - tlsMode: istio - name: tlsMode-istio - transportSocket: - name: envoy.transport_sockets.tls - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - commonTlsContext: - alpnProtocols: - - istio - tlsCertificateSdsSecretConfigs: - - name: istio_server_cert - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - tlsParams: {} - validationContextSdsSecretConfig: - name: istio_validation_context - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - - match: {} - name: tlsMode-disabled - transportSocket: - name: envoy.transport_sockets.raw_buffer - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer - type: STRICT_DNS -listeners: -- address: - socketAddress: - address: '::' - ipv4Compat: true - portValue: 8080 - filterChains: - - filters: - - name: envoy.filters.network.http_connection_manager - typedConfig: - '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - httpFilters: - - name: envoy.filters.http.router - typedConfig: - '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router - mergeSlashes: true - normalizePath: true - rds: - configSource: - ads: {} - resourceApiVersion: V3 - routeConfigName: http - statPrefix: http - name: http - name: http -routes: -- ignorePortInHostMatching: true - name: http - virtualHosts: - - domains: - - test - name: http~test - routes: - - match: - prefix: / - name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 - route: - cluster: upstream_gwtest_openai_0 - clusterNotFoundResponseCode: INTERNAL_SERVER_ERROR diff --git a/internal/kgateway/setup/testdata/ai-prompt-guard.yaml b/internal/kgateway/setup/testdata/ai-prompt-guard.yaml index 114a36e83e6..37e002c7e59 100644 --- a/internal/kgateway/setup/testdata/ai-prompt-guard.yaml +++ b/internal/kgateway/setup/testdata/ai-prompt-guard.yaml @@ -57,6 +57,12 @@ spec: - name: openai kind: Upstream group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev/v1alpha1 + kind: RoutePolicy + name: route-test --- apiVersion: gateway.kgateway.dev/v1alpha1 kind: RoutePolicy @@ -64,10 +70,6 @@ metadata: name: route-test namespace: gwtest spec: - targetRef: - group: gateway.networking.k8s.io - kind: HTTPRoute - name: openai ai: promptEnrichment: prepend: diff --git a/internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml deleted file mode 100644 index d5ebb4cb962..00000000000 --- a/internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml +++ /dev/null @@ -1,155 +0,0 @@ -clusters: -- connectTimeout: 5s - edsClusterConfig: - edsConfig: - ads: {} - resourceApiVersion: V3 - metadata: {} - name: kube_default_kubernetes_443 - transportSocketMatches: - - match: - tlsMode: istio - name: tlsMode-istio - transportSocket: - name: envoy.transport_sockets.tls - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - commonTlsContext: - alpnProtocols: - - istio - tlsCertificateSdsSecretConfigs: - - name: istio_server_cert - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - tlsParams: {} - validationContextSdsSecretConfig: - name: istio_validation_context - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - sni: outbound_.443_._.kubernetes.default.svc.cluster.local - - match: {} - name: tlsMode-disabled - transportSocket: - name: envoy.transport_sockets.raw_buffer - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer - type: EDS -- connectTimeout: 5s - dnsLookupFamily: V4_ONLY - loadAssignment: - clusterName: upstream_gwtest_vertexai_0 - endpoints: - - lbEndpoints: - - endpoint: - address: - socketAddress: - address: us-central1-aiplatform.googleapis.com - portValue: 443 - hostname: us-central1-aiplatform.googleapis.com - metadata: - filterMetadata: - envoy.transport_socket_match: - tls: us-central1-aiplatform.googleapis.com - io.solo.transformation: - api_version: v1 - auth_token: mysecretkey - location: us-central1 - model: gemini-1.5-flash-001 - project: gloo-ee - publisher: google - metadata: {} - name: upstream_gwtest_vertexai_0 - transportSocketMatches: - - match: - tlsMode: istio - name: tlsMode-istio - transportSocket: - name: envoy.transport_sockets.tls - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - commonTlsContext: - alpnProtocols: - - istio - tlsCertificateSdsSecretConfigs: - - name: istio_server_cert - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - tlsParams: {} - validationContextSdsSecretConfig: - name: istio_validation_context - sdsConfig: - apiConfigSource: - apiType: GRPC - grpcServices: - - envoyGrpc: - clusterName: gateway_proxy_sds - setNodeOnFirstMessageOnly: true - transportApiVersion: V3 - resourceApiVersion: V3 - - match: {} - name: tlsMode-disabled - transportSocket: - name: envoy.transport_sockets.raw_buffer - typedConfig: - '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer - type: STRICT_DNS -listeners: -- address: - socketAddress: - address: '::' - ipv4Compat: true - portValue: 8080 - filterChains: - - filters: - - name: envoy.filters.network.http_connection_manager - typedConfig: - '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - httpFilters: - - name: envoy.filters.http.router - typedConfig: - '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router - mergeSlashes: true - normalizePath: true - rds: - configSource: - ads: {} - resourceApiVersion: V3 - routeConfigName: http - statPrefix: http - name: http - name: http -routes: -- ignorePortInHostMatching: true - name: http - virtualHosts: - - domains: - - test - name: http~test - routes: - - match: - prefix: / - name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 - route: - cluster: upstream_gwtest_vertexai_0 - clusterNotFoundResponseCode: INTERNAL_SERVER_ERROR diff --git a/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml index eb905a5a9bc..ac609ffc401 100644 --- a/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml +++ b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml @@ -32,6 +32,12 @@ spec: - name: vertexai kind: Upstream group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev/v1alpha1 + kind: RoutePolicy + name: route-test --- apiVersion: gateway.kgateway.dev/v1alpha1 kind: RoutePolicy @@ -39,10 +45,6 @@ metadata: name: route-test namespace: gwtest spec: - targetRef: - group: gateway.networking.k8s.io - kind: HTTPRoute - name: vertexai ai: routeType: CHAT_STREAMING --- diff --git a/internal/kgateway/translator/irtranslator/route.go b/internal/kgateway/translator/irtranslator/route.go index 573956d8274..f15b915e060 100644 --- a/internal/kgateway/translator/irtranslator/route.go +++ b/internal/kgateway/translator/irtranslator/route.go @@ -11,6 +11,8 @@ import ( envoy_type_matcher_v3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" "github.com/solo-io/go-utils/contextutils" "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" gwv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -123,9 +125,9 @@ func (h *httpRouteConfigurationTranslator) envoyRoutes(ctx context.Context, out := h.initRoutes(in, generatedName) if len(in.Backends) > 0 { - out.Action = h.translateRouteAction(in, out) + out.Action = h.translateRouteAction(ctx, in, out) } - // run plugins here that may set actoin + // run plugins here that may set action err := h.runRoutePlugins(ctx, routeReport, in, out) if err == nil { @@ -259,7 +261,21 @@ func (h *httpRouteConfigurationTranslator) runBackendPolicies(ctx context.Contex return errors.Join(errs...) } +func (h *httpRouteConfigurationTranslator) runBackend(ctx context.Context, in ir.HttpBackend, pCtx *ir.RouteBackendContext, outRoute *envoy_config_route_v3.Route) error { + var errs []error + if in.Backend.Upstream != nil { + // TODO: fix + err := h.PluginPass[in.Backend.Upstream.GetGroupKind()].ApplyForBackend(ctx, pCtx, in, outRoute) + if err != nil { + errs = append(errs, err) + } + } + // TODO: check return value, if error returned, log error and report condition + return errors.Join(errs...) +} + func (h *httpRouteConfigurationTranslator) translateRouteAction( + ctx context.Context, in ir.HttpRouteRuleMatchIR, outRoute *envoy_config_route_v3.Route, ) *envoy_config_route_v3.Route_Route { @@ -274,17 +290,48 @@ func (h *httpRouteConfigurationTranslator) translateRouteAction( Name: clusterName, Weight: wrapperspb.UInt32(backend.Backend.Weight), } + + typedPerFilterConfig := map[string]proto.Message{} + pCtx := ir.RouteBackendContext{ - FilterChainName: h.fc.FilterChainName, - Upstream: backend.Backend.Upstream, - TypedFiledConfig: &cw.TypedPerFilterConfig, + FilterChainName: h.fc.FilterChainName, + Upstream: backend.Backend.Upstream, + TypedFilterConfig: &typedPerFilterConfig, + } + + err := h.runBackendPolicies( + ctx, + backend, + &pCtx, + ) + if err != nil { + // TODO: error on status + contextutils.LoggerFrom(ctx).Error(err) } - h.runBackendPolicies( - context.TODO(), + // non attached policy translation + err = h.runBackend( + ctx, backend, &pCtx, + outRoute, ) + if err != nil { + // TODO: error on status + contextutils.LoggerFrom(ctx).Error(err) + } + + typedPerFilterConfigAny := map[string]*anypb.Any{} + for k, v := range typedPerFilterConfig { + config, err := utils.MessageToAny(v) + if err != nil { + // TODO: error on status + contextutils.LoggerFrom(ctx).Error(err) + continue + } + typedPerFilterConfigAny[k] = config + } + cw.TypedPerFilterConfig = typedPerFilterConfigAny clusters = append(clusters, cw) } diff --git a/internal/kgateway/wellknown/constants.go b/internal/kgateway/wellknown/constants.go index 79480cdae9c..183d9dccfb4 100644 --- a/internal/kgateway/wellknown/constants.go +++ b/internal/kgateway/wellknown/constants.go @@ -26,3 +26,9 @@ const ( SdsClusterName = "gateway_proxy_sds" SdsTargetURI = "127.0.0.1:8234" ) + +const ( + TransformationFilterName = "ai.transformation.kgateway.io" + ExtProcFilterName = "ai.extproc.kgateway.io" + SetMetadataFilterName = "envoy.filters.http.set_filter_state" +)