From d38e2dade941cb2f0c362219f1c2d82fdc076d64 Mon Sep 17 00:00:00 2001 From: Howard Zhong Date: Wed, 15 Jan 2025 21:48:25 +0000 Subject: [PATCH 1/4] director and origin API --- director/cache_ads.go | 67 ++++++----- director/director.go | 100 +++++++++++++++++ director/director_test.go | 225 +++++++++++++++++++++++++++++++++++++ origin/origin_ui.go | 137 ++++++++++++++++++++++ server_structs/director.go | 5 + 5 files changed, 506 insertions(+), 28 deletions(-) diff --git a/director/cache_ads.go b/director/cache_ads.go index cf89b1b64..6e55463d8 100644 --- a/director/cache_ads.go +++ b/director/cache_ads.go @@ -74,31 +74,16 @@ func (f filterType) String() string { } } -// recordAd does following for an incoming ServerAd and []NamespaceAdV2 pair: -// -// 1. Update the ServerAd by setting server location and updating server topology attribute -// 2. Record the ServerAd and NamespaceAdV2 to the TTL cache -// 3. Set up the server `stat` call utilities -// 4. Set up utilities for collecting origin/health server file transfer test status -// 5. Return the updated ServerAd. The ServerAd passed in will not be modified -func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]server_structs.NamespaceAdV2) (updatedAd server_structs.ServerAd) { - if err := updateLatLong(&sAd); err != nil { - if geoIPError, ok := err.(geoIPError); ok { - labels := geoIPError.labels - metrics.PelicanDirectorGeoIPErrors.With(labels).Inc() - } - log.Debugln("Failed to lookup GeoIP coordinates for host", sAd.URL.Host) - } - - if sAd.URL.String() == "" { - log.Errorf("The URL of the serverAd %#v is empty. Cannot set the TTL cache.", sAd) - return +// getServerAd returns the existing server ad of the given server url from the cache +func getServerAd(serverUrl string) (*server_structs.Advertisement, error) { + if serverUrl == "" { + return nil, errors.Errorf("The URL of the serverAd is empty. Cannot set the TTL cache.") } // Since servers from topology always use http, while servers from Pelican always use https // we want to ignore the scheme difference when checking duplicates (only consider hostname:port) - rawURL := sAd.URL.String() // could be http (topology) or https (Pelican or some topology ones) - httpURL := sAd.URL.String() - httpsURL := sAd.URL.String() + rawURL := serverUrl // could be http (topology) or https (Pelican or some topology ones) + httpURL := serverUrl + httpsURL := serverUrl if strings.HasPrefix(rawURL, "https") { httpURL = "http" + strings.TrimPrefix(rawURL, "https") } @@ -113,21 +98,47 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[] if existing == nil { existing = serverAds.Get(rawURL) } + if existing == nil { + return nil, errors.Errorf("The server ad for %s is not found in the cache", serverUrl) + } + return existing.Value(), nil +} + +// recordAd does following for an incoming ServerAd and []NamespaceAdV2 pair: +// +// 1. Update the ServerAd by setting server location and updating server topology attribute +// 2. Record the ServerAd and NamespaceAdV2 to the TTL cache +// 3. Set up the server `stat` call utilities +// 4. Set up utilities for collecting origin/health server file transfer test status +// 5. Return the updated ServerAd. The ServerAd passed in will not be modified +func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]server_structs.NamespaceAdV2) (updatedAd server_structs.ServerAd) { + if err := updateLatLong(&sAd); err != nil { + if geoIPError, ok := err.(geoIPError); ok { + labels := geoIPError.labels + metrics.PelicanDirectorGeoIPErrors.With(labels).Inc() + } + log.Debugln("Failed to lookup GeoIP coordinates for host", sAd.URL.Host) + } + + existing, err := getServerAd(sAd.URL.String()) + if err != nil { + log.Debugf("No existing server ad for %s: %v", sAd.URL.String(), err) + } // There's an existing ad in the cache if existing != nil { - if sAd.FromTopology && !existing.Value().FromTopology { + if sAd.FromTopology && !existing.FromTopology { // if the incoming is from topology but the existing is from Pelican log.Debugf("The ServerAd generated from topology with name %s and URL %s was ignored because there's already a Pelican ad for this server", sAd.Name, sAd.URL.String()) return } - if !sAd.FromTopology && existing.Value().FromTopology { + if !sAd.FromTopology && existing.FromTopology { // Pelican server will overwrite topology one. We leave a message to let admin know - log.Debugf("The existing ServerAd generated from topology with name %s and URL %s is replaced by the Pelican server with name %s", existing.Value().Name, existing.Value().URL.String(), sAd.Name) - serverAds.Delete(existing.Value().URL.String()) + log.Debugf("The existing ServerAd generated from topology with name %s and URL %s is replaced by the Pelican server with name %s", existing.Name, existing.URL.String(), sAd.Name) + serverAds.Delete(existing.URL.String()) } - if !sAd.FromTopology && !existing.Value().FromTopology { // Only copy the IO Load value for Pelican server - sAd.IOLoad = existing.Value().GetIOLoad() // we copy the value from the existing serverAD to be consistent + if !sAd.FromTopology && !existing.FromTopology { // Only copy the IO Load value for Pelican server + sAd.IOLoad = existing.GetIOLoad() // we copy the value from the existing serverAD to be consistent } } diff --git a/director/director.go b/director/director.go index 11f99001e..731663ec4 100644 --- a/director/director.go +++ b/director/director.go @@ -1516,6 +1516,103 @@ func collectClientVersionMetric(reqVer *version.Version, service string) { metrics.PelicanDirectorClientVersionTotal.With(prometheus.Labels{"version": shortenedVersion, "service": service}).Inc() } +// Endpoint for origin and cache servers to set their own downtime in director +func setDowntimebyServer(ctx *gin.Context) { + downtimeRequest := server_structs.DowntimeRequest{} + err := ctx.ShouldBindBodyWith(&downtimeRequest, binding.JSON) + if err != nil { + ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Invalid downtime setting request", + }) + return + } + + // Get the server name via server url in the request body + serverUrl := downtimeRequest.ServerUrl // A full URL to the server starting with http:// or https:// + ad, err := getServerAd(serverUrl) + if err != nil { + ctx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server not found in cached server ads: " + serverUrl, + }) + return + } + serverName := ad.Name + if serverName == "" { + ctx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server name not found", + }) + return + } + + tokens, present := ctx.Request.Header["Authorization"] + if !present || len(tokens) == 0 { + ctx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Bearer token not present in the 'Authorization' header", + }) + return + } + + token := strings.TrimPrefix(tokens[0], "Bearer ") + + serverType := ad.ServerAd.Type + var serverNs string + if serverType == "Origin" { + serverNs = server_structs.GetOriginNs(serverName) + } else if serverType == "Cache" { + serverNs = server_structs.GetCacheNS(serverName) + } + if serverNs == "" { + ctx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server namespace not found", + }) + return + } + + ok, err := verifyAdvertiseToken(ctx, token, serverNs) + if err != nil { + if err == adminApprovalErr { + ctx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("%s was not approved by an administrator", downtimeRequest.ServerUrl), + }) + return + } else { + ctx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Authorization token verification failed %v", err), + }) + return + } + } + + if !ok { + ctx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Authorization token verification failed. Token missing required scope", + }) + return + } + + ctx.Params = append(ctx.Params, gin.Param{Key: "name", Value: serverName}) + + // Set the downtime + if downtimeRequest.EnableDowntime { + handleFilterServer(ctx) + } else { + handleAllowServer(ctx) + } + + ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{ + Status: server_structs.RespOK, + Msg: "Successfully set downtime", + }) +} + func collectDirectorRedirectionMetric(ctx *gin.Context, destination string) { labels := prometheus.Labels{ "destination": destination, @@ -1571,6 +1668,9 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) { // so that director can be our point of contact for collecting system-level metrics. // Rename the endpoint to reflect such plan. directorAPIV1.GET("/discoverServers", discoverOriginCache) + + // Cache/Origin sets their own downtime + directorAPIV1.POST("/downtime", setDowntimebyServer) } directorAPIV2 := router.Group("/api/v2.0/director", web_ui.ServerHeaderMiddleware) diff --git a/director/director_test.go b/director/director_test.go index d95407f46..241ed7ac8 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -32,6 +32,7 @@ import ( "net/http/httptest" "net/url" "path/filepath" + "strings" "testing" "time" @@ -2611,3 +2612,227 @@ func TestExtractProjectFromUserAgent(t *testing.T) { assert.Equal(t, "", result) }) } + +func TestSetDowntimebyServer(t *testing.T) { + server_utils.ResetTestState() + ctx, cancel, egrp := test_utils.TestContext(context.Background(), t) + SetupMockDirectorDB(t) + t.Cleanup(func() { + TeardownMockDirectorDB(t) + cancel() + assert.NoError(t, egrp.Wait()) + server_utils.ResetTestState() + }) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.Method == "POST" && req.URL.Path == "/api/v1.0/registry/checkNamespaceStatus" { + reqBody, err := io.ReadAll(req.Body) + require.NoError(t, err) + reqJson := server_structs.CheckNamespaceStatusReq{} + err = json.Unmarshal(reqBody, &reqJson) + require.NoError(t, err) + + // Validate the expected prefixes + // We expect the registration to use "test" for namespace, /caches/test for cache, and /origins/test for origin + if reqJson.Prefix != "test" && reqJson.Prefix != "/caches/test" && reqJson.Prefix != "/origins/test" { + w.WriteHeader(http.StatusNotFound) + return + } + + // Respond with approved status + res := server_structs.CheckNamespaceStatusRes{Approved: true} + resByte, err := json.Marshal(res) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + _, err = w.Write(resByte) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + viper.Set("Federation.RegistryUrl", ts.URL) + viper.Set("Director.CacheSortMethod", "distance") + viper.Set("Director.StatTimeout", 300*time.Millisecond) + viper.Set("Director.StatConcurrencyLimit", 1) + + setupContext := func() (*gin.Context, *gin.Engine, *httptest.ResponseRecorder) { + w := httptest.NewRecorder() + c, r := gin.CreateTestContext(w) + return c, r, w + } + + generateToken := func() (jwk.Key, string, url.URL) { + // Create a private key to use for the test + privateKey, err := ecdsa.GenerateKey(elliptic.P521(), rand.Reader) + assert.NoError(t, err, "Error generating private key") + + // Convert from raw ecdsa to jwk.Key + pKey, err := jwk.FromRaw(privateKey) + assert.NoError(t, err, "Unable to convert ecdsa.PrivateKey to jwk.Key") + + //Assign Key id to the private key + err = jwk.AssignKeyID(pKey) + assert.NoError(t, err, "Error assigning kid to private key") + + //Set an algorithm for the key + err = pKey.Set(jwk.AlgorithmKey, jwa.ES256) + assert.NoError(t, err, "Unable to set algorithm for pKey") + + issuerURL := url.URL{ + Scheme: "https", + Path: ts.URL, + } + + // Create a token to be inserted + tok, err := jwt.NewBuilder(). + Issuer(issuerURL.String()). + Claim("scope", token_scopes.Pelican_Advertise.String()). + Audience([]string{"director.test"}). + Subject("origin"). + Build() + assert.NoError(t, err, "Error creating token") + + signed, err := jwt.Sign(tok, jwt.WithKey(jwa.ES256, pKey)) + assert.NoError(t, err, "Error signing token") + + return pKey, string(signed), issuerURL + } + + setupRegisterRequest := func(c *gin.Context, r *gin.Engine, bodyByte []byte, token string, stype server_structs.ServerType) { + r.POST("/", func(gctx *gin.Context) { registerServeAd(ctx, gctx, stype) }) + c.Request, _ = http.NewRequest(http.MethodPost, "/", bytes.NewBuffer(bodyByte)) + c.Request.Header.Set("Authorization", "Bearer "+token) + c.Request.Header.Set("Content-Type", "application/json") + // Hard code the current min version. When this test starts failing because of new stuff in the Director, + // we'll know that means it's time to update the min version in redirect.go + c.Request.Header.Set("User-Agent", "pelican-origin/7.0.0") + } + + setupDowntimeRequest := func(c *gin.Context, bodyByte []byte, token string) { + + c.Request, _ = http.NewRequest(http.MethodPost, "/api/v1.0/director/downtime", bytes.NewBuffer(bodyByte)) + c.Request.Header.Set("Authorization", "Bearer "+token) + c.Request.Header.Set("Content-Type", "application/json") + // Hard code the current min version. When this test starts failing because of new stuff in the Director, + // we'll know that means it's time to update the min version in redirect.go + c.Request.Header.Set("User-Agent", "pelican-origin/7.0.0") + } + + setupJwksCache := func(t *testing.T, ns string, key jwk.Key) { + jwks := jwk.NewSet() + err := jwks.AddKey(key) + require.NoError(t, err) + namespaceKeys.Set(ts.URL+"/api/v1.0/registry"+ns+"/.well-known/issuer.jwks", jwks, ttlcache.DefaultTTL) + } + + getLastJsonObj := func(data string) (server_structs.SimpleApiResp, error) { + // Split the string into individual JSON objects + objects := strings.Split(data, "}{") + + // Get the last object + lastObj := objects[len(objects)-1] + + // Add closing brace if missing + if !strings.HasSuffix(lastObj, "}") { + lastObj = lastObj + "}" + } + + // Add opening brace if missing + if !strings.HasPrefix(lastObj, "{") { + lastObj = "{" + lastObj + } + + // Unmarshal only the last object + var resp server_structs.SimpleApiResp + err := json.Unmarshal([]byte(lastObj), &resp) + return resp, err + } + + teardown := func() { + serverAds.DeleteAll() + namespaceKeys.DeleteAll() + } + + t.Run("successful-and-failed-downtime-enable", func(t *testing.T) { + c, r, w := setupContext() + + // Server advertisement setting and retrieval + pKey, token, _ := generateToken() + publicKey, err := jwk.PublicKeyOf(pKey) + assert.NoError(t, err, "Error creating public key from private key") + + setupJwksCache(t, "/foo/bar", publicKey) + setupJwksCache(t, "/origins/test", publicKey) + + isurl := url.URL{} + isurl.Path = ts.URL + + ad := server_structs.OriginAdvertiseV2{ + BrokerURL: "https://broker-url.org", + DataURL: "https://or-url.org", + Name: "test", + Namespaces: []server_structs.NamespaceAdV2{{ + Path: "/foo/bar", + Issuer: []server_structs.TokenIssuer{{IssuerUrl: isurl}}, + }}, + } + + jsonad, err := json.Marshal(ad) + assert.NoError(t, err, "Error marshalling OriginAdvertise") + + setupRegisterRequest(c, r, jsonad, token, server_structs.OriginType) + + r.ServeHTTP(w, c.Request) + + // Check to see that the code exits with status code 200 after given it a good token + assert.Equal(t, 200, w.Result().StatusCode, "Expected status code of 200") + + get := serverAds.Get("https://or-url.org") + getAd := get.Value() + require.NotNil(t, get, "Coudln't find server in the director cache.") + assert.Equal(t, getAd.Name, ad.Name) + require.Len(t, getAd.NamespaceAds, 1) + assert.Equal(t, getAd.NamespaceAds[0].Path, "/foo/bar") + + // Second request - downtime setting + downtimeRequest := server_structs.DowntimeRequest{ + ServerUrl: "https://or-url.org", + EnableDowntime: true, + } + requestBody, _ := json.Marshal(downtimeRequest) + + r.POST("/api/v1.0/director/downtime", setDowntimebyServer) + setupDowntimeRequest(c, requestBody, token) + + r.ServeHTTP(w, c.Request) + + assert.Equal(t, http.StatusOK, w.Code) + + response, err := getLastJsonObj(w.Body.String()) + assert.NoError(t, err, "Error unmarshaling response") + assert.Equal(t, server_structs.RespOK, response.Status) + assert.Equal(t, "Successfully set downtime", response.Msg) + + // Test invalid token + wrongToken := "i-am-a-wrong-token" + setupDowntimeRequest(c, requestBody, wrongToken) + + r.ServeHTTP(w, c.Request) + + response, err = getLastJsonObj(w.Body.String()) + assert.NoError(t, err, "Error unmarshaling response") + assert.Equal(t, server_structs.RespFailed, response.Status) + assert.Equal(t, "Authorization token verification failed invalid JWT", response.Msg) + + teardown() + }) + +} diff --git a/origin/origin_ui.go b/origin/origin_ui.go index 27ab5a923..62b8cff29 100644 --- a/origin/origin_ui.go +++ b/origin/origin_ui.go @@ -19,6 +19,9 @@ package origin import ( + "bytes" + "encoding/json" + "fmt" "net/http" "net/url" "time" @@ -160,10 +163,144 @@ func handleExports(ctx *gin.Context) { ctx.JSON(http.StatusOK, res) } +func handleSetDowntime(ctx *gin.Context) { + serverUrl := param.Server_ExternalWebUrl.GetString() + + fedInfo, err := config.GetFederation(ctx) + if err != nil { + log.Error("failed to get federaion:", err) + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server error when getting federation information: " + err.Error(), + }) + } + directorUrlStr := fedInfo.DirectorEndpoint + if directorUrlStr == "" { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Director endpoint URL is not known", + }) + } + directorUrl, err := url.Parse(directorUrlStr) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to parse Federation.DirectorURL: " + directorUrlStr, + }) + } + + directorUrl.Path = "/api/v1.0/director/downtime" + + // Construct the request body + downtimeRequest := server_structs.DowntimeRequest{ + ServerUrl: serverUrl, + EnableDowntime: true, // or false, depending on what the UI sends below + } + // Get the downtime enable/disable status from the request body + // Assuming webUI sends a JSON payload like {"enableDowntime": true/false} + var downtimeStatus struct { + EnableDowntime bool `json:"enableDowntime"` + } + if err := ctx.ShouldBindJSON(&downtimeStatus); err != nil { + ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Invalid downtime status in request body", + }) + return + } + downtimeRequest.EnableDowntime = downtimeStatus.EnableDowntime + + reqBody, err := json.Marshal(downtimeRequest) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to marshal downtime request", + }) + return + } + + // Create token for setting downtime + issuerUrl, err := config.GetServerIssuerURL() + if err != nil { + log.Errorf("Failed to get server issuer url %v", err) + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server encountered error when getting server issuer url " + err.Error(), + }) + return + } + advTokenCfg := token.NewWLCGToken() + advTokenCfg.Lifetime = time.Minute + advTokenCfg.Issuer = issuerUrl + advTokenCfg.AddAudiences(fedInfo.DirectorEndpoint) + advTokenCfg.Subject = "origin" // For cache: advTokenCfg.Subject = "cache" + advTokenCfg.AddScopes(token_scopes.Pelican_Advertise) + + tok, err := advTokenCfg.CreateToken() + if err != nil { + log.Errorf("Failed to create token for setting downtime %v", err) + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server encountered error when creating token for setting downtime " + err.Error(), + }) + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, directorUrl.String(), bytes.NewBuffer(reqBody)) + if err != nil { + log.Error("Failed to create a POST request for downtime setting") + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server encountered error when creating a POST request for downtime setting " + err.Error(), + }) + return + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+tok) + userAgent := "pelican-origin/" + config.GetVersion() // For cache: "pelican-cache/" + req.Header.Set("User-Agent", userAgent) + log.Warningf("request url: %s", req.URL.String()) + + tr := config.GetTransport() + client := http.Client{Transport: tr} + + resp, err := client.Do(req) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Failed to send request to director: %v", err), + }) + return + } + defer resp.Body.Close() + + // Process the response from director + var directorResponse server_structs.SimpleApiResp + err = json.NewDecoder(resp.Body).Decode(&directorResponse) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to parse director's response: " + err.Error(), + }) + return + } + if resp.StatusCode != http.StatusOK { + ctx.JSON(resp.StatusCode, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Director returned error: %s", directorResponse.Msg), + }) + return + } + + ctx.JSON(http.StatusOK, directorResponse) +} + func RegisterOriginWebAPI(engine *gin.Engine) error { originWebAPI := engine.Group("/api/v1.0/origin_ui", web_ui.ServerHeaderMiddleware) { originWebAPI.GET("/exports", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleExports) + originWebAPI.POST("/downtime", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleSetDowntime) } // Globus backend specific. Config other origin routes above this line diff --git a/server_structs/director.go b/server_structs/director.go index 49058643f..87b92b86f 100644 --- a/server_structs/director.go +++ b/server_structs/director.go @@ -149,6 +149,11 @@ type ( DeviceEndpoint string `json:"device_authorization_endpoint,omitempty"` } + DowntimeRequest struct { + ServerUrl string `json:"server_url"` + EnableDowntime bool `json:"enable_downtime"` + } + XPelHeader interface { GetName() string ParseRawHeader(*http.Response) error From 5dcb2590a9e0e4728351becd9315e1fb352b986a Mon Sep 17 00:00:00 2001 From: Howard Zhong Date: Thu, 6 Feb 2025 22:02:36 +0000 Subject: [PATCH 2/4] integrate the new hostname as prefix in origin/cache namespace registration --- director/director.go | 10 +++++----- director/director_test.go | 2 +- origin/origin_ui.go | 38 +++++++++++++++++++++----------------- server_structs/director.go | 1 + 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/director/director.go b/director/director.go index 731663ec4..0899fe4cf 100644 --- a/director/director.go +++ b/director/director.go @@ -1517,7 +1517,7 @@ func collectClientVersionMetric(reqVer *version.Version, service string) { } // Endpoint for origin and cache servers to set their own downtime in director -func setDowntimebyServer(ctx *gin.Context) { +func setDowntimeByServer(ctx *gin.Context) { downtimeRequest := server_structs.DowntimeRequest{} err := ctx.ShouldBindBodyWith(&downtimeRequest, binding.JSON) if err != nil { @@ -1528,8 +1528,8 @@ func setDowntimebyServer(ctx *gin.Context) { return } - // Get the server name via server url in the request body - serverUrl := downtimeRequest.ServerUrl // A full URL to the server starting with http:// or https:// + // Server's XRootD URL starting with http:// or https://, and ending with a port number + serverUrl := downtimeRequest.ServerUrl ad, err := getServerAd(serverUrl) if err != nil { ctx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ @@ -1609,7 +1609,7 @@ func setDowntimebyServer(ctx *gin.Context) { ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{ Status: server_structs.RespOK, - Msg: "Successfully set downtime", + Msg: fmt.Sprintf("Successfully set the downtime of %s %s to %v", ad.Type, serverName, downtimeRequest.EnableDowntime), }) } @@ -1670,7 +1670,7 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) { directorAPIV1.GET("/discoverServers", discoverOriginCache) // Cache/Origin sets their own downtime - directorAPIV1.POST("/downtime", setDowntimebyServer) + directorAPIV1.POST("/downtime", setDowntimeByServer) } directorAPIV2 := router.Group("/api/v2.0/director", web_ui.ServerHeaderMiddleware) diff --git a/director/director_test.go b/director/director_test.go index 241ed7ac8..f780a8472 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -2809,7 +2809,7 @@ func TestSetDowntimebyServer(t *testing.T) { } requestBody, _ := json.Marshal(downtimeRequest) - r.POST("/api/v1.0/director/downtime", setDowntimebyServer) + r.POST("/api/v1.0/director/downtime", setDowntimeByServer) setupDowntimeRequest(c, requestBody, token) r.ServeHTTP(w, c.Request) diff --git a/origin/origin_ui.go b/origin/origin_ui.go index 62b8cff29..1eac6bd8b 100644 --- a/origin/origin_ui.go +++ b/origin/origin_ui.go @@ -164,7 +164,21 @@ func handleExports(ctx *gin.Context) { } func handleSetDowntime(ctx *gin.Context) { - serverUrl := param.Server_ExternalWebUrl.GetString() + // Get the downtime on/off status from the request body + // Assuming webUI sends a JSON payload like {"enableDowntime": true/false} + var downtimeStatus struct { + EnableDowntime bool `json:"enableDowntime"` + } + if err := ctx.ShouldBindJSON(&downtimeStatus); err != nil { + ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Invalid downtime status in request body", + }) + return + } + + hostname := param.Server_Hostname.GetString() + serverUrl := param.Origin_Url.GetString() fedInfo, err := config.GetFederation(ctx) if err != nil { @@ -194,21 +208,9 @@ func handleSetDowntime(ctx *gin.Context) { // Construct the request body downtimeRequest := server_structs.DowntimeRequest{ ServerUrl: serverUrl, - EnableDowntime: true, // or false, depending on what the UI sends below + Hostname: hostname, + EnableDowntime: downtimeStatus.EnableDowntime, } - // Get the downtime enable/disable status from the request body - // Assuming webUI sends a JSON payload like {"enableDowntime": true/false} - var downtimeStatus struct { - EnableDowntime bool `json:"enableDowntime"` - } - if err := ctx.ShouldBindJSON(&downtimeStatus); err != nil { - ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Invalid downtime status in request body", - }) - return - } - downtimeRequest.EnableDowntime = downtimeStatus.EnableDowntime reqBody, err := json.Marshal(downtimeRequest) if err != nil { @@ -292,8 +294,10 @@ func handleSetDowntime(ctx *gin.Context) { }) return } - - ctx.JSON(http.StatusOK, directorResponse) + ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Successfully set the downtime of %s to %v", hostname, downtimeRequest.EnableDowntime), + }) } func RegisterOriginWebAPI(engine *gin.Engine) error { diff --git a/server_structs/director.go b/server_structs/director.go index 87b92b86f..c25f7cdc9 100644 --- a/server_structs/director.go +++ b/server_structs/director.go @@ -151,6 +151,7 @@ type ( DowntimeRequest struct { ServerUrl string `json:"server_url"` + Hostname string `json:"hostname"` EnableDowntime bool `json:"enable_downtime"` } From aef4e85f1375cd332901dea23e1ffc9429bc84fc Mon Sep 17 00:00:00 2001 From: Howard Zhong Date: Thu, 6 Feb 2025 23:46:20 +0000 Subject: [PATCH 3/4] cover cache server and improve test --- director/director_test.go | 9 ++- origin/origin_ui.go | 141 ---------------------------------- web_ui/ui.go | 155 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 145 deletions(-) diff --git a/director/director_test.go b/director/director_test.go index f780a8472..f7b9c6a13 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -2777,7 +2777,7 @@ func TestSetDowntimebyServer(t *testing.T) { ad := server_structs.OriginAdvertiseV2{ BrokerURL: "https://broker-url.org", - DataURL: "https://or-url.org", + DataURL: "https://or-url.org:8443", Name: "test", Namespaces: []server_structs.NamespaceAdV2{{ Path: "/foo/bar", @@ -2795,7 +2795,7 @@ func TestSetDowntimebyServer(t *testing.T) { // Check to see that the code exits with status code 200 after given it a good token assert.Equal(t, 200, w.Result().StatusCode, "Expected status code of 200") - get := serverAds.Get("https://or-url.org") + get := serverAds.Get("https://or-url.org:8443") getAd := get.Value() require.NotNil(t, get, "Coudln't find server in the director cache.") assert.Equal(t, getAd.Name, ad.Name) @@ -2804,7 +2804,8 @@ func TestSetDowntimebyServer(t *testing.T) { // Second request - downtime setting downtimeRequest := server_structs.DowntimeRequest{ - ServerUrl: "https://or-url.org", + ServerUrl: "https://or-url.org:8443", + Hostname: "or-url.org", EnableDowntime: true, } requestBody, _ := json.Marshal(downtimeRequest) @@ -2819,7 +2820,7 @@ func TestSetDowntimebyServer(t *testing.T) { response, err := getLastJsonObj(w.Body.String()) assert.NoError(t, err, "Error unmarshaling response") assert.Equal(t, server_structs.RespOK, response.Status) - assert.Equal(t, "Successfully set downtime", response.Msg) + assert.Equal(t, "Successfully set the downtime of Origin test to true", response.Msg) // Test invalid token wrongToken := "i-am-a-wrong-token" diff --git a/origin/origin_ui.go b/origin/origin_ui.go index 1eac6bd8b..27ab5a923 100644 --- a/origin/origin_ui.go +++ b/origin/origin_ui.go @@ -19,9 +19,6 @@ package origin import ( - "bytes" - "encoding/json" - "fmt" "net/http" "net/url" "time" @@ -163,148 +160,10 @@ func handleExports(ctx *gin.Context) { ctx.JSON(http.StatusOK, res) } -func handleSetDowntime(ctx *gin.Context) { - // Get the downtime on/off status from the request body - // Assuming webUI sends a JSON payload like {"enableDowntime": true/false} - var downtimeStatus struct { - EnableDowntime bool `json:"enableDowntime"` - } - if err := ctx.ShouldBindJSON(&downtimeStatus); err != nil { - ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Invalid downtime status in request body", - }) - return - } - - hostname := param.Server_Hostname.GetString() - serverUrl := param.Origin_Url.GetString() - - fedInfo, err := config.GetFederation(ctx) - if err != nil { - log.Error("failed to get federaion:", err) - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Server error when getting federation information: " + err.Error(), - }) - } - directorUrlStr := fedInfo.DirectorEndpoint - if directorUrlStr == "" { - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Director endpoint URL is not known", - }) - } - directorUrl, err := url.Parse(directorUrlStr) - if err != nil { - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Failed to parse Federation.DirectorURL: " + directorUrlStr, - }) - } - - directorUrl.Path = "/api/v1.0/director/downtime" - - // Construct the request body - downtimeRequest := server_structs.DowntimeRequest{ - ServerUrl: serverUrl, - Hostname: hostname, - EnableDowntime: downtimeStatus.EnableDowntime, - } - - reqBody, err := json.Marshal(downtimeRequest) - if err != nil { - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Failed to marshal downtime request", - }) - return - } - - // Create token for setting downtime - issuerUrl, err := config.GetServerIssuerURL() - if err != nil { - log.Errorf("Failed to get server issuer url %v", err) - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Server encountered error when getting server issuer url " + err.Error(), - }) - return - } - advTokenCfg := token.NewWLCGToken() - advTokenCfg.Lifetime = time.Minute - advTokenCfg.Issuer = issuerUrl - advTokenCfg.AddAudiences(fedInfo.DirectorEndpoint) - advTokenCfg.Subject = "origin" // For cache: advTokenCfg.Subject = "cache" - advTokenCfg.AddScopes(token_scopes.Pelican_Advertise) - - tok, err := advTokenCfg.CreateToken() - if err != nil { - log.Errorf("Failed to create token for setting downtime %v", err) - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Server encountered error when creating token for setting downtime " + err.Error(), - }) - return - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, directorUrl.String(), bytes.NewBuffer(reqBody)) - if err != nil { - log.Error("Failed to create a POST request for downtime setting") - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Server encountered error when creating a POST request for downtime setting " + err.Error(), - }) - return - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+tok) - userAgent := "pelican-origin/" + config.GetVersion() // For cache: "pelican-cache/" - req.Header.Set("User-Agent", userAgent) - log.Warningf("request url: %s", req.URL.String()) - - tr := config.GetTransport() - client := http.Client{Transport: tr} - - resp, err := client.Do(req) - if err != nil { - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: fmt.Sprintf("Failed to send request to director: %v", err), - }) - return - } - defer resp.Body.Close() - - // Process the response from director - var directorResponse server_structs.SimpleApiResp - err = json.NewDecoder(resp.Body).Decode(&directorResponse) - if err != nil { - ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "Failed to parse director's response: " + err.Error(), - }) - return - } - if resp.StatusCode != http.StatusOK { - ctx.JSON(resp.StatusCode, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: fmt.Sprintf("Director returned error: %s", directorResponse.Msg), - }) - return - } - ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: fmt.Sprintf("Successfully set the downtime of %s to %v", hostname, downtimeRequest.EnableDowntime), - }) -} - func RegisterOriginWebAPI(engine *gin.Engine) error { originWebAPI := engine.Group("/api/v1.0/origin_ui", web_ui.ServerHeaderMiddleware) { originWebAPI.GET("/exports", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleExports) - originWebAPI.POST("/downtime", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleSetDowntime) } // Globus backend specific. Config other origin routes above this line diff --git a/web_ui/ui.go b/web_ui/ui.go index bd1bdb2aa..4cd8e1d01 100644 --- a/web_ui/ui.go +++ b/web_ui/ui.go @@ -19,14 +19,17 @@ package web_ui import ( + "bytes" "context" "crypto/tls" "embed" + "encoding/json" "fmt" "math/rand" "mime" "net" "net/http" + "net/url" "os" "os/signal" "path" @@ -51,6 +54,8 @@ import ( "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/token" + "github.com/pelicanplatform/pelican/token_scopes" ) var ( @@ -179,6 +184,152 @@ func getEnabledServers(ctx *gin.Context) { ctx.JSON(200, gin.H{"servers": enabledServers}) } +func handleSetDowntime(ctx *gin.Context) { + // Ensure the server is origin or cache + if !config.IsServerEnabled(server_structs.OriginType) && !config.IsServerEnabled(server_structs.CacheType) { + ctx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Downtime setting is only available for origin or cache servers", + }) + return + } + + // Get the downtime on/off status from the request body + // Assuming webUI sends a JSON payload like {"enableDowntime": true/false} + var downtimeStatus struct { + EnableDowntime bool `json:"enableDowntime"` + } + if err := ctx.ShouldBindJSON(&downtimeStatus); err != nil { + ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Invalid downtime status in request body", + }) + return + } + + hostname := param.Server_Hostname.GetString() + serverUrl := param.Origin_Url.GetString() + + fedInfo, err := config.GetFederation(ctx) + if err != nil { + log.Error("failed to get federaion:", err) + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server error when getting federation information: " + err.Error(), + }) + } + directorUrlStr := fedInfo.DirectorEndpoint + if directorUrlStr == "" { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Director endpoint URL is not known", + }) + } + directorUrl, err := url.Parse(directorUrlStr) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to parse Federation.DirectorURL: " + directorUrlStr, + }) + } + + directorUrl.Path = "/api/v1.0/director/downtime" + + // Construct the request body + downtimeRequest := server_structs.DowntimeRequest{ + ServerUrl: serverUrl, + Hostname: hostname, + EnableDowntime: downtimeStatus.EnableDowntime, + } + + reqBody, err := json.Marshal(downtimeRequest) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to marshal downtime request", + }) + return + } + + // Create token for setting downtime + issuerUrl, err := config.GetServerIssuerURL() + if err != nil { + log.Errorf("Failed to get server issuer url %v", err) + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server encountered error when getting server issuer url " + err.Error(), + }) + return + } + advTokenCfg := token.NewWLCGToken() + advTokenCfg.Lifetime = time.Minute + advTokenCfg.Issuer = issuerUrl + advTokenCfg.AddAudiences(fedInfo.DirectorEndpoint) + advTokenCfg.Subject = "origin" // For cache: advTokenCfg.Subject = "cache" + advTokenCfg.AddScopes(token_scopes.Pelican_Advertise) + + tok, err := advTokenCfg.CreateToken() + if err != nil { + log.Errorf("Failed to create token for setting downtime %v", err) + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server encountered error when creating token for setting downtime " + err.Error(), + }) + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, directorUrl.String(), bytes.NewBuffer(reqBody)) + if err != nil { + log.Error("Failed to create a POST request for downtime setting") + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Server encountered error when creating a POST request for downtime setting " + err.Error(), + }) + return + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+tok) + userAgent := "pelican-origin/" + config.GetVersion() // For cache: "pelican-cache/" + req.Header.Set("User-Agent", userAgent) + log.Warningf("request url: %s", req.URL.String()) + + tr := config.GetTransport() + client := http.Client{Transport: tr} + + resp, err := client.Do(req) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Failed to send request to director: %v", err), + }) + return + } + defer resp.Body.Close() + + // Process the response from director + var directorResponse server_structs.SimpleApiResp + err = json.NewDecoder(resp.Body).Decode(&directorResponse) + if err != nil { + ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to parse director's response: " + err.Error(), + }) + return + } + if resp.StatusCode != http.StatusOK { + ctx.JSON(resp.StatusCode, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Director returned error: %s", directorResponse.Msg), + }) + return + } + ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Successfully set the downtime of %s to %v", hostname, downtimeRequest.EnableDowntime), + }) +} + func handleGlobusPages(ctx *gin.Context) { // /foo/bar requestPath := ctx.Param("requestPath") @@ -407,6 +558,10 @@ func configureCommonEndpoints(engine *gin.Engine) error { engine.PATCH("/api/v1.0/config", AuthHandler, AdminAuthHandler, updateConfigValues) engine.POST("/api/v1.0/restart", AuthHandler, AdminAuthHandler, hotRestartServer) engine.GET("/api/v1.0/servers", getEnabledServers) + + // Both origin and cache can use this endpoint to set downtime, director will handle the difference + engine.POST("/api/v1.0/downtime", AuthHandler, AdminAuthHandler, handleSetDowntime) + // Health check endpoint for web engine engine.GET("/api/v1.0/health", func(ctx *gin.Context) { ctx.JSON(http.StatusOK, gin.H{"message": fmt.Sprintf("Web Engine Running. Time: %s", time.Now().String())}) From 4fc17d3161a9ce94058f8ca2edfb794c9451cbaa Mon Sep 17 00:00:00 2001 From: Howard Zhong Date: Fri, 7 Feb 2025 16:41:44 +0000 Subject: [PATCH 4/4] fix linter problem --- director/director_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/director/director_test.go b/director/director_test.go index f7b9c6a13..008ac8e35 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -2818,7 +2818,7 @@ func TestSetDowntimebyServer(t *testing.T) { assert.Equal(t, http.StatusOK, w.Code) response, err := getLastJsonObj(w.Body.String()) - assert.NoError(t, err, "Error unmarshaling response") + assert.NoError(t, err, "Error unmarshalling response") assert.Equal(t, server_structs.RespOK, response.Status) assert.Equal(t, "Successfully set the downtime of Origin test to true", response.Msg) @@ -2829,7 +2829,7 @@ func TestSetDowntimebyServer(t *testing.T) { r.ServeHTTP(w, c.Request) response, err = getLastJsonObj(w.Body.String()) - assert.NoError(t, err, "Error unmarshaling response") + assert.NoError(t, err, "Error unmarshalling response") assert.Equal(t, server_structs.RespFailed, response.Status) assert.Equal(t, "Authorization token verification failed invalid JWT", response.Msg)