[WIP] Probe: Better mapping of NATted connections #3451

Draft: wants to merge 7 commits into base: master
Changes from 4 commits

21 changes: 11 additions & 10 deletions probe/endpoint/connection_tracker.go
@@ -43,18 +43,19 @@ func newConnectionTracker(conf ReporterConfig) connectionTracker {
}

func flowToTuple(f conntrack.Conn) (ft fourTuple) {
ft = fourTuple{
f.Orig.Src.String(),
f.Orig.Dst.String(),
uint16(f.Orig.SrcPort),
uint16(f.Orig.DstPort),
}
// Handle DNAT-ed connections in the initial state
if !f.Orig.Dst.Equal(f.Reply.Src) {
if (f.Status & conntrack.IPS_DST_NAT) == 0 {
ft = fourTuple{
f.Orig.Src.String(),
f.Orig.Dst.String(),
uint16(f.Orig.SrcPort),
uint16(f.Orig.DstPort),
}
} else {
// Handle DNAT-ed connections in the initial state
ft = fourTuple{
f.Reply.Dst.String(),
f.Orig.Src.String(),
f.Reply.Src.String(),
uint16(f.Reply.DstPort),
uint16(f.Orig.SrcPort),
uint16(f.Reply.SrcPort),
}
}
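
The hunk above switches DNAT detection from comparing addresses to testing the conntrack status bit. A minimal sketch of that selection follows. The `tuple`, `conn` and `flowEndpoints` names are hypothetical stand-ins rather than the `conntrack` package's API, and the status constants are defined locally (values assumed to match `IPS_SRC_NAT` and `IPS_DST_NAT` in the Linux kernel headers). The sample flow is the pod-to-pod example from the nat.go comment in this PR.

```go
package main

import "fmt"

// Illustrative status bits, defined locally for this sketch. The values are
// assumed to mirror IPS_SRC_NAT and IPS_DST_NAT in the Linux kernel headers.
const (
	statusSrcNAT uint32 = 1 << 4
	statusDstNAT uint32 = 1 << 5
)

// tuple and conn are hypothetical stand-ins for the conntrack types.
type tuple struct {
	Src, Dst         string
	SrcPort, DstPort uint16
}

type conn struct {
	Status      uint32
	Orig, Reply tuple
}

// fourTuple is a local stand-in for the probe's connection key.
type fourTuple struct {
	fromAddr, toAddr string
	fromPort, toPort uint16
}

// flowEndpoints keeps the original tuple unless the flow is DNAT-ed, in
// which case the destination is taken from the reply source.
func flowEndpoints(f conn) fourTuple {
	if f.Status&statusDstNAT == 0 {
		return fourTuple{f.Orig.Src, f.Orig.Dst, f.Orig.SrcPort, f.Orig.DstPort}
	}
	return fourTuple{f.Orig.Src, f.Reply.Src, f.Orig.SrcPort, f.Reply.SrcPort}
}

func main() {
	// Pod to pod via a Kubernetes service.
	f := conn{
		Status: statusDstNAT,
		Orig:   tuple{"10.32.0.16", "10.105.173.176", 47600, 5432},
		Reply:  tuple{"10.32.0.6", "10.32.0.16", 5432, 47600},
	}
	ft := flowEndpoints(f)
	// Prints 10.32.0.16:47600->10.32.0.6:5432
	fmt.Printf("%s:%d->%s:%d\n", ft.fromAddr, ft.fromPort, ft.toAddr, ft.toPort)
}
```

Testing the status bit, rather than comparing `Orig.Dst` with `Reply.Src`, keeps source-NATted flows (where only the reply destination differs) on the unmodified original tuple.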
121 changes: 89 additions & 32 deletions probe/endpoint/nat.go
@@ -30,45 +30,102 @@ func makeNATMapper(fw flowWalker) natMapper {
return natMapper{fw}
}

func toMapping(f conntrack.Conn) *endpointMapping {
var mapping endpointMapping
if f.Orig.Src.Equal(f.Reply.Dst) {
mapping = endpointMapping{
originalIP: f.Reply.Src,
originalPort: f.Reply.SrcPort,
rewrittenIP: f.Orig.Dst,
rewrittenPort: f.Orig.DstPort,
}
} else {
mapping = endpointMapping{
originalIP: f.Orig.Src,
originalPort: f.Orig.SrcPort,
rewrittenIP: f.Reply.Dst,
rewrittenPort: f.Reply.DstPort,
}
}

return &mapping
func endpointNodeID(scope string, ip net.IP, port uint16) string {
return report.MakeEndpointNodeID(scope, "", ip.String(), strconv.Itoa(int(port)))
}

// applyNAT duplicates Nodes in the endpoint topology of a report, based on
/*

Some examples of connections with NAT:

Pod to pod via Kubernetes service
picked up by ebpf as 10.32.0.16:47600->10.105.173.176:5432 and 10.32.0.6:5432 (??)
NAT IPS_DST_NAT orig: 10.32.0.16:47600->10.105.173.176:5432, reply: 10.32.0.6:5432->10.32.0.16:47600
We want: 10.32.0.16:47600->10.32.0.6:5432
- replace the destination (== NAT orig dst) with the NAT reply source

Incoming from outside the cluster to a NodePort:
picked up by ebpf as 10.32.0.1:13488->10.32.0.7:80
NAT: IPS_SRC_NAT IPS_DST_NAT orig: 37.157.33.76:13488->172.31.2.17:30081, reply: 10.32.0.7:80->10.32.0.1:13488
We want: 37.157.33.76:13488->10.32.0.7:80
- replace the source (== NAT reply dst) with the NAT original source

Outgoing from a pod:
picked up by ebpf as 10.32.0.7:36078->18.221.99.178:443
NAT: IPS_SRC_NAT orig: 10.32.0.7:36078->18.221.99.178:443, reply: 18.221.99.178:443->172.31.2.17:36078
We want: 10.32.0.7:36078->18.221.99.178:443
- leave it alone.

Docker container connecting to a port exposed by a similar container on a different host
host1:
picked up by ebpf as ip-172-31-5-80;172.17.0.2:43042->172.31.2.17:8080
NAT: IPS_SRC_NAT orig: 172.17.0.2:43042->172.31.2.17:8080, reply: 172.31.2.17:8080-> 172.31.5.80:43042
applying standard rule: ip-172-31-5-80;172.17.0.2:43042->172.31.2.17:8080 (i.e. no change)
we could add 172.31.5.80:43042 (nat reply destination) as a copy of ip-172-31-5-80;172.17.0.2:43042 (nat orig source)
host2:
picked up by ebpf as 172.31.5.80:43042->ip-172-31-2-17;172.17.0.2:80
NAT: IPS_DST_NAT orig: 172.31.5.80:43042->172.31.2.17:8080, reply: 172.17.0.2:80->172.31.5.80:43042
Ideally we might want: ip-172-31-5-80;172.17.0.2:43042->ip-172-31-2-17;172.17.0.2:80
applying standard rule: 172.31.5.80:43042->ip-172-31-2-17;172.17.0.2:80 (i.e. no change)
we could add 172.31.2.17:8080 (nat original destination) as a copy of ip-172-31-2-17;172.17.0.2:80 (nat reply source)

All of the above can be satisfied by these rules:
For SRC_NAT either add NAT orig source as a copy of NAT reply destination
or add NAT reply destination as a copy of NAT original source

Review comment (Member):
Why do we not need to replace adjacencies here?

For DST_NAT replace the destination in adjacencies with the NAT reply source
and add nat original destination as a copy of nat reply source
*/

// applyNAT modifies Nodes in the endpoint topology of a report, based on
// the NAT table.
func (n natMapper) applyNAT(rpt report.Report, scope string) {
n.flowWalker.walkFlows(func(f conntrack.Conn, _ bool) {
mapping := toMapping(f)
replyDstID := endpointNodeID(scope, f.Reply.Dst, f.Reply.DstPort)
origSrcID := endpointNodeID(scope, f.Orig.Src, f.Orig.SrcPort)

realEndpointPort := strconv.Itoa(int(mapping.originalPort))
copyEndpointPort := strconv.Itoa(int(mapping.rewrittenPort))
realEndpointID := report.MakeEndpointNodeID(scope, "", mapping.originalIP.String(), realEndpointPort)
copyEndpointID := report.MakeEndpointNodeID(scope, "", mapping.rewrittenIP.String(), copyEndpointPort)

node, ok := rpt.Endpoint.Nodes[realEndpointID]
if !ok {
return
if (f.Status & conntrack.IPS_SRC_NAT) != 0 {
if replyDstID != origSrcID {
// either add NAT orig source as a copy of NAT reply destination
if replyDstNode, ok := rpt.Endpoint.Nodes[replyDstID]; ok {
newNode := replyDstNode.WithID(origSrcID).WithLatests(map[string]string{
CopyOf: replyDstID,
})
rpt.Endpoint.AddNode(newNode)
} else if origSrcNode, ok := rpt.Endpoint.Nodes[origSrcID]; ok {
// or add NAT reply destination as a copy of NAT original source
newNode := origSrcNode.WithID(replyDstID).WithLatests(map[string]string{
CopyOf: origSrcID,
})
rpt.Endpoint.AddNode(newNode)
}
}
}

rpt.Endpoint.AddNode(node.WithID(copyEndpointID).WithLatests(map[string]string{
CopyOf: realEndpointID,
}))
if (f.Status & conntrack.IPS_DST_NAT) != 0 {
fromID := endpointNodeID(scope, f.Reply.Dst, f.Reply.DstPort)
fromNode, ok := rpt.Endpoint.Nodes[fromID]
if !ok {
return
}
toID := endpointNodeID(scope, f.Orig.Dst, f.Orig.DstPort)

// replace destination with reply source
replySrcID := endpointNodeID(scope, f.Reply.Src, f.Reply.SrcPort)
if replySrcID != toID {
fromNode.Adjacency = fromNode.Adjacency.Minus(toID)
fromNode = fromNode.WithAdjacent(replySrcID)

// add nat original destination as a copy of nat reply source
replySrcNode, ok := rpt.Endpoint.Nodes[replySrcID]
if !ok {
replySrcNode = report.MakeNode(replySrcID)
}
newNode := replySrcNode.WithID(toID).WithLatests(map[string]string{
CopyOf: replySrcID,
})
rpt.Endpoint.AddNode(newNode)
}

}
})
}
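
The SRC_NAT and DST_NAT rules from the comment block can be sketched on a much simpler topology model. Everything here is hypothetical: `node` and `flow` are local stand-ins for `report.Node` and `conntrack.Conn`, endpoint IDs are plain `ip:port` strings without scope, and nodes are overwritten rather than merged. The sketch also writes the modified source node back into the map, which is an assumption about the intended behaviour and not something the diff above shows.

```go
package main

import (
	"fmt"
	"sort"
)

// Status bits defined locally; values assumed to match the kernel's
// IPS_SRC_NAT and IPS_DST_NAT.
const (
	statusSrcNAT uint32 = 1 << 4
	statusDstNAT uint32 = 1 << 5
)

// node is a simplified, hypothetical stand-in for report.Node.
type node struct {
	ID, CopyOf string
	Adjacency  []string
}

// flow carries pre-computed endpoint IDs for one conntrack entry.
type flow struct {
	Status                               uint32
	OrigSrc, OrigDst, ReplySrc, ReplyDst string
}

// without returns a fresh slice with id removed.
func without(ids []string, id string) []string {
	out := make([]string, 0, len(ids))
	for _, v := range ids {
		if v != id {
			out = append(out, v)
		}
	}
	return out
}

// withAdjacent returns a fresh sorted slice that includes id.
func withAdjacent(ids []string, id string) []string {
	out := append(without(ids, id), id)
	sort.Strings(out)
	return out
}

func applyNAT(nodes map[string]node, f flow) {
	if f.Status&statusSrcNAT != 0 && f.ReplyDst != f.OrigSrc {
		if n, ok := nodes[f.ReplyDst]; ok {
			// add NAT orig source as a copy of NAT reply destination
			n.ID, n.CopyOf = f.OrigSrc, f.ReplyDst
			nodes[n.ID] = n
		} else if n, ok := nodes[f.OrigSrc]; ok {
			// or add NAT reply destination as a copy of NAT orig source
			n.ID, n.CopyOf = f.ReplyDst, f.OrigSrc
			nodes[n.ID] = n
		}
	}
	if f.Status&statusDstNAT != 0 {
		from, ok := nodes[f.ReplyDst]
		if !ok || f.ReplySrc == f.OrigDst {
			return
		}
		// replace the destination in adjacencies with the NAT reply source
		from.Adjacency = withAdjacent(without(from.Adjacency, f.OrigDst), f.ReplySrc)
		nodes[f.ReplyDst] = from // assumption: store the updated node
		// add NAT orig destination as a copy of NAT reply source
		cp := nodes[f.ReplySrc] // zero-value node if absent
		cp.ID, cp.CopyOf = f.OrigDst, f.ReplySrc
		nodes[cp.ID] = cp
	}
}

func main() {
	// Pod to pod via a Kubernetes service.
	nodes := map[string]node{
		"10.32.0.16:47600": {ID: "10.32.0.16:47600", Adjacency: []string{"10.105.173.176:5432"}},
		"10.32.0.6:5432":   {ID: "10.32.0.6:5432"},
	}
	applyNAT(nodes, flow{
		Status:   statusDstNAT,
		OrigSrc:  "10.32.0.16:47600",
		OrigDst:  "10.105.173.176:5432",
		ReplySrc: "10.32.0.6:5432",
		ReplyDst: "10.32.0.16:47600",
	})
	fmt.Println(nodes["10.32.0.16:47600"].Adjacency)   // [10.32.0.6:5432]
	fmt.Println(nodes["10.105.173.176:5432"].CopyOf) // 10.32.0.6:5432
}
```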
32 changes: 20 additions & 12 deletions probe/endpoint/nat_internal_test.go
@@ -47,18 +47,19 @@ func TestNat(t *testing.T) {
{
f := conntrack.Conn{
MsgType: conntrack.NfctMsgUpdate,
Status: conntrack.IPS_DST_NAT,
Orig: conntrack.Tuple{
Src: host2,
Dst: host1,
SrcPort: 22222,
SrcPort: 22223,
DstPort: 80,
Proto: syscall.IPPROTO_TCP,
},
Reply: conntrack.Tuple{
Src: c1,
Dst: host2,
SrcPort: 80,
DstPort: 22222,
DstPort: 22223,
Proto: syscall.IPPROTO_TCP,
},
CtId: 1,
@@ -70,15 +71,18 @@ func TestNat(t *testing.T) {

have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host1", "", "10.0.47.1", "80")
have.Endpoint.AddNode(report.MakeNodeWith(originalID, map[string]string{
originalNode := report.MakeNodeWith(originalID, map[string]string{
"foo": "bar",
}))
})
have.Endpoint.AddNode(originalNode)
fromID := report.MakeEndpointNodeID("host2", "", "2.3.4.5", "22223")
have.Endpoint.AddNode(report.MakeNodeWith(fromID, nil).WithAdjacent(originalID))

want := have.Copy()
wantID := report.MakeEndpointNodeID("host1", "", "1.2.3.4", "80")
want.Endpoint.AddNode(report.MakeNodeWith(wantID, map[string]string{
// add nat original destination as a copy of nat reply source
origDstID := report.MakeEndpointNodeID("host1", "", "1.2.3.4", "80")
want.Endpoint.AddNode(originalNode.WithID(origDstID).WithLatests(map[string]string{
CopyOf: originalID,
"foo": "bar",
}))

makeNATMapper(ct).applyNAT(have, "host1")
@@ -91,6 +95,7 @@ func TestNat(t *testing.T) {
{
f := conntrack.Conn{
MsgType: conntrack.NfctMsgUpdate,
Status: conntrack.IPS_SRC_NAT,
Orig: conntrack.Tuple{
Src: c2,
Dst: host1,
@@ -112,16 +117,19 @@ func TestNat(t *testing.T) {
}

have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host2", "", "10.0.47.2", "22222")
have.Endpoint.AddNode(report.MakeNodeWith(originalID, map[string]string{
fromID := report.MakeEndpointNodeID("host2", "", "10.0.47.2", "22222")
toID := report.MakeEndpointNodeID("host1", "", "1.2.3.4", "80")
have.Endpoint.AddNode(report.MakeNodeWith(toID, nil))
have.Endpoint.AddNode(report.MakeNodeWith(fromID, map[string]string{
"foo": "baz",
}))
}).WithAdjacent(toID))

// add NAT reply destination as a copy of NAT original source
want := have.Copy()
want.Endpoint.AddNode(report.MakeNodeWith(report.MakeEndpointNodeID("host2", "", "2.3.4.5", "22223"), map[string]string{
CopyOf: originalID,
CopyOf: report.MakeEndpointNodeID("host1", "", "10.0.47.2", "22222"),
"foo": "baz",
}))
}).WithAdjacent(toID))

makeNATMapper(ct).applyNAT(have, "host1")
if !reflect.DeepEqual(want, have) {
4 changes: 1 addition & 3 deletions prog/probe.go
@@ -266,9 +266,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}

if flags.dockerEnabled {
// Don't add the bridge in Kubernetes since container IPs are global and
// shouldn't be scoped
if flags.dockerBridge != "" && !flags.kubernetesEnabled {
if flags.dockerBridge != "" {

Review comment (Member):

This is an improvement for when we have "unmanaged" containers running on a k8s host. However, it will also result in

  1. spurious error messages when docker isn't running on a k8s host (as will happen by default on some future k8s releases)

  2. incorrect treatment of the docker bridge as local if it is in fact the bridge used by k8s

We could avoid the first one by only erroring when the bridge name has been specified on the command line (rather than left to the default 'docker0' value).

Is the 2nd problem real?

Reply (Collaborator Author):

  2. I don't think it is practical to use the docker0 bridge as the Kubernetes bridge nowadays except on a single-node cluster.
    I think Flannel used to connect one machine's Docker bridge to another, but nowadays that is driven via CNI with its own bridge. And Docker's own overlay networking will create a different bridge for each network.

if err := report.AddLocalBridge(flags.dockerBridge); err != nil {
log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err)
}
5 changes: 5 additions & 0 deletions report/id_list.go
@@ -36,3 +36,8 @@ func (a IDList) Contains(id string) bool {
func (a IDList) Intersection(b IDList) IDList {
return IDList(StringSet(a).Intersection(StringSet(b)))
}

// Minus returns the set with id removed
func (a IDList) Minus(id string) IDList {
return IDList(StringSet(a).Minus(id))
}
9 changes: 9 additions & 0 deletions report/string_set.go
@@ -50,6 +50,15 @@ func (s StringSet) Intersection(b StringSet) StringSet {
return result
}

// Minus returns the set with str removed
func (s StringSet) Minus(str string) StringSet {
i := sort.Search(len(s), func(i int) bool { return s[i] >= str })
if i < len(s) && s[i] == str {
return append(s[:i], s[i+1:]...)
}
return s
}
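
One thing worth checking in `Minus` as written: `append(s[:i], s[i+1:]...)` shifts elements inside the backing array that `s` shares with any other holder of the same slice, so the receiver is modified as a side effect. If `StringSet` values are meant to be treated as immutable (an assumption here), a copying variant along these lines avoids that. `StringSet` is redefined locally so the sketch is self-contained.

```go
package main

import (
	"fmt"
	"sort"
)

// StringSet is a local stand-in: a sorted slice of unique strings.
type StringSet []string

// Minus returns a set without str. When str is present, the result is built
// in a new backing array, so s is left untouched.
func (s StringSet) Minus(str string) StringSet {
	i := sort.Search(len(s), func(i int) bool { return s[i] >= str })
	if i >= len(s) || s[i] != str {
		return s
	}
	result := make(StringSet, 0, len(s)-1)
	result = append(result, s[:i]...)
	return append(result, s[i+1:]...)
}

func main() {
	s := StringSet{"a", "b", "c"}
	fmt.Println(s.Minus("b"), s) // [a c] [a b c]
}
```

The cost is one allocation per removal, in exchange for callers being able to keep using the original set safely.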

// Equal returns true if a and b have the same contents
func (s StringSet) Equal(b StringSet) bool {
if len(s) != len(b) {