Skip to content

Commit

Permalink
Update downlink
Browse files Browse the repository at this point in the history
  • Loading branch information
louisroyer committed Jan 15, 2025
1 parent b3cdeea commit 74ff307
Showing 1 changed file with 130 additions and 22 deletions.
152 changes: 130 additions & 22 deletions internal/app/rules-pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type RulesPusher struct {
ues sync.Map
}

type RuleAction struct {
Url *url.URL
Action *n4tosrv6.Action
GtpDstPrefix netip.Prefix
}
type ueInfos struct {
sync.Mutex

Expand All @@ -44,11 +49,11 @@ type ueInfos struct {
Gnb string
Pushed bool

AnchorsRules []*url.URL
AnchorsLock sync.Mutex
AnchorsRules []*RuleAction
AnchorsLock sync.RWMutex

SRGWRules []*url.URL
SRGWLock sync.Mutex
SRGWRules []*RuleAction
SRGWLock sync.RWMutex
}

func NewRulesPusher(config *config.CtrlConfig) *RulesPusher {
Expand All @@ -59,6 +64,29 @@ func NewRulesPusher(config *config.CtrlConfig) *RulesPusher {
}
}

func (pusher *RulesPusher) pushUpdateAction(ctx context.Context, client http.Client, url *url.URL, data []byte) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url.String(), bytes.NewBuffer(data))
if err != nil {
logrus.WithError(err).Error("could not create http request")
return err
}
req.Header.Add("User-Agent", UserAgent)
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
resp, err := client.Do(req)
if err != nil {
logrus.WithError(err).Error("Could not push update action: server not responding")
return fmt.Errorf("Could not push update action: server not responding")
}
defer resp.Body.Close()
if resp.StatusCode == 400 {
logrus.WithError(err).Error("HTTP Bad Request")
return fmt.Errorf("HTTP Bad request")
} else if resp.StatusCode >= 500 {
logrus.WithError(err).Error("HTTP internal error")
return fmt.Errorf("HTTP internal error")
}
return nil
}
func (pusher *RulesPusher) pushSingleRule(ctx context.Context, client http.Client, uri jsonapi.ControlURI, data []byte) (*url.URL, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.JoinPath("rules").String(), bytes.NewBuffer(data))
if err != nil {
Expand Down Expand Up @@ -92,16 +120,16 @@ func (pusher *RulesPusher) pushSingleRule(ctx context.Context, client http.Clien

func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error {
i, ok := pusher.ues.Load(ue_ip)
if !ok {
return fmt.Errorf("UE not in ue list")
}
infos := i.(*ueInfos)
infos.Lock()
defer infos.Unlock()
if infos.Pushed {
return nil // already pushed, nothing to do
}
infos.Pushed = true
if !ok {
return fmt.Errorf("UE not in ue list")
}
service_ip := "10.4.0.1"
logrus.WithFields(logrus.Fields{
"ue-ip": ue_ip,
Expand Down Expand Up @@ -134,6 +162,9 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
// if no area is defined, create a new-one with only this gnb
area = []netip.Prefix{netip.PrefixFrom(gnb_addr, 32)}
}
action := n4tosrv6.Action{
SRH: *srh,
}
rule := n4tosrv6.Rule{
Enabled: r.Enabled,
Type: "uplink",
Expand All @@ -147,9 +178,7 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
Dst: service_addr,
},
},
Action: n4tosrv6.Action{
SRH: *srh,
},
Action: action,
}
rule_json, err := json.Marshal(rule)
if err != nil {
Expand All @@ -163,7 +192,10 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
defer infos.SRGWLock.Unlock()
url, err := pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json)
if err == nil {
infos.SRGWRules = append(infos.SRGWRules, url)
infos.SRGWRules = append(infos.SRGWRules, &RuleAction{
Url: url,
Action: &action,
})
}
return err
}()
Expand All @@ -190,6 +222,8 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
if !ok {
return fmt.Errorf("could not convert MGTP4IPv6Dst to netip.Addr")
}
// note: in srv6, segment[n] is the first segment of the path, and segment[0] is the last segment in the path
// because Segment-Left is a pointer to the current segment and is decremented each SR-hop
segList[0] = dstIp.String()

srh, err := n4tosrv6.NewSRH(segList)
Expand All @@ -199,6 +233,9 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
}).WithError(err).Error("Creation of SRH downlink failed")
return err
}
action := n4tosrv6.Action{
SRH: *srh,
}
rule := n4tosrv6.Rule{
Enabled: true,
Type: "downlink",
Expand All @@ -207,9 +244,7 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
Dst: ue_addr,
},
},
Action: n4tosrv6.Action{
SRH: *srh,
},
Action: action,
}
rule_json, err := json.Marshal(rule)
if err != nil {
Expand All @@ -223,18 +258,86 @@ func (pusher *RulesPusher) pushRTRRule(ctx context.Context, ue_ip string) error
defer infos.AnchorsLock.Unlock()
url, err := pusher.pushSingleRule(ctx, client, r.ControlURI, rule_json)
if err == nil {
infos.AnchorsRules = append(infos.AnchorsRules, url)
infos.AnchorsRules = append(infos.AnchorsRules, &RuleAction{
Url: url,
Action: &action,
GtpDstPrefix: prefix,
})
}
return err
}()

}
wg.Wait()
pusher.ues.Store(ue_ip, infos)

return nil
}

func (pusher *RulesPusher) pushHandover(ctx context.Context, ue string, handoverTo jsonapi.Fteid) {
//TODO Handover
func (pusher *RulesPusher) pushHandover(ctx context.Context, ue_ip string, handoverTo jsonapi.Fteid) error {
i, ok := pusher.ues.Load(ue_ip)
if !ok {
return fmt.Errorf("UE not in ue list")
}
infos := i.(*ueInfos)
infos.Lock()
defer infos.Unlock()

client := http.Client{}
var wg sync.WaitGroup

infos.AnchorsLock.RLock()
defer infos.AnchorsLock.RUnlock()

logrus.WithFields(logrus.Fields{
"nb-uplink": len(infos.AnchorsRules),
"ue-ip": ue_ip,
"handover-to-addr": handoverTo.Addr,
"handover-to-teid": handoverTo.Teid,
}).Debug("Pushing new downlink rules for handover")

for _, r := range infos.AnchorsRules {
dst := encoding.NewMGTP4IPv6Dst(r.GtpDstPrefix, handoverTo.Addr.As4(), encoding.NewArgsMobSession(0, false, false, handoverTo.Teid))
dstB, err := dst.Marshal()
if err != nil {
return err
}
dstIp, ok := netip.AddrFromSlice(dstB)
if !ok {
return fmt.Errorf("could not convert MGTP4IPv6Dst to netip.Addr")
}
seg0, err := n4tosrv6.NewSegment(dstIp.String())
if err != nil {
return err
}
// note: in srv6, segment[n] is the first segment of the path, and segment[0] is the last segment in the path
// because Segment-Left is a pointer to the current segment and is decremented each SR-hop
r.Action.SRH[0] = seg0

action_json, err := json.Marshal(r.Action)
if err != nil {
logrus.WithError(err).Error("Could not marshal json")
return err
}
wg.Add(1)
go func() error {
defer wg.Done()
err := pusher.pushUpdateAction(ctx, client, r.Url.JoinPath("update-action"), action_json)
if err != nil {
logrus.WithError(err).Error("Could not push update action")
} else {
logrus.WithFields(logrus.Fields{
"path": r.Url.JoinPath("update-action").String(),
}).Debug("pushed update action")
}
return err
}()
}
wg.Wait()
infos.Gnb = handoverTo.Addr.String()
infos.DownlinkTeid = handoverTo.Teid
pusher.ues.Store(ue_ip, infos)
return nil

}

Expand Down Expand Up @@ -320,7 +423,12 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu
"pdrid": pdrid,
"ue": ue.IPv4Address.String(),
}).Debug("UE identified for handover")
pusher.pushHandover(ctx, ue.IPv4Address.String(), handoverTo)
go func() {
err := pusher.pushHandover(ctx, ue.IPv4Address.String(), handoverTo)
if err != nil {
logrus.WithError(err).Error("Could not push handover rule")
}
}()
handoverDone = true
return nil
})
Expand Down Expand Up @@ -369,8 +477,8 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu
}
if ue, loaded := pusher.ues.LoadOrStore(ue_ipv4, &ueInfos{
UplinkFTeid: jsonapi.Fteid{Teid: fteid.TEID, Addr: addr},
AnchorsRules: make([]*url.URL, 0),
SRGWRules: make([]*url.URL, 0),
AnchorsRules: make([]*RuleAction, 0),
SRGWRules: make([]*RuleAction, 0),
}); loaded {
logrus.WithFields(logrus.Fields{
"teid-uplink": fteid.TEID,
Expand Down Expand Up @@ -405,8 +513,8 @@ func (pusher *RulesPusher) updateRoutersRules(ctx context.Context, msgType pfcpu
if ue, loaded := pusher.ues.LoadOrStore(ue_ipv4, &ueInfos{
DownlinkTeid: teid_downlink,
Gnb: gnb_ipv4,
AnchorsRules: make([]*url.URL, 0),
SRGWRules: make([]*url.URL, 0),
AnchorsRules: make([]*RuleAction, 0),
SRGWRules: make([]*RuleAction, 0),
}); loaded {
logrus.WithFields(logrus.Fields{
"gnb-ipv4": gnb_ipv4,
Expand Down

0 comments on commit 74ff307

Please sign in to comment.