Skip to content

Commit

Permalink
Refactor to allow correct lifetime tracking of user ID
Browse files Browse the repository at this point in the history
Keep a back-reference in the user record to the XrdUserId object;
this allows us to delete the user ID upon receipt of the disconnect
packet.
  • Loading branch information
bbockelm committed Feb 23, 2025
1 parent 5c108a1 commit cbac6a6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
27 changes: 12 additions & 15 deletions metrics/xrootd_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
Org string
Groups []string
Project string
Host string
XrdUserId XrdUserId // Back reference to the XRootD user ID generating this record
}

FileId struct {
Expand Down Expand Up @@ -567,7 +567,7 @@ func ParseTokenAuth(tokenauth string) (userId UserId, record UserRecord, err err

func ParseFileHeader(packet []byte) (XrdXrootdMonFileHdr, error) {
if len(packet) < 8 {
return XrdXrootdMonFileHdr{}, fmt.Errorf("Passed header of size %v which is below the minimum header size of 8 bytes", len(packet))
return XrdXrootdMonFileHdr{}, fmt.Errorf("passed header of size %v which is below the minimum header size of 8 bytes", len(packet))
}
fileHdr := XrdXrootdMonFileHdr{
RecType: recTval(packet[0]),
Expand Down Expand Up @@ -666,10 +666,9 @@ func HandlePacket(packet []byte) error {
var oldWriteBytes uint64 = 0
if xferRecord != nil {
userRecord := sessions.Get(xferRecord.Value().UserId)
sessions.Delete(xferRecord.Value().UserId)
labels["path"] = xferRecord.Value().Path
if userRecord != nil {
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host)
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host)
if !ok {
log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP))
} else {
Expand Down Expand Up @@ -774,7 +773,7 @@ func HandlePacket(packet []byte) error {
userRecord := sessions.Get(record.UserId)
labels["path"] = record.Path
if userRecord != nil {
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host)
maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host)
if !ok {
log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP))
} else {
Expand Down Expand Up @@ -823,7 +822,10 @@ func HandlePacket(packet []byte) error {
case isDisc: // XrdXrootdMonFileHdr::isDisc
log.Debug("MonPacket: Received a f-stream disconnect packet")
userId := UserId{Id: fileHdr.UserId}
sessions.Delete(userId)
item, found := sessions.GetAndDelete(userId)
if found {
userids.Delete(item.Value().XrdUserId)
}
default:
log.Debug("MonPacket: Received an unhandled file monitoring packet "+
"of type ", fileHdr.RecType)
Expand Down Expand Up @@ -909,18 +911,13 @@ func HandlePacket(packet []byte) error {
if item != nil {
userId := item.Value()
project := utils.ExtractProjectFromUserAgent([]string{appinfo})
item, found := sessions.GetOrSet(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL))
item, found := sessions.GetOrSet(userId, UserRecord{Project: project, XrdUserId: xrdUserId}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL))
if found {
existingRec := item.Value()
existingRec.Project = project
existingRec.Host = xrdUserId.Host
existingRec.XrdUserId = xrdUserId
sessions.Set(userId, existingRec, ttlcache.DefaultTTL)
} else {
sessions.Set(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.DefaultTTL)
}
// Remove the user id record as this is the only use for it; otherwise, it'll sit around in the cache,
// using memory.
userids.Delete(xrdUserId)
}
} else {
return err
Expand Down Expand Up @@ -951,7 +948,7 @@ func HandlePacket(packet []byte) error {
if len(record.AuthenticationProtocol) > 0 {
record.User = xrdUserId.User
}
record.Host = xrdUserId.Host
record.XrdUserId = xrdUserId
sessions.Set(UserId{Id: dictid}, record, ttlcache.DefaultTTL)
userids.Set(xrdUserId, UserId{Id: dictid}, ttlcache.DefaultTTL)
} else {
Expand All @@ -965,7 +962,7 @@ func HandlePacket(packet []byte) error {
if err != nil {
return err
}
userRecord.Host = xrdUserId.Host
userRecord.XrdUserId = xrdUserId
sessions.Set(userId, userRecord, ttlcache.DefaultTTL)
} else {
return err
Expand Down
8 changes: 5 additions & 3 deletions metrics/xrootd_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ func TestHandlePacket(t *testing.T) {
Sid: 143152967831384,
Host: "fae8c2865de4",
}
mockUserRecord.XrdUserId = mockXrdUserId
mockInfo := []byte(getUserIdString(mockXrdUserId) + "\n" + getAuthInfoString(mockUserRecord))
mockMonMap := XrdXrootdMonMap{
Hdr: XrdXrootdMonHeader{ // 8B
Expand Down Expand Up @@ -442,7 +443,7 @@ func TestHandlePacket(t *testing.T) {
assert.Equal(t, mockUserRecord.DN, sessionEntry.DN)
assert.Equal(t, mockUserRecord.Role, sessionEntry.Role)
assert.Equal(t, mockUserRecord.Org, sessionEntry.Org)
assert.Equal(t, mockXrdUserId.Host, sessionEntry.Host)
assert.Equal(t, mockXrdUserId.Host, sessionEntry.XrdUserId.Host)

sessions.DeleteAll()
})
Expand Down Expand Up @@ -485,7 +486,7 @@ func TestHandlePacket(t *testing.T) {
require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update")

sessionEntry := sessions.Get(sessions.Keys()[0]).Value()
assert.Equal(t, expectedIPv4, sessionEntry.Host)
assert.Equal(t, expectedIPv4, sessionEntry.XrdUserId.Host)

})

Expand All @@ -504,6 +505,7 @@ func TestHandlePacket(t *testing.T) {
Sid: 143152967831384,
Host: expectedIPv6,
}
mockUserRecord.XrdUserId = mockXrdUserIdIPv6
mockInfoIPv6 := []byte(getUserIdString(mockXrdUserIdIPv6) + "\n" + getAuthInfoString(mockUserRecord))
mockMonMapIPv6 := XrdXrootdMonMap{
Hdr: XrdXrootdMonHeader{ // 8B
Expand All @@ -527,7 +529,7 @@ func TestHandlePacket(t *testing.T) {
require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update")

sessionEntry := sessions.Get(sessions.Keys()[0]).Value()
assert.Equal(t, expectedIPv6, sessionEntry.Host)
assert.Equal(t, expectedIPv6, sessionEntry.XrdUserId.Host)
})

t.Run("file-path-packet-d-should-register-correct-info", func(t *testing.T) {
Expand Down

0 comments on commit cbac6a6

Please sign in to comment.