From 653ca381aa60a5ce1756a282eb8260c27524b709 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Tue, 12 Dec 2023 22:38:50 +0000 Subject: [PATCH 1/2] Include cache in director's service discovery endpoint --- director/director_api.go | 14 +---- director/redirect.go | 25 ++++---- director/redirect_test.go | 128 ++++++++++++++++++++++++++------------ web_ui/prometheus.go | 16 ++--- 4 files changed, 110 insertions(+), 73 deletions(-) diff --git a/director/director_api.go b/director/director_api.go index 092e30407..d38f1ccba 100644 --- a/director/director_api.go +++ b/director/director_api.go @@ -148,7 +148,7 @@ func VerifyDirectorSDToken(strToken string) (bool, error) { } // Create a token for director's Prometheus scraper to access discovered -// origins /metrics endpoint. This function is intended to be called on +// origins and caches `/metrics` endpoint. This function is intended to be called on // a director server func CreateDirectorScrapeToken() (string, error) { // We assume this function is only called on a director server, @@ -156,19 +156,9 @@ func CreateDirectorScrapeToken() (string, error) { directorURL := param.Server_ExternalWebUrl.GetString() tokenExpireTime := param.Monitoring_TokenExpiresIn.GetDuration() - ads := ListServerAds([]ServerType{OriginType, CacheType}) - aud := make([]string, 0) - for _, ad := range ads { - if ad.WebURL.String() != "" { - aud = append(aud, ad.WebURL.String()) - } - } - tok, err := jwt.NewBuilder(). Claim("scope", "monitoring.scrape"). - Issuer(directorURL). - // The audience of this token is all origins/caches that have WebURL set in their serverAds - Audience(aud). + Issuer(directorURL). // Exclude audience from token to prevent http header overflow Subject("director"). Expiration(time.Now().Add(tokenExpireTime)). 
Build() diff --git a/director/redirect.go b/director/redirect.go index e183b040e..29ad956b5 100644 --- a/director/redirect.go +++ b/director/redirect.go @@ -452,9 +452,9 @@ func registerServeAd(ctx *gin.Context, sType ServerType) { ctx.JSON(200, gin.H{"msg": "Successful registration"}) } -// Return a list of available origins URL in Prometheus HTTP SD format +// Return a list of registered origins and caches in Prometheus HTTP SD format // for director's Prometheus service discovery -func DiscoverOrigins(ctx *gin.Context) { +func DiscoverOriginCache(ctx *gin.Context) { // Check token for authorization tokens, present := ctx.Request.Header["Authorization"] if !present || len(tokens) == 0 { @@ -479,24 +479,21 @@ func DiscoverOrigins(ctx *gin.Context) { serverAds := serverAds.Keys() promDiscoveryRes := make([]PromDiscoveryItem, 0) for _, ad := range serverAds { - // We don't include caches in this discovery for right now - if ad.Type != OriginType { - continue - } if ad.WebURL.String() == "" { - // Oririgns fetched from topology can't be scraped as they + // Oririgns and caches fetched from topology can't be scraped as they // don't have a WebURL continue } promDiscoveryRes = append(promDiscoveryRes, PromDiscoveryItem{ Targets: []string{ad.WebURL.Hostname() + ":" + ad.WebURL.Port()}, Labels: map[string]string{ - "origin_name": ad.Name, - "origin_auth_url": ad.AuthURL.String(), - "origin_url": ad.URL.String(), - "origin_web_url": ad.WebURL.String(), - "origin_lat": fmt.Sprintf("%.4f", ad.Latitude), - "origin_long": fmt.Sprintf("%.4f", ad.Longitude), + "server_type": string(ad.Type), + "server_name": ad.Name, + "server_auth_url": ad.AuthURL.String(), + "server_url": ad.URL.String(), + "server_web_url": ad.WebURL.String(), + "server_lat": fmt.Sprintf("%.4f", ad.Latitude), + "server_long": fmt.Sprintf("%.4f", ad.Longitude), }, }) } @@ -522,7 +519,7 @@ func RegisterDirector(router *gin.RouterGroup) { router.GET("/api/v1.0/director/object/*any", RedirectToCache) 
router.GET("/api/v1.0/director/origin/*any", RedirectToOrigin) router.POST("/api/v1.0/director/registerOrigin", RegisterOrigin) - router.GET("/api/v1.0/director/discoverOrigins", DiscoverOrigins) + router.GET("/api/v1.0/director/discoverOrigins", DiscoverOriginCache) router.POST("/api/v1.0/director/registerCache", RegisterCache) router.GET("/api/v1.0/director/listNamespaces", ListNamespaces) } diff --git a/director/redirect_test.go b/director/redirect_test.go index fd5cabceb..3f5f6b605 100644 --- a/director/redirect_test.go +++ b/director/redirect_test.go @@ -24,6 +24,7 @@ import ( "github.com/pelicanplatform/pelican/config" "github.com/spf13/viper" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type MockCache struct { @@ -314,9 +315,9 @@ func TestGetAuthzEscaped(t *testing.T) { assert.Equal(t, escapedToken, "tokenstring") } -func TestDiscoverOrigins(t *testing.T) { +func TestDiscoverOriginCache(t *testing.T) { mockPelicanOriginServerAd := ServerAd{ - Name: "test-origin-server", + Name: "1-test-origin-server", AuthURL: url.URL{}, URL: url.URL{ Scheme: "https", @@ -332,11 +333,11 @@ func TestDiscoverOrigins(t *testing.T) { } mockTopoOriginServerAd := ServerAd{ - Name: "test-origin-server", + Name: "test-topology-origin-server", AuthURL: url.URL{}, URL: url.URL{ Scheme: "https", - Host: "fake-origin.org:8443", + Host: "fake-topology-origin.org:8443", }, Type: OriginType, Latitude: 123.05, @@ -344,9 +345,13 @@ func TestDiscoverOrigins(t *testing.T) { } mockCacheServerAd := ServerAd{ - Name: "test-cache-server", + Name: "2-test-cache-server", AuthURL: url.URL{}, URL: url.URL{ + Scheme: "https", + Host: "fake-cache.org:8443", + }, + WebURL: url.URL{ Scheme: "https", Host: "fake-cache.org:8444", }, @@ -412,8 +417,33 @@ func TestDiscoverOrigins(t *testing.T) { return signed } + areSlicesEqualIgnoreOrder := func(slice1, slice2 []PromDiscoveryItem) bool { + if len(slice1) != len(slice2) { + return false + } + + counts := make(map[string]int) 
+ + for _, item := range slice1 { + bytes, err := json.Marshal(item) + require.NoError(t, err) + counts[string(bytes)]++ + } + + for _, item := range slice2 { + bytes, err := json.Marshal(item) + require.NoError(t, err) + counts[string(bytes)]-- + if counts[string(bytes)] < 0 { + return false + } + } + + return true + } + r := gin.Default() - r.GET("/test", DiscoverOrigins) + r.GET("/test", DiscoverOriginCache) t.Run("no-token-should-give-401", func(t *testing.T) { req, err := http.NewRequest(http.MethodGet, "/test", nil) @@ -455,41 +485,58 @@ func TestDiscoverOrigins(t *testing.T) { assert.Equal(t, 200, w.Code) assert.Equal(t, `[]`, w.Body.String()) }) - t.Run("response-origin-should-match-cache", func(t *testing.T) { + t.Run("response-should-match-serverAds", func(t *testing.T) { req, err := http.NewRequest(http.MethodGet, "/test", nil) if err != nil { t.Fatalf("Could not make a GET request: %v", err) } - serverAdMutex.Lock() - serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - // Server fetched from topology should not be present in SD response - serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - serverAds.Set(mockCacheServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - serverAdMutex.Unlock() + func() { + serverAdMutex.Lock() + defer serverAdMutex.Unlock() + serverAds.DeleteAll() + serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + // Server fetched from topology should not be present in SD response + serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + serverAds.Set(mockCacheServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + }() expectedRes := []PromDiscoveryItem{{ + Targets: []string{mockCacheServerAd.WebURL.Hostname() + ":" + mockCacheServerAd.WebURL.Port()}, + Labels: map[string]string{ + "server_type": string(mockCacheServerAd.Type), + "server_name": 
mockCacheServerAd.Name, + "server_auth_url": mockCacheServerAd.AuthURL.String(), + "server_url": mockCacheServerAd.URL.String(), + "server_web_url": mockCacheServerAd.WebURL.String(), + "server_lat": fmt.Sprintf("%.4f", mockCacheServerAd.Latitude), + "server_long": fmt.Sprintf("%.4f", mockCacheServerAd.Longitude), + }, + }, { Targets: []string{mockPelicanOriginServerAd.WebURL.Hostname() + ":" + mockPelicanOriginServerAd.WebURL.Port()}, Labels: map[string]string{ - "origin_name": mockPelicanOriginServerAd.Name, - "origin_auth_url": mockPelicanOriginServerAd.AuthURL.String(), - "origin_url": mockPelicanOriginServerAd.URL.String(), - "origin_web_url": mockPelicanOriginServerAd.WebURL.String(), - "origin_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude), - "origin_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude), + "server_type": string(mockPelicanOriginServerAd.Type), + "server_name": mockPelicanOriginServerAd.Name, + "server_auth_url": mockPelicanOriginServerAd.AuthURL.String(), + "server_url": mockPelicanOriginServerAd.URL.String(), + "server_web_url": mockPelicanOriginServerAd.WebURL.String(), + "server_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude), + "server_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude), }, }} - resStr, err := json.Marshal(expectedRes) - assert.NoError(t, err, "Could not marshal json response") - req.Header.Set("Authorization", "Bearer "+string(setupToken(""))) w := httptest.NewRecorder() r.ServeHTTP(w, req) - assert.Equal(t, 200, w.Code) - assert.Equal(t, string(resStr), w.Body.String(), "Reponse doesn't match expected") + require.Equal(t, 200, w.Code) + + var resMarshalled []PromDiscoveryItem + err = json.Unmarshal(w.Body.Bytes(), &resMarshalled) + require.NoError(t, err, "Error unmarshall response to json") + + assert.True(t, areSlicesEqualIgnoreOrder(expectedRes, resMarshalled)) }) t.Run("no-duplicated-origins", func(t *testing.T) { @@ -498,25 +545,28 @@ func TestDiscoverOrigins(t 
*testing.T) { t.Fatalf("Could not make a GET request: %v", err) } - serverAdMutex.Lock() - // Add multiple same serverAds - serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - // Server fetched from topology should not be present in SD response - serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - serverAds.Set(mockCacheServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) - serverAdMutex.Unlock() + func() { + serverAdMutex.Lock() + defer serverAdMutex.Unlock() + serverAds.DeleteAll() + // Add multiple same serverAds + serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + // Server fetched from topology should not be present in SD response + serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL) + }() expectedRes := []PromDiscoveryItem{{ Targets: []string{mockPelicanOriginServerAd.WebURL.Hostname() + ":" + mockPelicanOriginServerAd.WebURL.Port()}, Labels: map[string]string{ - "origin_name": mockPelicanOriginServerAd.Name, - "origin_auth_url": mockPelicanOriginServerAd.AuthURL.String(), - "origin_url": mockPelicanOriginServerAd.URL.String(), - "origin_web_url": mockPelicanOriginServerAd.WebURL.String(), - "origin_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude), - "origin_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude), + "server_type": string(mockPelicanOriginServerAd.Type), + "server_name": mockPelicanOriginServerAd.Name, + "server_auth_url": mockPelicanOriginServerAd.AuthURL.String(), + 
"server_url": mockPelicanOriginServerAd.URL.String(), + "server_web_url": mockPelicanOriginServerAd.WebURL.String(), + "server_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude), + "server_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude), }, }} diff --git a/web_ui/prometheus.go b/web_ui/prometheus.go index a09135d17..7abc593ce 100644 --- a/web_ui/prometheus.go +++ b/web_ui/prometheus.go @@ -144,9 +144,9 @@ func runtimeInfo() (api_v1.RuntimeInfo, error) { return api_v1.RuntimeInfo{}, nil } -// Configure director's Prometheus scraper to use HTTP service discovery for origins +// Configure director's Prometheus scraper to use HTTP service discovery for origins/caches func configDirectorPromScraper() (*config.ScrapeConfig, error) { - originDiscoveryUrl, err := url.Parse(param.Server_ExternalWebUrl.GetString()) + serverDiscoveryUrl, err := url.Parse(param.Server_ExternalWebUrl.GetString()) if err != nil { return nil, fmt.Errorf("parse external URL %v: %w", param.Server_ExternalWebUrl.GetString(), err) } @@ -158,9 +158,9 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) { if err != nil { return nil, fmt.Errorf("Failed to generate token for director scraper at start: %v", err) } - originDiscoveryUrl.Path = "/api/v1.0/director/discoverOrigins" + serverDiscoveryUrl.Path = "/api/v1.0/director/discoverOrigins" scrapeConfig := config.DefaultScrapeConfig - scrapeConfig.JobName = "origins" + scrapeConfig.JobName = "origin_cache_servers" scrapeConfig.Scheme = "https" // This will cause the director to maintain a CA bundle, including the custom CA, at @@ -173,7 +173,7 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) { scraperHttpClientConfig := common_config.HTTPClientConfig{ TLSConfig: common_config.TLSConfig{ - // For the scraper to origins' metrics, we get TLSSkipVerify from config + // For the scraper to origin/caches' metrics, we get TLSSkipVerify from config // As this request is to external address 
InsecureSkipVerify: param.TLSSkipVerify.GetBool(), }, @@ -201,7 +201,7 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) { }, } scrapeConfig.ServiceDiscoveryConfigs[0] = &prom_http.SDConfig{ - URL: originDiscoveryUrl.String(), + URL: serverDiscoveryUrl.String(), RefreshInterval: model.Duration(15 * time.Second), HTTPClientConfig: sdHttpClientConfig, } @@ -366,7 +366,7 @@ func ConfigureEmbeddedPrometheus(engine *gin.Engine, isDirector bool) error { } promCfg.ScrapeConfigs[0] = &scrapeConfig - // Add origins monitoring to director's prometheus instance + // Add origins/caches monitoring to director's prometheus instance if isDirector { dirPromScraperConfig, err := configDirectorPromScraper() if err != nil { @@ -701,7 +701,7 @@ func ConfigureEmbeddedPrometheus(engine *gin.Engine, isDirector bool) error { if isDirector { // Refresh service discovery token by re-configure scraper if len(promCfg.ScrapeConfigs) < 2 { - return errors.New("Prometheus scraper config didn't include origins HTTP SD config. Length of configs less than 2.") + return errors.New("Prometheus scraper config didn't include origin/cache HTTP SD config. Length of configs less than 2.") } // Index 0 is the default config for servers // Create new director-scrap token & service discovery token From abd6e3831474322eb4aa7b91bf81b7c29d2fea1e Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Thu, 21 Dec 2023 16:14:07 +0000 Subject: [PATCH 2/2] Address code review feedback --- director/redirect.go | 13 +++++++++++-- web_ui/prometheus.go | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/director/redirect.go b/director/redirect.go index 29ad956b5..452d1751d 100644 --- a/director/redirect.go +++ b/director/redirect.go @@ -49,6 +49,12 @@ var ( healthTestCancelFuncsMutex = sync.RWMutex{} ) +// The endpoint for director Prometheus instance to discover Pelican servers +// for scraping (origins/caches). 
+ +// TODO: Add registry server as well to this endpoint when we need to scrape from it +const DirectorServerDiscoveryEndpoint = "/api/v1.0/director/discoverServers" + func getRedirectURL(reqPath string, ad ServerAd, requiresAuth bool) (redirectURL url.URL) { var serverURL url.URL if requiresAuth { @@ -480,7 +486,7 @@ func DiscoverOriginCache(ctx *gin.Context) { promDiscoveryRes := make([]PromDiscoveryItem, 0) for _, ad := range serverAds { if ad.WebURL.String() == "" { - // Oririgns and caches fetched from topology can't be scraped as they + // Origins and caches fetched from topology can't be scraped as they // don't have a WebURL continue } @@ -519,7 +525,10 @@ func RegisterDirector(router *gin.RouterGroup) { router.GET("/api/v1.0/director/object/*any", RedirectToCache) router.GET("/api/v1.0/director/origin/*any", RedirectToOrigin) router.POST("/api/v1.0/director/registerOrigin", RegisterOrigin) - router.GET("/api/v1.0/director/discoverOrigins", DiscoverOriginCache) + // In the foreseeable future, director will scrape all servers in Pelican ecosystem (including registry) + // so that director can be our point of contact for collecting system-level metrics. + // Rename the endpoint to reflect such plan. 
+ router.GET(DirectorServerDiscoveryEndpoint, DiscoverOriginCache) router.POST("/api/v1.0/director/registerCache", RegisterCache) router.GET("/api/v1.0/director/listNamespaces", ListNamespaces) } diff --git a/web_ui/prometheus.go b/web_ui/prometheus.go index 7abc593ce..746cb960f 100644 --- a/web_ui/prometheus.go +++ b/web_ui/prometheus.go @@ -158,7 +158,7 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) { if err != nil { return nil, fmt.Errorf("Failed to generate token for director scraper at start: %v", err) } - serverDiscoveryUrl.Path = "/api/v1.0/director/discoverOrigins" + serverDiscoveryUrl.Path = "/api/v1.0/director/discoverServers" scrapeConfig := config.DefaultScrapeConfig scrapeConfig.JobName = "origin_cache_servers" scrapeConfig.Scheme = "https"