Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory hoarding in the unit test #2028

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Xrootd:
DetailedMonitoringPort: 9930
SummaryMonitoringPort: 9931
AuthRefreshInterval: 5m
EnableLocalMonitoring: true
Transport:
DialerTimeout: 10s
DialerKeepAlive: 30s
Expand Down
3 changes: 3 additions & 0 deletions director/resources/director-public.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Director:
# to trigger any memory hoarding issues.
#AssumePresenceAtSingleOrigin: false
CachePresenceCapacity: 100
CheckCachePresence: true
CheckOriginPresence: true
EnableStat: true

Origin:
# Things that configure the origin itself
Expand Down
6 changes: 4 additions & 2 deletions director/stat_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

Expand All @@ -52,6 +53,7 @@ var (
func TestStatMemory(t *testing.T) {
server_utils.ResetTestState()

viper.Set(param.Xrootd_EnableLocalMonitoring.GetName(), false)
fed := fed_test_utils.NewFedTest(t, directorPublicCfg)
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
assert.NoError(t, err)
Expand Down Expand Up @@ -102,6 +104,6 @@ func TestStatMemory(t *testing.T) {
log.Infoln("Go routine count after warm-up:", goCnt)
log.Infoln("Go routine count after test:", afterGoCnt)

assert.Less(t, afterStats.HeapAlloc, stats.HeapAlloc+7e5)
assert.Less(t, afterGoCnt, goCnt+10)
assert.Less(t, afterStats.HeapAlloc, stats.HeapAlloc+5e5)
assert.Less(t, afterGoCnt, goCnt+20)
}
12 changes: 12 additions & 0 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2590,6 +2590,18 @@ type: int
default: 9930
components: ["origin", "cache"]
---
name: Xrootd.EnableLocalMonitoring
description: |+
Controls whether pelican will consume and monitor the detailed monitoring from
XRootD. The detailed monitoring requires a modest amount of memory (typically, a
few megabytes) and is recommended to always leave enabled.

This option exists to minimize memory churn during unit tests.
type: bool
default: true
hidden: true
components: ["origin", "cache"]
---
name: Xrootd.LocalMonitoringHost
description: |+
A URL pointing toward the XRootD instance's Local Monitoring Host.
Expand Down
39 changes: 22 additions & 17 deletions metrics/xrootd_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
Org string
Groups []string
Project string
Host string
XrdUserId XrdUserId // Back reference to the XRootD user ID generating this record
}

FileId struct {
Expand Down Expand Up @@ -400,6 +400,8 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error)
return nil
})

enableHandlePacket := param.Xrootd_EnableLocalMonitoring.GetBool()

go func() {
var buf [65536]byte
for {
Expand All @@ -412,6 +414,9 @@ func ConfigureMonitoring(ctx context.Context, egrp *errgroup.Group) (int, error)
continue
}
PacketsReceived.Inc()
if !enableHandlePacket {
continue
}
if err = HandlePacket(buf[:plen]); err != nil {
log.Errorln("Failed to handle packet:", err)
}
Expand Down Expand Up @@ -562,7 +567,7 @@ func ParseTokenAuth(tokenauth string) (userId UserId, record UserRecord, err err

func ParseFileHeader(packet []byte) (XrdXrootdMonFileHdr, error) {
if len(packet) < 8 {
return XrdXrootdMonFileHdr{}, fmt.Errorf("Passed header of size %v which is below the minimum header size of 8 bytes", len(packet))
return XrdXrootdMonFileHdr{}, fmt.Errorf("passed header of size %v which is below the minimum header size of 8 bytes", len(packet))
}
fileHdr := XrdXrootdMonFileHdr{
RecType: recTval(packet[0]),
Expand Down Expand Up @@ -627,7 +632,7 @@ func HandlePacket(packet []byte) error {
}
firstHeaderSize := binary.BigEndian.Uint16(packet[10:12])
if firstHeaderSize < 24 {
return fmt.Errorf("First entry in f-stream packet is %v bytes, smaller than the minimum XrdXrootdMonFileTOD size of 24 bytes", firstHeaderSize)
return fmt.Errorf("first entry in f-stream packet is %v bytes, smaller than the minimum XrdXrootdMonFileTOD size of 24 bytes", firstHeaderSize)
}
offset := uint32(firstHeaderSize + 8)
bytesRemain := header.Plen - uint16(offset)
Expand Down Expand Up @@ -661,10 +666,9 @@ func HandlePacket(packet []byte) error {
var oldWriteBytes uint64 = 0
if xferRecord != nil {
userRecord := sessions.Get(xferRecord.Value().UserId)
sessions.Delete(xferRecord.Value().UserId)
labels["path"] = xferRecord.Value().Path
if userRecord != nil {
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host)
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host)
if !ok {
log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP))
} else {
Expand Down Expand Up @@ -769,7 +773,7 @@ func HandlePacket(packet []byte) error {
userRecord := sessions.Get(record.UserId)
labels["path"] = record.Path
if userRecord != nil {
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host)
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host)
if !ok {
log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP))
} else {
Expand Down Expand Up @@ -818,8 +822,9 @@ func HandlePacket(packet []byte) error {
case isDisc: // XrdXrootdMonFileHdr::isDisc
log.Debug("MonPacket: Received a f-stream disconnect packet")
userId := UserId{Id: fileHdr.UserId}
if session := sessions.Get(userId); session != nil {
sessions.Delete(userId)
item, found := sessions.GetAndDelete(userId)
if found {
userids.Delete(item.Value().XrdUserId)
}
default:
log.Debug("MonPacket: Received an unhandled file monitoring packet "+
Expand Down Expand Up @@ -902,16 +907,16 @@ func HandlePacket(packet []byte) error {
log.Debug("HandlePacket: Received an appinfo packet")
infoSize := uint32(header.Plen - 12)
if xrdUserId, appinfo, err := GetSIDRest(packet[12 : 12+infoSize]); err == nil {
if userids.Has(xrdUserId) {
userId := userids.Get(xrdUserId).Value()
item := userids.Get(xrdUserId)
if item != nil {
userId := item.Value()
project := utils.ExtractProjectFromUserAgent([]string{appinfo})
if sessions.Has(userId) {
existingRec := sessions.Get(userId).Value()
item, found := sessions.GetOrSet(userId, UserRecord{Project: project, XrdUserId: xrdUserId}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL))
if found {
existingRec := item.Value()
existingRec.Project = project
existingRec.Host = xrdUserId.Host
existingRec.XrdUserId = xrdUserId
sessions.Set(userId, existingRec, ttlcache.DefaultTTL)
} else {
sessions.Set(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.DefaultTTL)
}
}
} else {
Expand Down Expand Up @@ -943,7 +948,7 @@ func HandlePacket(packet []byte) error {
if len(record.AuthenticationProtocol) > 0 {
record.User = xrdUserId.User
}
record.Host = xrdUserId.Host
record.XrdUserId = xrdUserId
sessions.Set(UserId{Id: dictid}, record, ttlcache.DefaultTTL)
userids.Set(xrdUserId, UserId{Id: dictid}, ttlcache.DefaultTTL)
} else {
Expand All @@ -957,7 +962,7 @@ func HandlePacket(packet []byte) error {
if err != nil {
return err
}
userRecord.Host = xrdUserId.Host
userRecord.XrdUserId = xrdUserId
sessions.Set(userId, userRecord, ttlcache.DefaultTTL)
} else {
return err
Expand Down
8 changes: 5 additions & 3 deletions metrics/xrootd_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ func TestHandlePacket(t *testing.T) {
Sid: 143152967831384,
Host: "fae8c2865de4",
}
mockUserRecord.XrdUserId = mockXrdUserId
mockInfo := []byte(getUserIdString(mockXrdUserId) + "\n" + getAuthInfoString(mockUserRecord))
mockMonMap := XrdXrootdMonMap{
Hdr: XrdXrootdMonHeader{ // 8B
Expand Down Expand Up @@ -442,7 +443,7 @@ func TestHandlePacket(t *testing.T) {
assert.Equal(t, mockUserRecord.DN, sessionEntry.DN)
assert.Equal(t, mockUserRecord.Role, sessionEntry.Role)
assert.Equal(t, mockUserRecord.Org, sessionEntry.Org)
assert.Equal(t, mockXrdUserId.Host, sessionEntry.Host)
assert.Equal(t, mockXrdUserId.Host, sessionEntry.XrdUserId.Host)

sessions.DeleteAll()
})
Expand Down Expand Up @@ -485,7 +486,7 @@ func TestHandlePacket(t *testing.T) {
require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update")

sessionEntry := sessions.Get(sessions.Keys()[0]).Value()
assert.Equal(t, expectedIPv4, sessionEntry.Host)
assert.Equal(t, expectedIPv4, sessionEntry.XrdUserId.Host)

})

Expand All @@ -504,6 +505,7 @@ func TestHandlePacket(t *testing.T) {
Sid: 143152967831384,
Host: expectedIPv6,
}
mockUserRecord.XrdUserId = mockXrdUserIdIPv6
mockInfoIPv6 := []byte(getUserIdString(mockXrdUserIdIPv6) + "\n" + getAuthInfoString(mockUserRecord))
mockMonMapIPv6 := XrdXrootdMonMap{
Hdr: XrdXrootdMonHeader{ // 8B
Expand All @@ -527,7 +529,7 @@ func TestHandlePacket(t *testing.T) {
require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update")

sessionEntry := sessions.Get(sessions.Keys()[0]).Value()
assert.Equal(t, expectedIPv6, sessionEntry.Host)
assert.Equal(t, expectedIPv6, sessionEntry.XrdUserId.Host)
})

t.Run("file-path-packet-d-should-register-correct-info", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions param/parameters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions param/parameters_struct.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

var (
watermarkUnits = []byte{'k', 'm', 'g', 't'}
uaRegExp = regexp.MustCompile(`^pelican-[^\/]+\/\d+\.\d+\.\d+`)
)

// snakeCaseToCamelCase converts a snake case string to camel case.
Expand Down Expand Up @@ -124,7 +125,6 @@ func ExtractAndMaskIP(ipStr string) (maskedIP string, ok bool) {
// the user agent.
// It will return empty strings if the provided userAgent fails to match against the parser
func ExtractVersionAndServiceFromUserAgent(userAgent string) (reqVer, service string) {
uaRegExp := regexp.MustCompile(`^pelican-[^\/]+\/\d+\.\d+\.\d+`)
if matches := uaRegExp.MatchString(userAgent); !matches {
return "", ""
}
Expand Down
Loading