Skip to content
This repository has been archived by the owner on Oct 20, 2024. It is now read-only.

Commit

Permalink
feat: Returns remote address in message handlers (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
gruyaume authored Jan 9, 2024
1 parent 4a8ef0c commit a839158
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 54 deletions.
8 changes: 4 additions & 4 deletions network/udpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
type UdpServer struct {
conn *net.UDPConn
closeCh chan struct{}
Handler func([]byte)
Handler func(net.Addr, []byte)
}

func (udpServer *UdpServer) SetHandler(handler func([]byte)) {
func (udpServer *UdpServer) SetHandler(handler func(net.Addr, []byte)) {
udpServer.Handler = handler
}

Expand Down Expand Up @@ -49,15 +49,15 @@ func (udpServer *UdpServer) listen() error {
return nil
default:
buffer := make([]byte, 1024)
length, _, err := udpServer.conn.ReadFrom(buffer)
length, remoteAddress, err := udpServer.conn.ReadFrom(buffer)
if err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
return fmt.Errorf("failed to read from UDP connection: %w", err)
}
continue
}
if udpServer.Handler != nil {
udpServer.Handler(buffer[:length])
udpServer.Handler(remoteAddress, buffer[:length])
}
}
}
Expand Down
67 changes: 34 additions & 33 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,28 @@ package server

import (
"log"
"net"

"github.com/dot-5g/pfcp/messages"
"github.com/dot-5g/pfcp/network"
)

type HandleHeartbeatRequest func(sequenceNumber uint32, msg messages.HeartbeatRequest)
type HandleHeartbeatResponse func(sequenceNumber uint32, msg messages.HeartbeatResponse)
type HandlePFCPAssociationSetupRequest func(sequenceNumber uint32, msg messages.PFCPAssociationSetupRequest)
type HandlePFCPAssociationSetupResponse func(sequenceNumber uint32, msg messages.PFCPAssociationSetupResponse)
type HandlePFCPAssociationUpdateRequest func(sequenceNumber uint32, msg messages.PFCPAssociationUpdateRequest)
type HandlePFCPAssociationUpdateResponse func(sequenceNumber uint32, msg messages.PFCPAssociationUpdateResponse)
type HandlePFCPAssociationReleaseRequest func(sequenceNumber uint32, msg messages.PFCPAssociationReleaseRequest)
type HandlePFCPAssociationReleaseResponse func(sequenceNumber uint32, msg messages.PFCPAssociationReleaseResponse)
type HandlePFCPNodeReportRequest func(sequenceNumber uint32, msg messages.PFCPNodeReportRequest)
type HandlePFCPNodeReportResponse func(sequenceNumber uint32, msg messages.PFCPNodeReportResponse)
type HandlePFCPSessionEstablishmentRequest func(sequenceNumber uint32, seid uint64, msg messages.PFCPSessionEstablishmentRequest)
type HandlePFCPSessionEstablishmentResponse func(sequenceNumber uint32, seid uint64, msg messages.PFCPSessionEstablishmentResponse)
type HandlePFCPSessionDeletionRequest func(sequenceNumber uint32, seid uint64, msg messages.PFCPSessionDeletionRequest)
type HandlePFCPSessionDeletionResponse func(sequenceNumber uint32, seid uint64, msg messages.PFCPSessionDeletionResponse)
type HandlePFCPSessionReportRequest func(sequenceNumber uint32, seid uint64, msg messages.PFCPSessionReportRequest)
type HandlePFCPSessionReportResponse func(sequenceNumber uint32, seid uint64, msg messages.PFCPSessionReportResponse)
type HandleHeartbeatRequest func(address net.Addr, sequenceNumber uint32, msg messages.HeartbeatRequest)
type HandleHeartbeatResponse func(address net.Addr, sequenceNumber uint32, msg messages.HeartbeatResponse)
type HandlePFCPAssociationSetupRequest func(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationSetupRequest)
type HandlePFCPAssociationSetupResponse func(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationSetupResponse)
type HandlePFCPAssociationUpdateRequest func(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationUpdateRequest)
type HandlePFCPAssociationUpdateResponse func(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationUpdateResponse)
type HandlePFCPAssociationReleaseRequest func(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationReleaseRequest)
type HandlePFCPAssociationReleaseResponse func(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationReleaseResponse)
type HandlePFCPNodeReportRequest func(address net.Addr, sequenceNumber uint32, msg messages.PFCPNodeReportRequest)
type HandlePFCPNodeReportResponse func(address net.Addr, sequenceNumber uint32, msg messages.PFCPNodeReportResponse)
type HandlePFCPSessionEstablishmentRequest func(address net.Addr, sequenceNumber uint32, seid uint64, msg messages.PFCPSessionEstablishmentRequest)
type HandlePFCPSessionEstablishmentResponse func(address net.Addr, sequenceNumber uint32, seid uint64, msg messages.PFCPSessionEstablishmentResponse)
type HandlePFCPSessionDeletionRequest func(address net.Addr, sequenceNumber uint32, seid uint64, msg messages.PFCPSessionDeletionRequest)
type HandlePFCPSessionDeletionResponse func(address net.Addr, sequenceNumber uint32, seid uint64, msg messages.PFCPSessionDeletionResponse)
type HandlePFCPSessionReportRequest func(address net.Addr, sequenceNumber uint32, seid uint64, msg messages.PFCPSessionReportRequest)
type HandlePFCPSessionReportResponse func(address net.Addr, sequenceNumber uint32, seid uint64, msg messages.PFCPSessionReportResponse)

type Server struct {
address string
Expand Down Expand Up @@ -128,7 +129,7 @@ func (server *Server) PFCPSessionReportResponse(handler HandlePFCPSessionReportR
server.pfcpSessionReportResponseHandler = handler
}

func (server *Server) handlePFCPMessage(payload []byte) {
func (server *Server) handlePFCPMessage(address net.Addr, payload []byte) {
header, err := messages.DeserializeHeader(payload)
if err != nil {
log.Fatalf("Error deserializing header: %v", err)
Expand Down Expand Up @@ -157,7 +158,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing Heartbeat Request: %v", err)
return
}
server.heartbeatRequestHandler(header.SequenceNumber, msg)
server.heartbeatRequestHandler(address, header.SequenceNumber, msg)
case messages.HeartbeatResponseMessageType:
if server.heartbeatResponseHandler == nil {
log.Printf("No handler for Heartbeat Response")
Expand All @@ -168,7 +169,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing Heartbeat Response: %v", err)
return
}
server.heartbeatResponseHandler(header.SequenceNumber, msg)
server.heartbeatResponseHandler(address, header.SequenceNumber, msg)
case messages.PFCPAssociationSetupRequestMessageType:
if server.pfcpAssociationSetupRequestHandler == nil {
log.Printf("No handler for PFCP Association Setup Request")
Expand All @@ -179,7 +180,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Association Setup Request: %v", err)
return
}
server.pfcpAssociationSetupRequestHandler(header.SequenceNumber, msg)
server.pfcpAssociationSetupRequestHandler(address, header.SequenceNumber, msg)
case messages.PFCPAssociationSetupResponseMessageType:
if server.pfcpAssociationSetupResponseHandler == nil {
log.Printf("No handler for PFCP Association Setup Response")
Expand All @@ -190,7 +191,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Association Setup Response: %v", err)
return
}
server.pfcpAssociationSetupResponseHandler(header.SequenceNumber, msg)
server.pfcpAssociationSetupResponseHandler(address, header.SequenceNumber, msg)
case messages.PFCPAssociationUpdateRequestMessageType:
if server.pfcpAssociationUpdateRequestHandler == nil {
log.Printf("No handler for PFCP Association Update Request")
Expand All @@ -201,7 +202,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Association Update Request: %v", err)
return
}
server.pfcpAssociationUpdateRequestHandler(header.SequenceNumber, msg)
server.pfcpAssociationUpdateRequestHandler(address, header.SequenceNumber, msg)
case messages.PFCPAssociationUpdateResponseMessageType:
if server.pfcpAssociationUpdateResponseHandler == nil {
log.Printf("No handler for PFCP Association Update Response")
Expand All @@ -212,7 +213,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Association Update Response: %v", err)
return
}
server.pfcpAssociationUpdateResponseHandler(header.SequenceNumber, msg)
server.pfcpAssociationUpdateResponseHandler(address, header.SequenceNumber, msg)
case messages.PFCPAssociationReleaseRequestMessageType:
if server.pfcpAssociationReleaseRequestHandler == nil {
log.Printf("No handler for PFCP Association Release Request")
Expand All @@ -223,7 +224,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Association Release Request: %v", err)
return
}
server.pfcpAssociationReleaseRequestHandler(header.SequenceNumber, msg)
server.pfcpAssociationReleaseRequestHandler(address, header.SequenceNumber, msg)
case messages.PFCPAssociationReleaseResponseMessageType:
if server.pfcpAssociationReleaseResponseHandler == nil {
log.Printf("No handler for PFCP Association Release Response")
Expand All @@ -234,7 +235,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Association Release Response: %v", err)
return
}
server.pfcpAssociationReleaseResponseHandler(header.SequenceNumber, msg)
server.pfcpAssociationReleaseResponseHandler(address, header.SequenceNumber, msg)
case messages.PFCPNodeReportRequestMessageType:
if server.pfcpNodeReportRequestHandler == nil {
log.Printf("No handler for PFCP Node Report Request")
Expand All @@ -245,7 +246,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Node Report Request: %v", err)
return
}
server.pfcpNodeReportRequestHandler(header.SequenceNumber, msg)
server.pfcpNodeReportRequestHandler(address, header.SequenceNumber, msg)
case messages.PFCPNodeReportResponseMessageType:
if server.pfcpNodeReportResponseHandler == nil {
log.Printf("No handler for PFCP Node Report Response")
Expand All @@ -256,7 +257,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Node Report Response: %v", err)
return
}
server.pfcpNodeReportResponseHandler(header.SequenceNumber, msg)
server.pfcpNodeReportResponseHandler(address, header.SequenceNumber, msg)
case messages.PFCPSessionEstablishmentRequestMessageType:
if server.pfcpSessionEstablishmentRequestHandler == nil {
log.Printf("No handler for PFCP Session Establishment Request")
Expand All @@ -267,7 +268,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Session Establishment Request: %v", err)
return
}
server.pfcpSessionEstablishmentRequestHandler(header.SequenceNumber, header.SEID, msg)
server.pfcpSessionEstablishmentRequestHandler(address, header.SequenceNumber, header.SEID, msg)
case messages.PFCPSessionEstablishmentResponseMessageType:
if server.pfcpSessionEstablishmentResponseHandler == nil {
log.Printf("No handler for PFCP Session Establishment Response")
Expand All @@ -278,7 +279,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Session Establishment Response: %v", err)
return
}
server.pfcpSessionEstablishmentResponseHandler(header.SequenceNumber, header.SEID, msg)
server.pfcpSessionEstablishmentResponseHandler(address, header.SequenceNumber, header.SEID, msg)
case messages.PFCPSessionDeletionRequestMessageType:
if server.pfcpSessionDeletionRequestHandler == nil {
log.Printf("No handler for PFCP Session Deletion Request")
Expand All @@ -289,7 +290,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Session Deletion Request: %v", err)
return
}
server.pfcpSessionDeletionRequestHandler(header.SequenceNumber, header.SEID, msg)
server.pfcpSessionDeletionRequestHandler(address, header.SequenceNumber, header.SEID, msg)
case messages.PFCPSessionDeletionResponseMessageType:
if server.pfcpSessionDeletionResponseHandler == nil {
log.Printf("No handler for PFCP Session Deletion Response")
Expand All @@ -300,7 +301,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Session Deletion Response: %v", err)
return
}
server.pfcpSessionDeletionResponseHandler(header.SequenceNumber, header.SEID, msg)
server.pfcpSessionDeletionResponseHandler(address, header.SequenceNumber, header.SEID, msg)
case messages.PFCPSessionReportRequestMessageType:
if server.pfcpSessionReportRequestHandler == nil {
log.Printf("No handler for PFCP Session Report Request")
Expand All @@ -311,7 +312,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Session Report Request: %v", err)
return
}
server.pfcpSessionReportRequestHandler(header.SequenceNumber, header.SEID, msg)
server.pfcpSessionReportRequestHandler(address, header.SequenceNumber, header.SEID, msg)
case messages.PFCPSessionReportResponseMessageType:
if server.pfcpSessionReportResponseHandler == nil {
log.Printf("No handler for PFCP Session Report Response")
Expand All @@ -322,7 +323,7 @@ func (server *Server) handlePFCPMessage(payload []byte) {
log.Printf("Error deserializing PFCP Session Report Response: %v", err)
return
}
server.pfcpSessionReportResponseHandler(header.SequenceNumber, header.SEID, msg)
server.pfcpSessionReportResponseHandler(address, header.SequenceNumber, header.SEID, msg)
default:
log.Printf("Unknown PFCP message type: %v", header.MessageType)
}
Expand Down
15 changes: 12 additions & 3 deletions tests/heartbeat_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"net"
"sync"
"testing"
"time"
Expand All @@ -16,6 +17,7 @@ var (
heartbeatRequesthandlerCalled bool
heartbeatRequestreceivedRecoveryTimestamp ie.RecoveryTimeStamp
heartbeatRequestReceivedSequenceNumber uint32
heartbeatRequestReceivedRemoteAddress net.Addr
)

var (
Expand All @@ -33,15 +35,16 @@ var (
heartbeatResponseReceivedSequenceNumber uint32
)

func HandleHeartbeatRequest(sequenceNumber uint32, msg messages.HeartbeatRequest) {
func HandleHeartbeatRequest(address net.Addr, sequenceNumber uint32, msg messages.HeartbeatRequest) {
heartbeatRequestMu.Lock()
defer heartbeatRequestMu.Unlock()
heartbeatRequesthandlerCalled = true
heartbeatRequestreceivedRecoveryTimestamp = msg.RecoveryTimeStamp
heartbeatRequestReceivedSequenceNumber = sequenceNumber
heartbeatRequestReceivedRemoteAddress = address
}

func HandleHeartbeatRequestWithSourceIP(sequenceNumber uint32, msg messages.HeartbeatRequest) {
func HandleHeartbeatRequestWithSourceIP(address net.Addr, sequenceNumber uint32, msg messages.HeartbeatRequest) {
heartbeatRequestWithSourceIPMu.Lock()
defer heartbeatRequestWithSourceIPMu.Unlock()
heartbeatRequestWithSourceIPhandlerCalled = true
Expand All @@ -50,7 +53,7 @@ func HandleHeartbeatRequestWithSourceIP(sequenceNumber uint32, msg messages.Hear
heartbeatRequestWithSourceIPReceivedSequenceNumber = sequenceNumber
}

func HandleHeartbeatResponse(sequenceNumber uint32, msg messages.HeartbeatResponse) {
func HandleHeartbeatResponse(address net.Addr, sequenceNumber uint32, msg messages.HeartbeatResponse) {
heartbeatResponseMu.Lock()
defer heartbeatResponseMu.Unlock()
heartbeatResponsehandlerCalled = true
Expand Down Expand Up @@ -102,6 +105,12 @@ func HeartbeatRequest(t *testing.T) {
if heartbeatRequestReceivedSequenceNumber != sentSequenceNumber {
t.Errorf("Heartbeat request handler was called with wrong sequence number.\n- Sent sequence number: %v\n- Received sequence number %v\n", sentSequenceNumber, heartbeatRequestReceivedSequenceNumber)
}

remoteAddr := heartbeatRequestReceivedRemoteAddress.String()
if remoteAddr[:9] != "127.0.0.1" {
t.Errorf("Heartbeat request handler was called with wrong remote address.\n- Sent remote address: %v\n- Received remote address %v\n", "127.0.0.1", remoteAddr)
}

heartbeatRequestMu.Unlock()
}

Expand Down
5 changes: 3 additions & 2 deletions tests/pfcp_association_release_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"net"
"sync"
"testing"
"time"
Expand All @@ -26,15 +27,15 @@ var (
pfcpAssociationReleaseResponseReceivedCause ie.Cause
)

func HandlePFCPAssociationReleaseRequest(sequenceNumber uint32, msg messages.PFCPAssociationReleaseRequest) {
func HandlePFCPAssociationReleaseRequest(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationReleaseRequest) {
pfcpAssociationReleaseRequestMu.Lock()
defer pfcpAssociationReleaseRequestMu.Unlock()
pfcpAssociationReleaseRequesthandlerCalled = true
pfcpAssociationReleaseRequestReceivedSequenceNumber = sequenceNumber
pfcpAssociationReleaseRequestReceivedNodeID = msg.NodeID
}

func HandlePFCPAssociationReleaseResponse(sequenceNumber uint32, msg messages.PFCPAssociationReleaseResponse) {
func HandlePFCPAssociationReleaseResponse(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationReleaseResponse) {
pfcpAssociationReleaseResponseMu.Lock()
defer pfcpAssociationReleaseResponseMu.Unlock()
pfcpAssociationReleaseResponsehandlerCalled = true
Expand Down
5 changes: 3 additions & 2 deletions tests/pfcp_association_setup_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"net"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -29,7 +30,7 @@ var (
pfcpAssociationSetupResponseReceivedCause ie.Cause
)

func HandlePFCPAssociationSetupRequest(sequenceNumber uint32, msg messages.PFCPAssociationSetupRequest) {
func HandlePFCPAssociationSetupRequest(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationSetupRequest) {
pfcpAssociationSetupRequestMu.Lock()
defer pfcpAssociationSetupRequestMu.Unlock()
pfcpAssociationSetupRequesthandlerCalled = true
Expand All @@ -39,7 +40,7 @@ func HandlePFCPAssociationSetupRequest(sequenceNumber uint32, msg messages.PFCPA
pfcpAssociationSetupRequestReceivedUPFunctionFeatures = msg.UPFunctionFeatures
}

func HandlePFCPAssociationSetupResponse(sequenceNumber uint32, msg messages.PFCPAssociationSetupResponse) {
func HandlePFCPAssociationSetupResponse(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationSetupResponse) {
pfcpAssociationSetupResponseMu.Lock()
defer pfcpAssociationSetupResponseMu.Unlock()
pfcpAssociationSetupResponsehandlerCalled = true
Expand Down
5 changes: 3 additions & 2 deletions tests/pfcp_association_update_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"net"
"sync"
"testing"
"time"
Expand All @@ -26,15 +27,15 @@ var (
pfcpAssociationUpdateResponseReceivedCause ie.Cause
)

func HandlePFCPAssociationUpdateRequest(sequenceNumber uint32, msg messages.PFCPAssociationUpdateRequest) {
func HandlePFCPAssociationUpdateRequest(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationUpdateRequest) {
pfcpAssociationUpdateRequestMu.Lock()
defer pfcpAssociationUpdateRequestMu.Unlock()
pfcpAssociationUpdateRequesthandlerCalled = true
pfcpAssociationUpdateRequestReceivedSequenceNumber = sequenceNumber
pfcpAssociationUpdateRequestReceivedNodeID = msg.NodeID
}

func HandlePFCPAssociationUpdateResponse(sequenceNumber uint32, msg messages.PFCPAssociationUpdateResponse) {
func HandlePFCPAssociationUpdateResponse(address net.Addr, sequenceNumber uint32, msg messages.PFCPAssociationUpdateResponse) {
pfcpAssociationUpdateResponseMu.Lock()
defer pfcpAssociationUpdateResponseMu.Unlock()
pfcpAssociationUpdateResponsehandlerCalled = true
Expand Down
5 changes: 3 additions & 2 deletions tests/pfcp_node_report_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"net"
"sync"
"testing"
"time"
Expand All @@ -27,7 +28,7 @@ var (
pfcpNodeReportResponseReceivedCause ie.Cause
)

func HandlePFCPNodeReportRequest(sequenceNumber uint32, msg messages.PFCPNodeReportRequest) {
func HandlePFCPNodeReportRequest(address net.Addr, sequenceNumber uint32, msg messages.PFCPNodeReportRequest) {
pfcpNodeReportRequestMu.Lock()
defer pfcpNodeReportRequestMu.Unlock()
pfcpNodeReportRequesthandlerCalled = true
Expand All @@ -36,7 +37,7 @@ func HandlePFCPNodeReportRequest(sequenceNumber uint32, msg messages.PFCPNodeRep
pfcpNodeReportRequestReceivedNodeReportType = msg.NodeReportType
}

func HandlePFCPNodeReportResponse(sequenceNumber uint32, msg messages.PFCPNodeReportResponse) {
func HandlePFCPNodeReportResponse(address net.Addr, sequenceNumber uint32, msg messages.PFCPNodeReportResponse) {
pfcpNodeReportResponseMu.Lock()
defer pfcpNodeReportResponseMu.Unlock()
pfcpNodeReportResponsehandlerCalled = true
Expand Down
Loading

0 comments on commit a839158

Please sign in to comment.