From a34225938e1a63ea4ba8587272c2f5a8502fa8f5 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 18 Feb 2025 08:38:46 -0600 Subject: [PATCH 1/4] Decrease potential memory usage of the monitoring subsystem Various internal caches in the origin/cache monitoring were filling with temporary session information, causing unpredictable results in the memory "stress test" unit tests. This commit: - Allows monitoring to be turned off completely if desired; labelled as a hidden variable. - Removes the user ID from the users cache as soon as it is used. Since there's a single use case - and no other indication the user ID cache entry isn't needed - we remove it here. - Reduce redundant calls into the ttlcache. --- config/resources/defaults.yaml | 1 + director/stat_stress_test.go | 6 ++++-- docs/parameters.yaml | 12 ++++++++++++ metrics/xrootd_metrics.go | 24 ++++++++++++++++-------- param/parameters.go | 1 + param/parameters_struct.go | 2 ++ 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml index 14e82cfd1..7c79d7c28 100644 --- a/config/resources/defaults.yaml +++ b/config/resources/defaults.yaml @@ -114,6 +114,7 @@ Xrootd: DetailedMonitoringPort: 9930 SummaryMonitoringPort: 9931 AuthRefreshInterval: 5m + EnableLocalMonitoring: true Transport: DialerTimeout: 10s DialerKeepAlive: 30s diff --git a/director/stat_stress_test.go b/director/stat_stress_test.go index 53bb02e90..a137ac974 100644 --- a/director/stat_stress_test.go +++ b/director/stat_stress_test.go @@ -30,6 +30,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "golang.org/x/sync/errgroup" @@ -52,6 +53,7 @@ var ( func TestStatMemory(t *testing.T) { server_utils.ResetTestState() + viper.Set(param.Xrootd_EnableLocalMonitoring.GetName(), false) fed := fed_test_utils.NewFedTest(t, directorPublicCfg) discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString()) assert.NoError(t, err) @@ -102,6 +104,6 @@ func TestStatMemory(t *testing.T) { log.Infoln("Go routine count after warm-up:", goCnt) log.Infoln("Go routine count after test:", afterGoCnt) - assert.Less(t, afterStats.HeapAlloc, stats.HeapAlloc+7e5) - assert.Less(t, afterGoCnt, goCnt+10) + assert.Less(t, afterStats.HeapAlloc, stats.HeapAlloc+5e5) + assert.Less(t, afterGoCnt, goCnt+20) } diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 12c3fd89e..0ea81382c 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -2590,6 +2590,18 @@ type: int default: 9930 components: ["origin", "cache"] --- +name: Xrootd.EnableLocalMonitoring +description: |+ + Controls whether pelican will consume and monitor the detailed monitoring from + XRootD. The detailed monitoring requires a modest amount of memory (typically, a + few megabytes) and is recommended to always leave enabled. + + This option exists to minimize memory churn during unit tests. +type: bool +default: true +hidden: true +components: ["origin", "cache"] +--- name: Xrootd.LocalMonitoringHost description: |+ A URL pointing toward the XRootD instance's Local Monitoring Host. diff --git a/metrics/xrootd_metrics.go b/metrics/xrootd_metrics.go index 525eef6a9..f1e37b5b4 100644 --- a/metrics/xrootd_metrics.go +++ b/metrics/xrootd_metrics.go @@ -400,6 +400,8 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error) return nil }) + enableHandlePacket := param.Xrootd_EnableLocalMonitoring.GetBool() + go func() { var buf [65536]byte for { @@ -412,6 +414,9 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error) continue } PacketsReceived.Inc() + if !enableHandlePacket { + continue + } if err = HandlePacket(buf[:plen]); err != nil { log.Errorln("Failed to handle packet:", err) } @@ -627,7 +632,7 @@ func HandlePacket(packet []byte) error { } firstHeaderSize := binary.BigEndian.Uint16(packet[10:12]) if firstHeaderSize < 24 { - return fmt.Errorf("First entry in f-stream packet is %v bytes, smaller than the minimum XrdXrootdMonFileTOD size of 24 bytes", firstHeaderSize) + return fmt.Errorf("first entry in f-stream packet is %v bytes, smaller than the minimum XrdXrootdMonFileTOD size of 24 bytes", firstHeaderSize) } offset := uint32(firstHeaderSize + 8) bytesRemain := header.Plen - uint16(offset) @@ -818,9 +823,7 @@ func HandlePacket(packet []byte) error { case isDisc: // XrdXrootdMonFileHdr::isDisc log.Debug("MonPacket: Received a f-stream disconnect packet") userId := UserId{Id: fileHdr.UserId} - if session := sessions.Get(userId); session != nil { - sessions.Delete(userId) - } + sessions.Delete(userId) default: log.Debug("MonPacket: Received an unhandled file monitoring packet "+ "of type ", fileHdr.RecType) @@ -902,17 +905,22 @@ func HandlePacket(packet []byte) error { log.Debug("HandlePacket: Received an appinfo packet") infoSize := uint32(header.Plen - 12) if xrdUserId, appinfo, err := GetSIDRest(packet[12 : 12+infoSize]); err == nil { - if userids.Has(xrdUserId) { - userId := userids.Get(xrdUserId).Value() + item := userids.Get(xrdUserId) + if item != nil { + userId := item.Value() project := utils.ExtractProjectFromUserAgent([]string{appinfo}) - if sessions.Has(userId) { - existingRec := sessions.Get(userId).Value() + item, found := sessions.GetOrSet(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL)) + if found { + existingRec := item.Value() existingRec.Project = project existingRec.Host = xrdUserId.Host sessions.Set(userId, existingRec, ttlcache.DefaultTTL) } else { sessions.Set(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.DefaultTTL) } + // Remove the user id record as this is the only use for it; otherwise, it'll sit around in the cache, + // using memory. + userids.Delete(xrdUserId) } } else { return err diff --git a/param/parameters.go b/param/parameters.go index 03d8aa114..5532be15b 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -398,6 +398,7 @@ var ( Shoveler_VerifyHeader = BoolParam{"Shoveler.VerifyHeader"} StagePlugin_Hook = BoolParam{"StagePlugin.Hook"} TLSSkipVerify = BoolParam{"TLSSkipVerify"} + Xrootd_EnableLocalMonitoring = BoolParam{"Xrootd.EnableLocalMonitoring"} ) var ( diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 1c8817ee6..dfd508cc2 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -334,6 +334,7 @@ type Config struct { ConfigFile string `mapstructure:"configfile" yaml:"ConfigFile"` DetailedMonitoringHost string `mapstructure:"detailedmonitoringhost" yaml:"DetailedMonitoringHost"` DetailedMonitoringPort int `mapstructure:"detailedmonitoringport" yaml:"DetailedMonitoringPort"` + EnableLocalMonitoring bool `mapstructure:"enablelocalmonitoring" yaml:"EnableLocalMonitoring"` LocalMonitoringHost string `mapstructure:"localmonitoringhost" yaml:"LocalMonitoringHost"` MacaroonsKeyFile string `mapstructure:"macaroonskeyfile" yaml:"MacaroonsKeyFile"` ManagerHost string `mapstructure:"managerhost" yaml:"ManagerHost"` @@ -662,6 +663,7 @@ type configWithType struct { ConfigFile struct { Type string; Value string } DetailedMonitoringHost struct { Type string; Value string } DetailedMonitoringPort struct { Type string; Value int } + EnableLocalMonitoring struct { Type string; Value bool } LocalMonitoringHost struct { Type string; Value string } MacaroonsKeyFile struct { Type string; Value string } ManagerHost struct { Type string; Value string } From 0f1a41ca55e0c56120efcf4da69c8aba6a321fcb Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 18 Feb 2025 08:45:28 -0600 Subject: [PATCH 2/4] For the memory stress test, ensure the caches are enabled --- director/resources/director-public.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/director/resources/director-public.yaml b/director/resources/director-public.yaml index 7e584dd64..dcc9d30c4 100644 --- a/director/resources/director-public.yaml +++ b/director/resources/director-public.yaml @@ -11,6 +11,9 @@ Director: # to trigger any memory hoarding issues. #AssumePresenceAtSingleOrigin: false CachePresenceCapacity: 100 + CheckCachePresence: true + CheckOriginPresence: true + EnableStat: true Origin: # Things that configure the origin itself From 5c108a1eae5fe6bb7542a88873f5035221108c50 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 18 Feb 2025 08:46:28 -0600 Subject: [PATCH 3/4] Hoist compilation of common regexp to a global variable Reduces churn of objects on the heap observed in memory profiles --- utils/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/utils.go b/utils/utils.go index 63e2ebeef..ebc16fd9e 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -36,6 +36,7 @@ import ( var ( watermarkUnits = []byte{'k', 'm', 'g', 't'} + uaRegExp = regexp.MustCompile(`^pelican-[^\/]+\/\d+\.\d+\.\d+`) ) // snakeCaseToCamelCase converts a snake case string to camel case. @@ -124,7 +125,6 @@ func ExtractAndMaskIP(ipStr string) (maskedIP string, ok bool) { // the user agent. // It will return empty strings if the provided userAgent fails to match against the parser func ExtractVersionAndServiceFromUserAgent(userAgent string) (reqVer, service string) { - uaRegExp := regexp.MustCompile(`^pelican-[^\/]+\/\d+\.\d+\.\d+`) if matches := uaRegExp.MatchString(userAgent); !matches { return "", "" } From cbac6a62adc1c3c48ca9c3df26f4a9a5f7538ee5 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 23 Feb 2025 13:14:56 -0600 Subject: [PATCH 4/4] Refactor to allow correct lifetime tracking of user ID Keep a back-reference in the user record to the XrdUserId object; this allows us to delete the user ID upon receipt of the disconnect packet. --- metrics/xrootd_metrics.go | 27 ++++++++++++--------------- metrics/xrootd_metrics_test.go | 8 +++++--- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/metrics/xrootd_metrics.go b/metrics/xrootd_metrics.go index f1e37b5b4..e57a8da81 100644 --- a/metrics/xrootd_metrics.go +++ b/metrics/xrootd_metrics.go @@ -67,7 +67,7 @@ type ( Org string Groups []string Project string - Host string + XrdUserId XrdUserId // Back reference to the XRootD user ID generating this record } FileId struct { @@ -567,7 +567,7 @@ func ParseTokenAuth(tokenauth string) (userId UserId, record UserRecord, err err func ParseFileHeader(packet []byte) (XrdXrootdMonFileHdr, error) { if len(packet) < 8 { - return XrdXrootdMonFileHdr{}, fmt.Errorf("Passed header of size %v which is below the minimum header size of 8 bytes", len(packet)) + return XrdXrootdMonFileHdr{}, fmt.Errorf("passed header of size %v which is below the minimum header size of 8 bytes", len(packet)) } fileHdr := XrdXrootdMonFileHdr{ RecType: recTval(packet[0]), @@ -666,10 +666,9 @@ func HandlePacket(packet []byte) error { var oldWriteBytes uint64 = 0 if xferRecord != nil { userRecord := sessions.Get(xferRecord.Value().UserId) - sessions.Delete(xferRecord.Value().UserId) labels["path"] = xferRecord.Value().Path if userRecord != nil { - maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host) + maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host) if !ok { log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP)) } else { @@ -774,7 +773,7 @@ func HandlePacket(packet []byte) error { userRecord := sessions.Get(record.UserId) labels["path"] = record.Path if userRecord != nil { - maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host) + maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host) if !ok { log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP)) } else { @@ -823,7 +822,10 @@ func HandlePacket(packet []byte) error { case isDisc: // XrdXrootdMonFileHdr::isDisc log.Debug("MonPacket: Received a f-stream disconnect packet") userId := UserId{Id: fileHdr.UserId} - sessions.Delete(userId) + item, found := sessions.GetAndDelete(userId) + if found { + userids.Delete(item.Value().XrdUserId) + } default: log.Debug("MonPacket: Received an unhandled file monitoring packet "+ "of type ", fileHdr.RecType) @@ -909,18 +911,13 @@ func HandlePacket(packet []byte) error { if item != nil { userId := item.Value() project := utils.ExtractProjectFromUserAgent([]string{appinfo}) - item, found := sessions.GetOrSet(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL)) + item, found := sessions.GetOrSet(userId, UserRecord{Project: project, XrdUserId: xrdUserId}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL)) if found { existingRec := item.Value() existingRec.Project = project - existingRec.Host = xrdUserId.Host + existingRec.XrdUserId = xrdUserId sessions.Set(userId, existingRec, ttlcache.DefaultTTL) - } else { - sessions.Set(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.DefaultTTL) } - // Remove the user id record as this is the only use for it; otherwise, it'll sit around in the cache, - // using memory. - userids.Delete(xrdUserId) } } else { return err @@ -951,7 +948,7 @@ func HandlePacket(packet []byte) error { if len(record.AuthenticationProtocol) > 0 { record.User = xrdUserId.User } - record.Host = xrdUserId.Host + record.XrdUserId = xrdUserId sessions.Set(UserId{Id: dictid}, record, ttlcache.DefaultTTL) userids.Set(xrdUserId, UserId{Id: dictid}, ttlcache.DefaultTTL) } else { @@ -965,7 +962,7 @@ func HandlePacket(packet []byte) error { if err != nil { return err } - userRecord.Host = xrdUserId.Host + userRecord.XrdUserId = xrdUserId sessions.Set(userId, userRecord, ttlcache.DefaultTTL) } else { return err diff --git a/metrics/xrootd_metrics_test.go b/metrics/xrootd_metrics_test.go index 99a37f8e7..1e2a7abb0 100644 --- a/metrics/xrootd_metrics_test.go +++ b/metrics/xrootd_metrics_test.go @@ -414,6 +414,7 @@ func TestHandlePacket(t *testing.T) { Sid: 143152967831384, Host: "fae8c2865de4", } + mockUserRecord.XrdUserId = mockXrdUserId mockInfo := []byte(getUserIdString(mockXrdUserId) + "\n" + getAuthInfoString(mockUserRecord)) mockMonMap := XrdXrootdMonMap{ Hdr: XrdXrootdMonHeader{ // 8B @@ -442,7 +443,7 @@ func TestHandlePacket(t *testing.T) { assert.Equal(t, mockUserRecord.DN, sessionEntry.DN) assert.Equal(t, mockUserRecord.Role, sessionEntry.Role) assert.Equal(t, mockUserRecord.Org, sessionEntry.Org) - assert.Equal(t, mockXrdUserId.Host, sessionEntry.Host) + assert.Equal(t, mockXrdUserId.Host, sessionEntry.XrdUserId.Host) sessions.DeleteAll() }) @@ -485,7 +486,7 @@ func TestHandlePacket(t *testing.T) { require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update") sessionEntry := sessions.Get(sessions.Keys()[0]).Value() - assert.Equal(t, expectedIPv4, sessionEntry.Host) + assert.Equal(t, expectedIPv4, sessionEntry.XrdUserId.Host) }) @@ -504,6 +505,7 @@ func TestHandlePacket(t *testing.T) { Sid: 143152967831384, Host: expectedIPv6, } + mockUserRecord.XrdUserId = mockXrdUserIdIPv6 mockInfoIPv6 := []byte(getUserIdString(mockXrdUserIdIPv6) + "\n" + getAuthInfoString(mockUserRecord)) mockMonMapIPv6 := XrdXrootdMonMap{ Hdr: XrdXrootdMonHeader{ // 8B @@ -527,7 +529,7 @@ func TestHandlePacket(t *testing.T) { require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update") sessionEntry := sessions.Get(sessions.Keys()[0]).Value() - assert.Equal(t, expectedIPv6, sessionEntry.Host) + assert.Equal(t, expectedIPv6, sessionEntry.XrdUserId.Host) }) t.Run("file-path-packet-d-should-register-correct-info", func(t *testing.T) {