Merge pull request #493 from haoming29/add-cache-to-dir-sd
Include cache in director's service discovery endpoint
turetske authored Dec 21, 2023
2 parents b27a6ed + abd6e38 commit 80b8699
Showing 4 changed files with 119 additions and 73 deletions.
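
Before the per-file changes, it may help to see the shape of the discovery payload after this commit: the endpoint now advertises both origins and caches, and the former origin_* labels are generalized to server_*. A sketch of the response, with illustrative hostnames, names, and coordinates (the server_type strings are assumed to follow the ServerAd type values):

[
  {
    "targets": ["origin1.example.org:8444"],
    "labels": {
      "server_type": "Origin",
      "server_name": "example-origin",
      "server_auth_url": "https://origin1.example.org:8443",
      "server_url": "https://origin1.example.org:8443",
      "server_web_url": "https://origin1.example.org:8444",
      "server_lat": "43.0730",
      "server_long": "-89.4010"
    }
  },
  {
    "targets": ["cache1.example.org:8444"],
    "labels": {
      "server_type": "Cache",
      "server_name": "example-cache",
      "server_auth_url": "https://cache1.example.org:8443",
      "server_url": "https://cache1.example.org:8443",
      "server_web_url": "https://cache1.example.org:8444",
      "server_lat": "41.8781",
      "server_long": "-87.6298"
    }
  }
]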
14 changes: 2 additions & 12 deletions director/director_api.go
@@ -148,27 +148,17 @@ func VerifyDirectorSDToken(strToken string) (bool, error) {
}

// Create a token for director's Prometheus scraper to access discovered
// origins /metrics endpoint. This function is intended to be called on
// origins and caches `/metrics` endpoint. This function is intended to be called on
// a director server
func CreateDirectorScrapeToken() (string, error) {
// We assume this function is only called on a director server,
// the external address of which should be the director's URL
directorURL := param.Server_ExternalWebUrl.GetString()
tokenExpireTime := param.Monitoring_TokenExpiresIn.GetDuration()

ads := ListServerAds([]ServerType{OriginType, CacheType})
aud := make([]string, 0)
for _, ad := range ads {
if ad.WebURL.String() != "" {
aud = append(aud, ad.WebURL.String())
}
}

tok, err := jwt.NewBuilder().
Claim("scope", "monitoring.scrape").
Issuer(directorURL).
// The audience of this token is all origins/caches that have WebURL set in their serverAds
Audience(aud).
Issuer(directorURL). // Exclude audience from token to prevent http header overflow
Subject("director").
Expiration(time.Now().Add(tokenExpireTime)).
Build()
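
The dropped audience claim previously listed the WebURL of every origin and cache, so the token grew with the federation and could overflow the HTTP Authorization header. A scraped server therefore has to rely on issuer and scope checks instead. The following is a minimal sketch of such a check, not Pelican's actual verification path, assuming the lestrrat-go/jwx v2 package that provides the jwt.NewBuilder API above; signature verification is elided:

package main

import (
	"fmt"

	"github.com/lestrrat-go/jwx/v2/jwt"
)

func checkScrapeToken(raw, directorURL string) error {
	// Assumes the signature was already verified against the director's
	// public key upstream; ParseInsecure only decodes the claims here.
	tok, err := jwt.ParseInsecure([]byte(raw))
	if err != nil {
		return err
	}
	if tok.Issuer() != directorURL {
		return fmt.Errorf("unexpected issuer %q", tok.Issuer())
	}
	if scope, ok := tok.Get("scope"); !ok || scope != "monitoring.scrape" {
		return fmt.Errorf("token lacks monitoring.scrape scope")
	}
	return nil
}

func main() {
	// Illustrative only; a real token arrives via the Authorization header.
	if err := checkScrapeToken("<raw-jwt>", "https://director.example.org"); err != nil {
		fmt.Println(err)
	}
}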
34 changes: 20 additions & 14 deletions director/redirect.go
@@ -49,6 +49,12 @@ var (
healthTestCancelFuncsMutex = sync.RWMutex{}
)

// The endpoint for director Prometheus instance to discover Pelican servers
// for scraping (origins/caches).
//
// TODO: Add registry server as well to this endpoint when we need to scrape from it
const DirectorServerDiscoveryEndpoint = "/api/v1.0/director/discoverServers"

func getRedirectURL(reqPath string, ad ServerAd, requiresAuth bool) (redirectURL url.URL) {
var serverURL url.URL
if requiresAuth {
@@ -457,9 +463,9 @@ func registerServeAd(ctx *gin.Context, sType ServerType) {
ctx.JSON(http.StatusOK, gin.H{"msg": "Successful registration"})
}

// Return a list of available origins URL in Prometheus HTTP SD format
// Return a list of registered origins and caches in Prometheus HTTP SD format
// for director's Prometheus service discovery
func DiscoverOrigins(ctx *gin.Context) {
func DiscoverOriginCache(ctx *gin.Context) {
// Check token for authorization
tokens, present := ctx.Request.Header["Authorization"]
if !present || len(tokens) == 0 {
@@ -484,24 +490,21 @@ func DiscoverOrigins(ctx *gin.Context) {
serverAds := serverAds.Keys()
promDiscoveryRes := make([]PromDiscoveryItem, 0)
for _, ad := range serverAds {
// We don't include caches in this discovery for right now
if ad.Type != OriginType {
continue
}
if ad.WebURL.String() == "" {
// Oririgns fetched from topology can't be scraped as they
// Origins and caches fetched from topology can't be scraped as they
// don't have a WebURL
continue
}
promDiscoveryRes = append(promDiscoveryRes, PromDiscoveryItem{
Targets: []string{ad.WebURL.Hostname() + ":" + ad.WebURL.Port()},
Labels: map[string]string{
"origin_name": ad.Name,
"origin_auth_url": ad.AuthURL.String(),
"origin_url": ad.URL.String(),
"origin_web_url": ad.WebURL.String(),
"origin_lat": fmt.Sprintf("%.4f", ad.Latitude),
"origin_long": fmt.Sprintf("%.4f", ad.Longitude),
"server_type": string(ad.Type),
"server_name": ad.Name,
"server_auth_url": ad.AuthURL.String(),
"server_url": ad.URL.String(),
"server_web_url": ad.WebURL.String(),
"server_lat": fmt.Sprintf("%.4f", ad.Latitude),
"server_long": fmt.Sprintf("%.4f", ad.Longitude),
},
})
}
@@ -527,7 +530,10 @@ func RegisterDirector(router *gin.RouterGroup) {
router.GET("/api/v1.0/director/object/*any", RedirectToCache)
router.GET("/api/v1.0/director/origin/*any", RedirectToOrigin)
router.POST("/api/v1.0/director/registerOrigin", RegisterOrigin)
router.GET("/api/v1.0/director/discoverOrigins", DiscoverOrigins)
// In the foreseeable future, the director will scrape all servers in the Pelican ecosystem (including the registry)
// so that the director can be our point of contact for collecting system-level metrics.
// Rename the endpoint to reflect that plan.
router.GET(DirectorServerDiscoveryEndpoint, DiscoverOriginCache)
router.POST("/api/v1.0/director/registerCache", RegisterCache)
router.GET("/api/v1.0/director/listNamespaces", ListNamespaces)
}
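
A minimal consumer of the renamed endpoint could look like the sketch below; the director URL and bearer token are placeholders, and the PromDiscoveryItem struct simply mirrors the Targets/Labels shape marshaled by DiscoverOriginCache:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors the JSON shape produced by DiscoverOriginCache.
type PromDiscoveryItem struct {
	Targets []string          `json:"targets"`
	Labels  map[string]string `json:"labels"`
}

func fetchServers(directorURL, token string) ([]PromDiscoveryItem, error) {
	req, err := http.NewRequest(http.MethodGet, directorURL+"/api/v1.0/director/discoverServers", nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("discovery returned status %d", resp.StatusCode)
	}
	var items []PromDiscoveryItem
	if err := json.NewDecoder(resp.Body).Decode(&items); err != nil {
		return nil, err
	}
	return items, nil
}

func main() {
	// Placeholder URL and token for illustration.
	items, err := fetchServers("https://director.example.org", "<scrape-token>")
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, it := range items {
		fmt.Println(it.Labels["server_type"], it.Targets)
	}
}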
128 changes: 89 additions & 39 deletions director/redirect_test.go
@@ -24,6 +24,7 @@ import (
"github.com/pelicanplatform/pelican/config"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockCache struct {
@@ -314,9 +315,9 @@ func TestGetAuthzEscaped(t *testing.T) {
assert.Equal(t, escapedToken, "tokenstring")
}

func TestDiscoverOrigins(t *testing.T) {
func TestDiscoverOriginCache(t *testing.T) {
mockPelicanOriginServerAd := ServerAd{
Name: "test-origin-server",
Name: "1-test-origin-server",
AuthURL: url.URL{},
URL: url.URL{
Scheme: "https",
@@ -332,21 +333,25 @@ func TestDiscoverOrigins(t *testing.T) {
}

mockTopoOriginServerAd := ServerAd{
Name: "test-origin-server",
Name: "test-topology-origin-server",
AuthURL: url.URL{},
URL: url.URL{
Scheme: "https",
Host: "fake-origin.org:8443",
Host: "fake-topology-origin.org:8443",
},
Type: OriginType,
Latitude: 123.05,
Longitude: 456.78,
}

mockCacheServerAd := ServerAd{
Name: "test-cache-server",
Name: "2-test-cache-server",
AuthURL: url.URL{},
URL: url.URL{
Scheme: "https",
Host: "fake-cache.org:8443",
},
WebURL: url.URL{
Scheme: "https",
Host: "fake-cache.org:8444",
},
@@ -412,8 +417,33 @@ func TestDiscoverOrigins(t *testing.T) {
return signed
}

areSlicesEqualIgnoreOrder := func(slice1, slice2 []PromDiscoveryItem) bool {
if len(slice1) != len(slice2) {
return false
}

counts := make(map[string]int)

for _, item := range slice1 {
bytes, err := json.Marshal(item)
require.NoError(t, err)
counts[string(bytes)]++
}

for _, item := range slice2 {
bytes, err := json.Marshal(item)
require.NoError(t, err)
counts[string(bytes)]--
if counts[string(bytes)] < 0 {
return false
}
}

return true
}

r := gin.Default()
r.GET("/test", DiscoverOrigins)
r.GET("/test", DiscoverOriginCache)

t.Run("no-token-should-give-401", func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "/test", nil)
@@ -455,41 +485,58 @@ func TestDiscoverOrigins(t *testing.T) {
assert.Equal(t, 200, w.Code)
assert.Equal(t, `[]`, w.Body.String())
})
t.Run("response-origin-should-match-cache", func(t *testing.T) {
t.Run("response-should-match-serverAds", func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "/test", nil)
if err != nil {
t.Fatalf("Could not make a GET request: %v", err)
}

serverAdMutex.Lock()
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
// Server fetched from topology should not be present in SD response
serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockCacheServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAdMutex.Unlock()
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
// Server fetched from topology should not be present in SD response
serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockCacheServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
}()

expectedRes := []PromDiscoveryItem{{
Targets: []string{mockCacheServerAd.WebURL.Hostname() + ":" + mockCacheServerAd.WebURL.Port()},
Labels: map[string]string{
"server_type": string(mockCacheServerAd.Type),
"server_name": mockCacheServerAd.Name,
"server_auth_url": mockCacheServerAd.AuthURL.String(),
"server_url": mockCacheServerAd.URL.String(),
"server_web_url": mockCacheServerAd.WebURL.String(),
"server_lat": fmt.Sprintf("%.4f", mockCacheServerAd.Latitude),
"server_long": fmt.Sprintf("%.4f", mockCacheServerAd.Longitude),
},
}, {
Targets: []string{mockPelicanOriginServerAd.WebURL.Hostname() + ":" + mockPelicanOriginServerAd.WebURL.Port()},
Labels: map[string]string{
"origin_name": mockPelicanOriginServerAd.Name,
"origin_auth_url": mockPelicanOriginServerAd.AuthURL.String(),
"origin_url": mockPelicanOriginServerAd.URL.String(),
"origin_web_url": mockPelicanOriginServerAd.WebURL.String(),
"origin_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude),
"origin_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude),
"server_type": string(mockPelicanOriginServerAd.Type),
"server_name": mockPelicanOriginServerAd.Name,
"server_auth_url": mockPelicanOriginServerAd.AuthURL.String(),
"server_url": mockPelicanOriginServerAd.URL.String(),
"server_web_url": mockPelicanOriginServerAd.WebURL.String(),
"server_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude),
"server_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude),
},
}}

resStr, err := json.Marshal(expectedRes)
assert.NoError(t, err, "Could not marshal json response")

req.Header.Set("Authorization", "Bearer "+string(setupToken("")))

w := httptest.NewRecorder()
r.ServeHTTP(w, req)

assert.Equal(t, 200, w.Code)
assert.Equal(t, string(resStr), w.Body.String(), "Reponse doesn't match expected")
require.Equal(t, 200, w.Code)

var resMarshalled []PromDiscoveryItem
err = json.Unmarshal(w.Body.Bytes(), &resMarshalled)
require.NoError(t, err, "Error unmarshall response to json")

assert.True(t, areSlicesEqualIgnoreOrder(expectedRes, resMarshalled))
})

t.Run("no-duplicated-origins", func(t *testing.T) {
@@ -498,25 +545,28 @@ func TestDiscoverOrigins(t *testing.T) {
t.Fatalf("Could not make a GET request: %v", err)
}

serverAdMutex.Lock()
// Add multiple same serverAds
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
// Server fetched from topology should not be present in SD response
serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockCacheServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAdMutex.Unlock()
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
// Add multiple same serverAds
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
// Server fetched from topology should not be present in SD response
serverAds.Set(mockTopoOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
}()

expectedRes := []PromDiscoveryItem{{
Targets: []string{mockPelicanOriginServerAd.WebURL.Hostname() + ":" + mockPelicanOriginServerAd.WebURL.Port()},
Labels: map[string]string{
"origin_name": mockPelicanOriginServerAd.Name,
"origin_auth_url": mockPelicanOriginServerAd.AuthURL.String(),
"origin_url": mockPelicanOriginServerAd.URL.String(),
"origin_web_url": mockPelicanOriginServerAd.WebURL.String(),
"origin_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude),
"origin_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude),
"server_type": string(mockPelicanOriginServerAd.Type),
"server_name": mockPelicanOriginServerAd.Name,
"server_auth_url": mockPelicanOriginServerAd.AuthURL.String(),
"server_url": mockPelicanOriginServerAd.URL.String(),
"server_web_url": mockPelicanOriginServerAd.WebURL.String(),
"server_lat": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Latitude),
"server_long": fmt.Sprintf("%.4f", mockPelicanOriginServerAd.Longitude),
},
}}

16 changes: 8 additions & 8 deletions web_ui/prometheus.go
@@ -145,9 +145,9 @@ func runtimeInfo() (api_v1.RuntimeInfo, error) {
return api_v1.RuntimeInfo{}, nil
}

// Configure director's Prometheus scraper to use HTTP service discovery for origins
// Configure director's Prometheus scraper to use HTTP service discovery for origins/caches
func configDirectorPromScraper() (*config.ScrapeConfig, error) {
originDiscoveryUrl, err := url.Parse(param.Server_ExternalWebUrl.GetString())
serverDiscoveryUrl, err := url.Parse(param.Server_ExternalWebUrl.GetString())
if err != nil {
return nil, fmt.Errorf("parse external URL %v: %w", param.Server_ExternalWebUrl.GetString(), err)
}
@@ -159,9 +159,9 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) {
if err != nil {
return nil, fmt.Errorf("Failed to generate token for director scraper at start: %v", err)
}
originDiscoveryUrl.Path = "/api/v1.0/director/discoverOrigins"
serverDiscoveryUrl.Path = "/api/v1.0/director/discoverServers"
scrapeConfig := config.DefaultScrapeConfig
scrapeConfig.JobName = "origins"
scrapeConfig.JobName = "origin_cache_servers"
scrapeConfig.Scheme = "https"

// This will cause the director to maintain a CA bundle, including the custom CA, at
Expand All @@ -174,7 +174,7 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) {

scraperHttpClientConfig := common_config.HTTPClientConfig{
TLSConfig: common_config.TLSConfig{
// For the scraper to origins' metrics, we get TLSSkipVerify from config
// For the scraper to origin/caches' metrics, we get TLSSkipVerify from config
// As this request is to external address
InsecureSkipVerify: param.TLSSkipVerify.GetBool(),
},
@@ -202,7 +202,7 @@ func configDirectorPromScraper() (*config.ScrapeConfig, error) {
},
}
scrapeConfig.ServiceDiscoveryConfigs[0] = &prom_http.SDConfig{
URL: originDiscoveryUrl.String(),
URL: serverDiscoveryUrl.String(),
RefreshInterval: model.Duration(15 * time.Second),
HTTPClientConfig: sdHttpClientConfig,
}
@@ -367,7 +367,7 @@ func ConfigureEmbeddedPrometheus(engine *gin.Engine, isDirector bool) error {
}
promCfg.ScrapeConfigs[0] = &scrapeConfig

// Add origins monitoring to director's prometheus instance
// Add origins/caches monitoring to director's prometheus instance
if isDirector {
dirPromScraperConfig, err := configDirectorPromScraper()
if err != nil {
@@ -702,7 +702,7 @@ func ConfigureEmbeddedPrometheus(engine *gin.Engine, isDirector bool) error {
if isDirector {
// Refresh service discovery token by re-configure scraper
if len(promCfg.ScrapeConfigs) < 2 {
return errors.New("Prometheus scraper config didn't include origins HTTP SD config. Length of configs less than 2.")
return errors.New("Prometheus scraper config didn't include origin/cache HTTP SD config. Length of configs less than 2.")
}
// Index 0 is the default config for servers
// Create new director-scrap token & service discovery token
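
The scrape job assembled by configDirectorPromScraper is roughly equivalent to the following Prometheus YAML sketch; the token values are placeholders, and insecure_skip_verify mirrors param.TLSSkipVerify rather than being hard-coded:

scrape_configs:
  - job_name: origin_cache_servers
    scheme: https
    tls_config:
      insecure_skip_verify: true   # mirrors param.TLSSkipVerify
    authorization:
      type: Bearer
      credentials: <director-scrape-token>   # placeholder
    http_sd_configs:
      - url: https://<director>/api/v1.0/director/discoverServers
        refresh_interval: 15s
        authorization:
          type: Bearer
          credentials: <service-discovery-token>   # placeholder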
