Skip to content

Commit

Permalink
Improve virtual workspace-aware OpenAPI caching
Browse files Browse the repository at this point in the history
Signed-off-by: Marko Mudrinić <[email protected]>
  • Loading branch information
xmudrii committed Feb 11, 2025
1 parent 98ef245 commit 255aaf1
Showing 1 changed file with 121 additions and 73 deletions.
194 changes: 121 additions & 73 deletions pkg/virtual/framework/dynamic/apiserver/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/kube-openapi/pkg/common/restfuladapter"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/utils/keymutex"
"k8s.io/utils/lru"

"github.com/kcp-dev/kcp/pkg/virtual/framework/dynamic/apidefinition"
Expand All @@ -53,6 +54,12 @@ const (
DefaultServiceCacheSize = 50
)

type openAPIServiceItem struct {
done chan struct{}
err error
service http.Handler
}

// openAPIHandler implements the OpenAPI v3 handler for virtual workspaces.
//
// This handler generates the OpenAPI v3 spec by:
Expand All @@ -74,6 +81,7 @@ type openAPIHandler struct {
delegate http.Handler

services *lru.Cache
servicesLock keymutex.KeyMutex
openAPIV3Service *handler3.OpenAPIService
}

Expand All @@ -92,6 +100,7 @@ func newOpenAPIHandler(
delegate: delegate,

services: lru.New(serviceCacheSize),
servicesLock: keymutex.NewHashed(0),
openAPIV3Service: openAPIV3Service,
}
}
Expand Down Expand Up @@ -122,101 +131,140 @@ func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
log = log.WithValues("key", key)

// We want to make sure we don't generate OpenAPI v3 spec if not necessary because it's an expensive operation.
// Putting the generated OpenAPI v3 service in a cache using the configuration key generated above solves this in
// the most of cases.
// However, it doesn't cover the case where we have two concurrent requests with the same caching key, in which
// we would generate the OpenAPI v3 spec two times for the same key (if the spec is not cached already).
// That's why we have a "double-locking mechanism":
// 1) we store a special struct in a cache that contains a channel and an OpenAPI v3 service; if the channel is
// closed, we know that we generated the OpenAPI v3 spec for that key and we can just serve it
// 2) that struct is supposed to be a singleton for a given caching key, and because of that, we need some
// guarantee that no two threads are going to try to create an instance of that struct for the same key
// (that's why we have servicesLock)
o.servicesLock.LockKey(key)
entry, ok := o.services.Get(key)
if !ok {
log.V(7).Info("Generating OpenAPI v3 specs")

// Collect routes registered in the virtual workspace API server: /api, /apis, and /version.
// Subroutes (e.g. /apis/<group> and /apis/<group>/<version>) are not included and are handled separately below.
webservices := make(map[string][]*restful.WebService)
for _, t := range o.goRestfulContainer.RegisteredWebServices() {
// Strip the "/" prefix from the name
gvPath := t.RootPath()[1:]
webservices[gvPath] = []*restful.WebService{t}
var item *openAPIServiceItem

// If we found the struct, and it's channel is closed, and the spec was generated successfully, serve the already
// generated spec, otherwise regenerate it.
if ok {
o.servicesLock.UnlockKey(key)
item = entry.(*openAPIServiceItem)
_, ok := <-item.done
if !ok && item != nil && item.err == nil {
item.service.ServeHTTP(w, req)
return
}
}

// Create web services for /apis/<group>
versionsForDiscoveryMap := make(map[schema.GroupVersion][]metav1.GroupVersionForDiscovery)
for gvr := range apiSet {
if gvr.Group == "" {
continue
}
gv := schema.GroupVersion{
Group: gvr.Group,
Version: gvr.Version,
}

if versionsForDiscoveryMap[gv] == nil {
versionsForDiscoveryMap[gv] = []metav1.GroupVersionForDiscovery{}
}

versionsForDiscoveryMap[gv] = append(versionsForDiscoveryMap[gv], metav1.GroupVersionForDiscovery{
GroupVersion: gvr.GroupVersion().String(),
Version: gvr.Version,
})
item = &openAPIServiceItem{
done: make(chan struct{}, 1),
}
o.services.Add(key, item)
o.servicesLock.UnlockKey(key)

log.V(7).Info("Generating OpenAPI v3 specs")

// Collect routes registered in the virtual workspace API server: /api, /apis, and /version.
// Subroutes (e.g. /apis/<group> and /apis/<group>/<version>) are not included and are handled separately below.
webservices := make(map[string][]*restful.WebService)
for _, t := range o.goRestfulContainer.RegisteredWebServices() {
// Strip the "/" prefix from the name
gvPath := t.RootPath()[1:]
webservices[gvPath] = []*restful.WebService{t}
}

// Create web services for /apis/<group>
versionsForDiscoveryMap := make(map[schema.GroupVersion][]metav1.GroupVersionForDiscovery)
for gvr := range apiSet {
if gvr.Group == "" {
continue
}
for gv := range versionsForDiscoveryMap {
apiGroup := metav1.APIGroup{
Name: gv.Group,
Versions: versionsForDiscoveryMap[gv],
// the preferred versions for a group is the first item in
// apiVersionsForDiscovery after it put in the right ordered
PreferredVersion: versionsForDiscoveryMap[gv][0],
}

groupPath := "apis/" + gv.Group
webservices[groupPath] = append(webservices[groupPath], discovery.NewAPIGroupHandler(codecs, apiGroup).WebService())
gv := schema.GroupVersion{
Group: gvr.Group,
Version: gvr.Version,
}

// Build OpenAPI v3 spec for /apis/<group> webservices
routeSpecs := make(map[string][]*spec3.OpenAPI)
for groupPath, ws := range webservices {
spec, err := builder3.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(ws), o.openapiv3Config)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
routeSpecs[groupPath] = []*spec3.OpenAPI{spec}
if versionsForDiscoveryMap[gv] == nil {
versionsForDiscoveryMap[gv] = []metav1.GroupVersionForDiscovery{}
}

// Build OpenAPI v3 specs for /apis/<group>/<version>, i.e. build OpenAPI v3 specs for each APIDefinition
resourceSpecs := make([]map[string][]*spec3.OpenAPI, 0, len(apiSet))
for _, apiDefinition := range apiSet {
specs, err := apiResourceSchemaToSpec(apiDefinition.GetAPIResourceSchema())
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
resourceSpecs = append(resourceSpecs, specs)
versionsForDiscoveryMap[gv] = append(versionsForDiscoveryMap[gv], metav1.GroupVersionForDiscovery{
GroupVersion: gvr.GroupVersion().String(),
Version: gvr.Version,
})
}
for gv := range versionsForDiscoveryMap {
apiGroup := metav1.APIGroup{
Name: gv.Group,
Versions: versionsForDiscoveryMap[gv],
// the preferred versions for a group is the first item in
// apiVersionsForDiscovery after it put in the right ordered
PreferredVersion: versionsForDiscoveryMap[gv][0],
}

// Create a new OpenAPI service
log.V(7).Info("Creating new OpenAPI v3 service")
// log.V(7).Info("Reusing OpenAPI v3 service from cache")
m := mux.NewPathRecorderMux("virtual-workspace-openapi-v3")
service := handler3.NewOpenAPIService()
err = service.RegisterOpenAPIV3VersionedService("/openapi/v3", m)
groupPath := "apis/" + gv.Group
webservices[groupPath] = append(webservices[groupPath], discovery.NewAPIGroupHandler(codecs, apiGroup).WebService())
}

// Build OpenAPI v3 spec for /apis/<group> webservices
routeSpecs := make(map[string][]*spec3.OpenAPI)
for groupPath, ws := range webservices {
spec, err := builder3.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(ws), o.openapiv3Config)
if err != nil {
item.err = err
close(item.done)

responsewriters.InternalError(w, req, err)
return
}
routeSpecs[groupPath] = []*spec3.OpenAPI{spec}
}

// Add all OpenAPI v3 specs to the OpenAPI service
err = addSpecs(service, routeSpecs, resourceSpecs, log)
// Build OpenAPI v3 specs for /apis/<group>/<version>, i.e. build OpenAPI v3 specs for each APIDefinition
resourceSpecs := make([]map[string][]*spec3.OpenAPI, 0, len(apiSet))
for _, apiDefinition := range apiSet {
specs, err := apiResourceSchemaToSpec(apiDefinition.GetAPIResourceSchema())
if err != nil {
item.err = err
close(item.done)

responsewriters.InternalError(w, req, err)
return
}
resourceSpecs = append(resourceSpecs, specs)
}

// Save the service in the cache
o.services.Add(key, m)
entry = m
} else {
log.V(7).Info("Reusing OpenAPI v3 service from cache")
// Create a new OpenAPI service
log.V(7).Info("Creating new OpenAPI v3 service")
// log.V(7).Info("Reusing OpenAPI v3 service from cache")
m := mux.NewPathRecorderMux("virtual-workspace-openapi-v3")
service := handler3.NewOpenAPIService()
err = service.RegisterOpenAPIV3VersionedService("/openapi/v3", m)
if err != nil {
item.err = err
close(item.done)

responsewriters.InternalError(w, req, err)
return
}

// Add all OpenAPI v3 specs to the OpenAPI service
err = addSpecs(service, routeSpecs, resourceSpecs, log)
if err != nil {
item.err = err
close(item.done)

responsewriters.InternalError(w, req, err)
return
}

service := entry.(http.Handler)
service.ServeHTTP(w, req)
// Save the service in the cache
item.service = m
close(item.done)
item.service.ServeHTTP(w, req)
}

func apiResourceSchemaToSpec(apiResourceSchema *apisv1alpha1.APIResourceSchema) (map[string][]*spec3.OpenAPI, error) {
Expand Down

0 comments on commit 255aaf1

Please sign in to comment.