diff --git a/metrics/xrootd_metrics.go b/metrics/xrootd_metrics.go index f1e37b5b4..e57a8da81 100644 --- a/metrics/xrootd_metrics.go +++ b/metrics/xrootd_metrics.go @@ -67,7 +67,7 @@ type ( Org string Groups []string Project string - Host string + XrdUserId XrdUserId // Back reference to the XRootD user ID generating this record } FileId struct { @@ -567,7 +567,7 @@ func ParseTokenAuth(tokenauth string) (userId UserId, record UserRecord, err err func ParseFileHeader(packet []byte) (XrdXrootdMonFileHdr, error) { if len(packet) < 8 { - return XrdXrootdMonFileHdr{}, fmt.Errorf("Passed header of size %v which is below the minimum header size of 8 bytes", len(packet)) + return XrdXrootdMonFileHdr{}, fmt.Errorf("passed header of size %v which is below the minimum header size of 8 bytes", len(packet)) } fileHdr := XrdXrootdMonFileHdr{ RecType: recTval(packet[0]), @@ -666,10 +666,9 @@ func HandlePacket(packet []byte) error { var oldWriteBytes uint64 = 0 if xferRecord != nil { userRecord := sessions.Get(xferRecord.Value().UserId) - sessions.Delete(xferRecord.Value().UserId) labels["path"] = xferRecord.Value().Path if userRecord != nil { - maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host) + maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host) if !ok { log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP)) } else { @@ -774,7 +773,7 @@ func HandlePacket(packet []byte) error { userRecord := sessions.Get(record.UserId) labels["path"] = record.Path if userRecord != nil { - maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().Host) + maskedIP, ok := utils.ExtractAndMaskIP(userRecord.Value().XrdUserId.Host) if !ok { log.Warning(fmt.Sprintf("Failed to mask IP address: %s", maskedIP)) } else { @@ -823,7 +822,10 @@ func HandlePacket(packet []byte) error { case isDisc: // XrdXrootdMonFileHdr::isDisc log.Debug("MonPacket: Received a f-stream disconnect packet") userId := UserId{Id: fileHdr.UserId} - sessions.Delete(userId) + item, found := sessions.GetAndDelete(userId) + if found { + userids.Delete(item.Value().XrdUserId) + } default: log.Debug("MonPacket: Received an unhandled file monitoring packet "+ "of type ", fileHdr.RecType) @@ -909,18 +911,13 @@ func HandlePacket(packet []byte) error { if item != nil { userId := item.Value() project := utils.ExtractProjectFromUserAgent([]string{appinfo}) - item, found := sessions.GetOrSet(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL)) + item, found := sessions.GetOrSet(userId, UserRecord{Project: project, XrdUserId: xrdUserId}, ttlcache.WithTTL[UserId, UserRecord](ttlcache.DefaultTTL)) if found { existingRec := item.Value() existingRec.Project = project - existingRec.Host = xrdUserId.Host + existingRec.XrdUserId = xrdUserId sessions.Set(userId, existingRec, ttlcache.DefaultTTL) - } else { - sessions.Set(userId, UserRecord{Project: project, Host: xrdUserId.Host}, ttlcache.DefaultTTL) } - // Remove the user id record as this is the only use for it; otherwise, it'll sit around in the cache, - // using memory. - userids.Delete(xrdUserId) } } else { return err @@ -951,7 +948,7 @@ func HandlePacket(packet []byte) error { if len(record.AuthenticationProtocol) > 0 { record.User = xrdUserId.User } - record.Host = xrdUserId.Host + record.XrdUserId = xrdUserId sessions.Set(UserId{Id: dictid}, record, ttlcache.DefaultTTL) userids.Set(xrdUserId, UserId{Id: dictid}, ttlcache.DefaultTTL) } else { @@ -965,7 +962,7 @@ func HandlePacket(packet []byte) error { if err != nil { return err } - userRecord.Host = xrdUserId.Host + userRecord.XrdUserId = xrdUserId sessions.Set(userId, userRecord, ttlcache.DefaultTTL) } else { return err diff --git a/metrics/xrootd_metrics_test.go b/metrics/xrootd_metrics_test.go index 99a37f8e7..1e2a7abb0 100644 --- a/metrics/xrootd_metrics_test.go +++ b/metrics/xrootd_metrics_test.go @@ -414,6 +414,7 @@ func TestHandlePacket(t *testing.T) { Sid: 143152967831384, Host: "fae8c2865de4", } + mockUserRecord.XrdUserId = mockXrdUserId mockInfo := []byte(getUserIdString(mockXrdUserId) + "\n" + getAuthInfoString(mockUserRecord)) mockMonMap := XrdXrootdMonMap{ Hdr: XrdXrootdMonHeader{ // 8B @@ -442,7 +443,7 @@ func TestHandlePacket(t *testing.T) { assert.Equal(t, mockUserRecord.DN, sessionEntry.DN) assert.Equal(t, mockUserRecord.Role, sessionEntry.Role) assert.Equal(t, mockUserRecord.Org, sessionEntry.Org) - assert.Equal(t, mockXrdUserId.Host, sessionEntry.Host) + assert.Equal(t, mockXrdUserId.Host, sessionEntry.XrdUserId.Host) sessions.DeleteAll() }) @@ -485,7 +486,7 @@ func TestHandlePacket(t *testing.T) { require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update") sessionEntry := sessions.Get(sessions.Keys()[0]).Value() - assert.Equal(t, expectedIPv4, sessionEntry.Host) + assert.Equal(t, expectedIPv4, sessionEntry.XrdUserId.Host) }) @@ -504,6 +505,7 @@ func TestHandlePacket(t *testing.T) { Sid: 143152967831384, Host: expectedIPv6, } + mockUserRecord.XrdUserId = mockXrdUserIdIPv6 mockInfoIPv6 := []byte(getUserIdString(mockXrdUserIdIPv6) + "\n" + getAuthInfoString(mockUserRecord)) mockMonMapIPv6 := XrdXrootdMonMap{ Hdr: XrdXrootdMonHeader{ // 8B @@ -527,7 +529,7 @@ func TestHandlePacket(t *testing.T) { require.Equal(t, 1, len(sessions.Keys()), "Session cache didn't update") sessionEntry := sessions.Get(sessions.Keys()[0]).Value() - assert.Equal(t, expectedIPv6, sessionEntry.Host) + assert.Equal(t, expectedIPv6, sessionEntry.XrdUserId.Host) }) t.Run("file-path-packet-d-should-register-correct-info", func(t *testing.T) {