From 9b071a04c1a6bde2fbb7451eadfd93bdd3c469a4 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Tue, 14 Jan 2025 18:37:26 +0100 Subject: [PATCH 1/3] [wip] Handover srv6 --- internal/app/rules-pusher.go | 99 +++++++++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/internal/app/rules-pusher.go b/internal/app/rules-pusher.go index 2e2a348..fbca8b5 100644 --- a/internal/app/rules-pusher.go +++ b/internal/app/rules-pusher.go @@ -24,6 +24,7 @@ import ( "github.com/sirupsen/logrus" "github.com/wmnsk/go-pfcp/ie" + "github.com/wmnsk/go-pfcp/message" ) const UserAgent = "go-github-nextmn-srv6-ctrl" @@ -209,8 +210,104 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error return nil } -func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcputil.MessageType, message pfcp_networking.ReceivedMessage, e *pfcp_networking.PFCPEntityUP) { +func (pusher *RulesPusher) pushHandover(ctx context.Context, ue string, handoverTo jsonapi.Fteid) { + //TODO Handover + +} + +func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcputil.MessageType, msg pfcp_networking.ReceivedMessage, e *pfcp_networking.PFCPEntityUP) { logrus.Debug("Into updateRoutersRules") + if msgType == message.MsgTypeSessionModificationRequest { + logrus.Debug("session modification request") + // check if handover + msgMod, ok := msg.Message.(*message.SessionModificationRequest) + if !ok { + logrus.Error("could not cast to sessionModifationRequest") + return + } + logrus.Debug("checking session modification request for handover") + if (len(msgMod.CreatePDR) == 0) && (len(msgMod.UpdatePDR) == 0) && (len(msgMod.CreateFAR) == 0) && (len(msgMod.UpdateFAR) == 1) { + // this is only a far update, so it is probably an handover… + updateFpIes, err := msgMod.UpdateFAR[0].UpdateForwardingParameters() + if err != nil { + logrus.WithError(err).Debug("No Update Forwarding parameters: not a valid handover") + return + } + updateFp := ie.NewUpdateForwardingParameters(updateFpIes...) + + if dest_interface, err := updateFp.DestinationInterface(); err != nil { + logrus.Debug("No destination interface: not a valid handover") + return + } else if dest_interface != ie.DstInterfaceAccess { + logrus.Debug("Destination interface is not access: not a valid handover") + return + } + farid, err := msgMod.UpdateFAR[0].FARID() + if err != nil { + logrus.Debug("No FARID: not a valid handover") + return + } + logrus.WithFields(logrus.Fields{ + "farid": farid, + }).Debug("handover detected") + + ohc, err := updateFp.OuterHeaderCreation() + if err != nil { + return + } + addr, ok := netip.AddrFromSlice(ohc.IPv4Address.To4()) + if !ok { + return + } + handoverTo := jsonapi.Fteid{ + Teid: ohc.TEID, + Addr: addr, + } + + handoverDone := false + // looking for a pdr with this farid to find ue ip address + for _, session := range e.GetPFCPSessions() { + if handoverDone { + break + } + s := make(chan struct{}) + go func() { // in a goroutine to trigger the defer + session.RLock() + defer session.RUnlock() + session.ForeachUnsortedPDR(func(pdr pfcpapi.PDRInterface) error { + id, err := pdr.FARID() + if err != nil { + // skip + return nil + } + if id != farid { + // skip + return nil + } + pdrid, err := pdr.ID() + if err != nil { + return nil + } + ue, err := pdr.UEIPAddress() + if err != nil { + return nil + } + logrus.WithFields(logrus.Fields{ + "farid": farid, + "pdrid": pdrid, + "ue": ue, + }).Debug("UE identified for handover") + pusher.pushHandover(ctx, ue.IPv4Address.String(), handoverTo) + handoverDone = true + return nil + }) + s <- struct{}{} + }() + <-s + } + return + } + } var wg0 sync.WaitGroup for _, session := range e.GetPFCPSessions() { logrus.Trace("In for loop…") From b3cdeea915e65b583283850d5a721b87d2867079 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 15 Jan 2025 14:20:55 +0100 Subject: [PATCH 2/3] Store pushed locations --- internal/app/rules-pusher.go | 57 ++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/internal/app/rules-pusher.go b/internal/app/rules-pusher.go index fbca8b5..0399b3f 100644 --- a/internal/app/rules-pusher.go +++ b/internal/app/rules-pusher.go @@ -12,6 +12,7 @@ import ( "fmt" "net/http" "net/netip" + "net/url" "sync" pfcp_networking "github.com/nextmn/go-pfcp-networking/pfcp" @@ -36,11 +37,18 @@ type RulesPusher struct { } type ueInfos struct { + sync.Mutex + UplinkFTeid jsonapi.Fteid DownlinkTeid uint32 Gnb string Pushed bool - sync.Mutex + + AnchorsRules []*url.URL + AnchorsLock sync.Mutex + + SRGWRules []*url.URL + SRGWLock sync.Mutex } func NewRulesPusher(config *config.CtrlConfig) *RulesPusher { @@ -51,32 +59,35 @@ func NewRulesPusher(config *config.CtrlConfig) *RulesPusher { } } -func (pusher *RulesPusher) pushSingleRule(ctx context.Context, client http.Client, uri jsonapi.ControlURI, data []byte) error { +func (pusher *RulesPusher) pushSingleRule(ctx context.Context, client http.Client, uri jsonapi.ControlURI, data []byte) (*url.URL, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.JoinPath("rules").String(), bytes.NewBuffer(data)) if err != nil { logrus.WithError(err).Error("could not create http request") - return err + return nil, err } req.Header.Add("User-Agent", UserAgent) req.Header.Set("Content-Type", "application/json; charset=UTF-8") resp, err := client.Do(req) if err != nil { logrus.WithError(err).Error("Could not push rules: server not responding") - return fmt.Errorf("Could not push rules: server not responding") + return nil, fmt.Errorf("Could not push rules: server not responding") } defer resp.Body.Close() if resp.StatusCode == 400 { logrus.WithError(err).Error("HTTP Bad Request") - return fmt.Errorf("HTTP Bad request") + return nil, fmt.Errorf("HTTP Bad request") } else if resp.StatusCode >= 500 { logrus.WithError(err).Error("HTTP internal error") - return fmt.Errorf("HTTP internal error") + return nil, fmt.Errorf("HTTP internal error") + } else if resp.StatusCode == 201 { + loc := resp.Header.Get("Location") + uloc, err := url.Parse(loc) + if err != nil { + return nil, err + } + return uri.ResolveReference(uloc), nil } - //else if resp.StatusCode == 201{ - //OK: store resource - //_ := resp.Header.Get("Location") - //} - return nil + return nil, fmt.Errorf("No Location provided") } func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error { @@ -148,7 +159,13 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error wg.Add(1) go func() error { defer wg.Done() - return pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json) + infos.SRGWLock.Lock() + defer infos.SRGWLock.Unlock() + url, err := pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json) + if err == nil { + infos.SRGWRules = append(infos.SRGWRules, url) + } + return err }() } @@ -202,7 +219,13 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error wg.Add(1) go func() error { defer wg.Done() - return pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json) + infos.AnchorsLock.Lock() + defer infos.AnchorsLock.Unlock() + url, err := pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json) + if err == nil { + infos.AnchorsRules = append(infos.AnchorsRules, url) + } + return err }() } @@ -295,7 +318,7 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu logrus.WithFields(logrus.Fields{ "farid": farid, "pdrid": pdrid, - "ue": ue, + "ue": ue.IPv4Address.String(), }).Debug("UE identified for handover") pusher.pushHandover(ctx, ue.IPv4Address.String(), handoverTo) handoverDone = true @@ -345,7 +368,9 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu return nil } if ue, loaded := pusher.ues.LoadOrStore(ue_ipv4, &ueInfos{ - UplinkFTeid: jsonapi.Fteid{Teid: fteid.TEID, Addr: addr}, + UplinkFTeid: jsonapi.Fteid{Teid: fteid.TEID, Addr: addr}, + AnchorsRules: make([]*url.URL, 0), + SRGWRules: make([]*url.URL, 0), }); loaded { logrus.WithFields(logrus.Fields{ "teid-uplink": fteid.TEID, @@ -380,6 +405,8 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu if ue, loaded := pusher.ues.LoadOrStore(ue_ipv4, &ueInfos{ DownlinkTeid: teid_downlink, Gnb: gnb_ipv4, + AnchorsRules: make([]*url.URL, 0), + SRGWRules: make([]*url.URL, 0), }); loaded { logrus.WithFields(logrus.Fields{ "gnb-ipv4": gnb_ipv4, From 74ff307e5fcf9b511a991abd2b4d6f27e20fabc9 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 15 Jan 2025 15:42:31 +0100 Subject: [PATCH 3/3] Update downlink --- internal/app/rules-pusher.go | 152 ++++++++++++++++++++++++++++++----- 1 file changed, 130 insertions(+), 22 deletions(-) diff --git a/internal/app/rules-pusher.go b/internal/app/rules-pusher.go index 0399b3f..683cf4a 100644 --- a/internal/app/rules-pusher.go +++ b/internal/app/rules-pusher.go @@ -36,6 +36,11 @@ type RulesPusher struct { ues sync.Map } +type RuleAction struct { + Url *url.URL + Action *n4tosrv6.Action + GtpDstPrefix netip.Prefix +} type ueInfos struct { sync.Mutex @@ -44,11 +49,11 @@ type ueInfos struct { Gnb string Pushed bool - AnchorsRules []*url.URL - AnchorsLock sync.Mutex + AnchorsRules []*RuleAction + AnchorsLock sync.RWMutex - SRGWRules []*url.URL - SRGWLock sync.Mutex + SRGWRules []*RuleAction + SRGWLock sync.RWMutex } func NewRulesPusher(config *config.CtrlConfig) *RulesPusher { @@ -59,6 +64,29 @@ func NewRulesPusher(config *config.CtrlConfig) *RulesPusher { } } +func (pusher *RulesPusher) pushUpdateAction(ctx context.Context, client http.Client, url *url.URL, data []byte) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url.String(), bytes.NewBuffer(data)) + if err != nil { + logrus.WithError(err).Error("could not create http request") + return err + } + req.Header.Add("User-Agent", UserAgent) + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + resp, err := client.Do(req) + if err != nil { + logrus.WithError(err).Error("Could not push update action: server not responding") + return fmt.Errorf("Could not push update action: server not responding") + } + defer resp.Body.Close() + if resp.StatusCode == 400 { + logrus.WithError(err).Error("HTTP Bad Request") + return fmt.Errorf("HTTP Bad request") + } else if resp.StatusCode >= 500 { + logrus.WithError(err).Error("HTTP internal error") + return fmt.Errorf("HTTP internal error") + } + return nil +} func (pusher *RulesPusher) pushSingleRule(ctx context.Context, client http.Client, uri jsonapi.ControlURI, data []byte) (*url.URL, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.JoinPath("rules").String(), bytes.NewBuffer(data)) if err != nil { @@ -92,6 +120,9 @@ func (pusher *RulesPusher) pushSingleRule(ctx context.Context, client http.Clien func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error { i, ok := pusher.ues.Load(ue_ip) + if !ok { + return fmt.Errorf("UE not in ue list") + } infos := i.(*ueInfos) infos.Lock() defer infos.Unlock() @@ -99,9 +130,6 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error return nil // already pushed, nothing to do } infos.Pushed = true - if !ok { - return fmt.Errorf("UE not in ue list") - } service_ip := "10.4.0.1" logrus.WithFields(logrus.Fields{ "ue-ip": ue_ip, @@ -134,6 +162,9 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error // if no area is defined, create a new-one with only this gnb area = []netip.Prefix{netip.PrefixFrom(gnb_addr, 32)} } + action := n4tosrv6.Action{ + SRH: *srh, + } rule := n4tosrv6.Rule{ Enabled: r.Enabled, Type: "uplink", @@ -147,9 +178,7 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error Dst: service_addr, }, }, - Action: n4tosrv6.Action{ - SRH: *srh, - }, + Action: action, } rule_json, err := json.Marshal(rule) if err != nil { @@ -163,7 +192,10 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error defer infos.SRGWLock.Unlock() url, err := pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json) if err == nil { - infos.SRGWRules = append(infos.SRGWRules, url) + infos.SRGWRules = append(infos.SRGWRules, &RuleAction{ + Url: url, + Action: &action, + }) } return err }() @@ -190,6 +222,8 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error if !ok { return fmt.Errorf("could not convert MGTP4IPv6Dst to netip.Addr") } + // note: in srv6, segment[n] is the first segment of the path, and segment[0] is the last segment in the path + // because Segment-Left is a pointer to the current segment and is decremented each SR-hop segList[0] = dstIp.String() srh, err := n4tosrv6.NewSRH(segList) @@ -199,6 +233,9 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error }).WithError(err).Error("Creation of SRH downlink failed") return err } + action := n4tosrv6.Action{ + SRH: *srh, + } rule := n4tosrv6.Rule{ Enabled: true, Type: "downlink", @@ -207,9 +244,7 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error Dst: ue_addr, }, }, - Action: n4tosrv6.Action{ - SRH: *srh, - }, + Action: action, } rule_json, err := json.Marshal(rule) if err != nil { @@ -223,18 +258,86 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error defer infos.AnchorsLock.Unlock() url, err := pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json) if err == nil { - infos.AnchorsRules = append(infos.AnchorsRules, url) + infos.AnchorsRules = append(infos.AnchorsRules, &RuleAction{ + Url: url, + Action: &action, + GtpDstPrefix: prefix, + }) } return err }() } wg.Wait() + pusher.ues.Store(ue_ip, infos) + return nil } -func (pusher *RulesPusher) pushHandover(ctx context.Context, ue string, handoverTo jsonapi.Fteid) { - //TODO Handover +func (pusher *RulesPusher) pushHandover(ctx context.Context, ue_ip string, handoverTo jsonapi.Fteid) error { + i, ok := pusher.ues.Load(ue_ip) + if !ok { + return fmt.Errorf("UE not in ue list") + } + infos := i.(*ueInfos) + infos.Lock() + defer infos.Unlock() + + client := http.Client{} + var wg sync.WaitGroup + + infos.AnchorsLock.RLock() + defer infos.AnchorsLock.RUnlock() + + logrus.WithFields(logrus.Fields{ + "nb-uplink": len(infos.AnchorsRules), + "ue-ip": ue_ip, + "handover-to-addr": handoverTo.Addr, + "handover-to-teid": handoverTo.Teid, + }).Debug("Pushing new downlink rules for handover") + + for _, r := range infos.AnchorsRules { + dst := encoding.NewMGTP4IPv6Dst(r.GtpDstPrefix, handoverTo.Addr.As4(), encoding.NewArgsMobSession(0, false, false, handoverTo.Teid)) + dstB, err := dst.Marshal() + if err != nil { + return err + } + dstIp, ok := netip.AddrFromSlice(dstB) + if !ok { + return fmt.Errorf("could not convert MGTP4IPv6Dst to netip.Addr") + } + seg0, err := n4tosrv6.NewSegment(dstIp.String()) + if err != nil { + return err + } + // note: in srv6, segment[n] is the first segment of the path, and segment[0] is the last segment in the path + // because Segment-Left is a pointer to the current segment and is decremented each SR-hop + r.Action.SRH[0] = seg0 + + action_json, err := json.Marshal(r.Action) + if err != nil { + logrus.WithError(err).Error("Could not marshal json") + return err + } + wg.Add(1) + go func() error { + defer wg.Done() + err := pusher.pushUpdateAction(ctx, client, r.Url.JoinPath("update-action"), action_json) + if err != nil { + logrus.WithError(err).Error("Could not push update action") + } else { + logrus.WithFields(logrus.Fields{ + "path": r.Url.JoinPath("update-action").String(), + }).Debug("pushed update action") + } + return err + }() + } + wg.Wait() + infos.Gnb = handoverTo.Addr.String() + infos.DownlinkTeid = handoverTo.Teid + pusher.ues.Store(ue_ip, infos) + return nil } @@ -320,7 +423,12 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu "pdrid": pdrid, "ue": ue.IPv4Address.String(), }).Debug("UE identified for handover") - pusher.pushHandover(ctx, ue.IPv4Address.String(), handoverTo) + go func() { + err := pusher.pushHandover(ctx, ue.IPv4Address.String(), handoverTo) + if err != nil { + logrus.WithError(err).Error("Could not push handover rule") + } + }() handoverDone = true return nil }) @@ -369,8 +477,8 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu } if ue, loaded := pusher.ues.LoadOrStore(ue_ipv4, &ueInfos{ UplinkFTeid: jsonapi.Fteid{Teid: fteid.TEID, Addr: addr}, - AnchorsRules: make([]*url.URL, 0), - SRGWRules: make([]*url.URL, 0), + AnchorsRules: make([]*RuleAction, 0), + SRGWRules: make([]*RuleAction, 0), }); loaded { logrus.WithFields(logrus.Fields{ "teid-uplink": fteid.TEID, @@ -405,8 +513,8 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu if ue, loaded := pusher.ues.LoadOrStore(ue_ipv4, &ueInfos{ DownlinkTeid: teid_downlink, Gnb: gnb_ipv4, - AnchorsRules: make([]*url.URL, 0), - SRGWRules: make([]*url.URL, 0), + AnchorsRules: make([]*RuleAction, 0), + SRGWRules: make([]*RuleAction, 0), }); loaded { logrus.WithFields(logrus.Fields{ "gnb-ipv4": gnb_ipv4,