From 8037a7a379ed8a676e6010cfa0c4534cfdc3eceb Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Mon, 25 Oct 2021 16:55:34 +0700 Subject: [PATCH 1/6] Rework memif to use abstract sockets Signed-off-by: Vladimir Popov --- go.mod | 2 + pkg/networkservice/chains/forwarder/server.go | 9 +- pkg/networkservice/mechanisms/memif/client.go | 59 +++---- .../mechanisms/memif/client_test.go | 2 +- pkg/networkservice/mechanisms/memif/common.go | 121 ++++++++++---- .../mechanisms/memif/memifproxy/client.go | 123 -------------- .../mechanisms/memif/memifproxy/metadata.go | 47 ++++-- .../memif/memifproxy/proxy_listener.go | 108 ------------ .../mechanisms/memif/memifproxy/server.go | 120 +++++++++++++ .../memif/{metatdata.go => metadata.go} | 24 +-- pkg/networkservice/mechanisms/memif/option.go | 18 +- pkg/networkservice/mechanisms/memif/server.go | 65 +++---- .../proxy}/proxy_connection.go | 11 +- pkg/tools/proxy/proxy_listener.go | 158 ++++++++++++++++++ pkg/tools/proxy/proxy_test.go | 23 +++ pkg/tools/proxy/utils.go | 60 +++++++ 16 files changed, 583 insertions(+), 367 deletions(-) delete mode 100644 pkg/networkservice/mechanisms/memif/memifproxy/client.go delete mode 100644 pkg/networkservice/mechanisms/memif/memifproxy/proxy_listener.go create mode 100644 pkg/networkservice/mechanisms/memif/memifproxy/server.go rename pkg/networkservice/mechanisms/memif/{metatdata.go => metadata.go} (66%) rename pkg/{networkservice/mechanisms/memif/memifproxy => tools/proxy}/proxy_connection.go (94%) create mode 100644 pkg/tools/proxy/proxy_listener.go create mode 100644 pkg/tools/proxy/proxy_test.go create mode 100644 pkg/tools/proxy/utils.go diff --git a/go.mod b/go.mod index fe85cbf5..768b8b95 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/stretchr/testify v1.7.0 github.com/thanhpk/randstr v1.0.4 github.com/vishvananda/netlink v1.1.0 + github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae + golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b google.golang.org/grpc v1.35.0 google.golang.org/protobuf v1.25.0 diff --git a/pkg/networkservice/chains/forwarder/server.go b/pkg/networkservice/chains/forwarder/server.go index 887f15e7..60b4b077 100644 --- a/pkg/networkservice/chains/forwarder/server.go +++ b/pkg/networkservice/chains/forwarder/server.go @@ -63,6 +63,8 @@ import ( // Connection aggregates the api.Connection and api.ChannelProvider interfaces type Connection interface { + IsExternal() bool + api.Connection api.ChannelProvider } @@ -93,7 +95,9 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw tag.NewServer(ctx, vppConn), mtu.NewServer(vppConn), mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ - memif.MECHANISM: memif.NewServer(vppConn, memif.WithDirectMemif()), + memif.MECHANISM: memif.NewServer(ctx, vppConn, + memif.WithDirectMemif(), + memif.WithChangeNetNS()), kernel.MECHANISM: kernel.NewServer(vppConn), vxlan.MECHANISM: vxlan.NewServer(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)), wireguard.MECHANISM: wireguard.NewServer(vppConn, tunnelIP), @@ -113,7 +117,8 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw mtu.NewClient(vppConn), tag.NewClient(ctx, vppConn), // mechanisms - memif.NewClient(vppConn), + memif.NewClient(vppConn, + memif.WithChangeNetNS()), kernel.NewClient(vppConn), vxlan.NewClient(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)), wireguard.NewClient(vppConn, tunnelIP), diff --git a/pkg/networkservice/mechanisms/memif/client.go b/pkg/networkservice/mechanisms/memif/client.go index b127f93e..880a1168 100644 --- a/pkg/networkservice/mechanisms/memif/client.go +++ b/pkg/networkservice/mechanisms/memif/client.go @@ -16,51 +16,44 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows +//+build linux package memif import ( "context" - "git.fd.io/govpp.git/api" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "google.golang.org/grpc" - "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/memif/memifproxy" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/switchcase" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" - "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" - + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/postpone" + + "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/memif/memifproxy" ) type memifClient struct { - vppConn api.Connection + vppConn Connection + changeNetNs bool } // NewClient provides a NetworkServiceClient chain elements that support the memif Mechanism -func NewClient(vppConn api.Connection) networkservice.NetworkServiceClient { - m := &memifClient{ - vppConn: vppConn, +func NewClient(vppConn Connection, options ...Option) networkservice.NetworkServiceClient { + opts := &memifOptions{} + for _, o := range options { + o(opts) } return chain.NewNetworkServiceClient( - m, - switchcase.NewClient(&switchcase.ClientCase{ - Condition: func(ctx context.Context, conn *networkservice.Connection) bool { - _, ok := loadDirectMemifInfo(ctx) - return !ok - }, - Client: memifproxy.New(), - }), + &memifClient{ + vppConn: vppConn, + changeNetNs: opts.changeNetNS, + }, ) } @@ -75,11 +68,11 @@ func mechanismsContain(list []*networkservice.Mechanism, t string) bool { func (m *memifClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { if !mechanismsContain(request.MechanismPreferences, memif.MECHANISM) { - request.MechanismPreferences = append(request.MechanismPreferences, &networkservice.Mechanism{ - Cls: cls.LOCAL, - Type: memif.MECHANISM, - Parameters: make(map[string]string), - }) + var u string + if !m.changeNetNs { + u = netNSURL + } + request.MechanismPreferences = append(request.MechanismPreferences, memif.New(u)) } postponeCtxFunc := postpone.ContextWithValues(ctx) @@ -89,14 +82,16 @@ func (m *memifClient) Request(ctx context.Context, request *networkservice.Netwo return nil, err } - // if direct memif enabled save socket filename to metadata - _, ok := loadDirectMemifInfo(ctx) - if mechanism := memif.ToMechanism(conn.GetMechanism()); mechanism != nil && ok { - storeDirectMemifInfo(ctx, directMemifInfo{socketURL: mechanism.GetSocketFileURL()}) - return conn, nil + // If direct memif case store mechanism to metadata and return. + if info, ok := memifproxy.LoadInfo(ctx); ok { + if mechanism := memif.ToMechanism(conn.GetMechanism()); mechanism != nil && ok { + info.NSURL = mechanism.GetNetNSURL() + info.SocketFile = mechanism.GetSocketFilename() + return conn, nil + } } - if err := create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil { + if err = create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil { closeCtx, cancelClose := postponeCtxFunc() defer cancelClose() diff --git a/pkg/networkservice/mechanisms/memif/client_test.go b/pkg/networkservice/mechanisms/memif/client_test.go index 73eae847..a2c52d81 100644 --- a/pkg/networkservice/mechanisms/memif/client_test.go +++ b/pkg/networkservice/mechanisms/memif/client_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows +// +build linux package memif_test diff --git a/pkg/networkservice/mechanisms/memif/common.go b/pkg/networkservice/mechanisms/memif/common.go index 109ec265..71e5181a 100644 --- a/pkg/networkservice/mechanisms/memif/common.go +++ b/pkg/networkservice/mechanisms/memif/common.go @@ -14,14 +14,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +//+build linux + package memif import ( "context" + "fmt" "net/url" "os" "path/filepath" - "sync/atomic" + "runtime" "time" "git.fd.io/govpp.git/api" @@ -33,42 +36,75 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice/payload" "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/pkg/errors" + "github.com/vishvananda/netns" + "golang.org/x/sys/unix" "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/up" "github.com/networkservicemesh/sdk-vpp/pkg/tools/ifindex" ) -var lastSocketID uint32 +// Connection is an api.Connection with IsExternal method +type Connection interface { + IsExternal() bool + + api.Connection +} + +var ( + netNS netns.NsHandle + netNSURL string +) + +// nolint:gochecknoinits +func init() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() -func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn api.Connection, isClient bool) (socketID uint32, err error) { - // Extract the socket filename - u, err := url.Parse(mechanism.GetSocketFileURL()) + fd, err := unix.Open("/proc/thread-self/ns/net", unix.O_RDONLY|unix.O_CLOEXEC, 0) if err != nil { - return 0, errors.Wrapf(err, "not a valid url %q", mechanism.GetSocketFileURL()) + panic("failed to open '/proc/thread-self/ns/net': " + err.Error()) } - if u.Scheme != memifMech.FileScheme { - return 0, errors.Errorf("socket file url must have scheme %q, actual %q", memifMech.FileScheme, u.Scheme) + path := fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd) + + if netNS, err = netns.GetFromPath(path); err != nil { + panic("failed to get current net NS: " + err.Error()) } - // Create the socketID - socketID = atomic.AddUint32(&lastSocketID, 1) // TODO - work out a solution that works long term - now := time.Now() - memifSocketAddDel := &memif.MemifSocketFilenameAddDel{ + netNSURL = (&url.URL{Scheme: memifMech.SocketFileScheme, Path: path}).String() +} + +func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn Connection, isClient bool) (socketID uint32, err error) { + namespace, err := getNamespace(mechanism, vppConn) + if err != nil { + return 0, err + } + + memifSocketAddDel := &memif.MemifSocketFilenameAddDelV2{ IsAdd: true, - SocketID: socketID, - SocketFilename: u.Path, + SocketID: ^uint32(0), + SocketFilename: mechanism.GetSocketFilename(), + Namespace: namespace, } - if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDel(ctx, memifSocketAddDel); err != nil { + + now := time.Now() + + reply, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDelV2(ctx, memifSocketAddDel) + if err != nil { return 0, errors.WithStack(err) } + memifSocketAddDel.SocketID = reply.SocketID + log.FromContext(ctx). WithField("SocketID", memifSocketAddDel.SocketID). WithField("SocketFilename", memifSocketAddDel.SocketFilename). + WithField("SocketNamespace", memifSocketAddDel.Namespace). WithField("IsAdd", memifSocketAddDel.IsAdd). WithField("duration", time.Since(now)). WithField("vppapi", "MemifSocketFilenameAddDel").Debug("completed") + store(ctx, isClient, memifSocketAddDel) - return socketID, nil + + return memifSocketAddDel.SocketID, nil } func deleteMemifSocket(ctx context.Context, vppConn api.Connection, isClient bool) error { @@ -76,17 +112,23 @@ func deleteMemifSocket(ctx context.Context, vppConn api.Connection, isClient boo if !ok { return nil } + memifSocketAddDel.IsAdd = false + now := time.Now() - if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDel(ctx, memifSocketAddDel); err != nil { + + if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDelV2(ctx, memifSocketAddDel); err != nil { return errors.WithStack(err) } + log.FromContext(ctx). WithField("SocketID", memifSocketAddDel.SocketID). WithField("SocketFilename", memifSocketAddDel.SocketFilename). + WithField("SocketNamespace", memifSocketAddDel.Namespace). WithField("IsAdd", memifSocketAddDel.IsAdd). WithField("duration", time.Since(now)). WithField("vppapi", "MemifSocketFilenameAddDel").Debug("completed") + return nil } @@ -133,7 +175,7 @@ func createMemif(ctx context.Context, vppConn api.Connection, socketID uint32, m } func deleteMemif(ctx context.Context, vppConn api.Connection, isClient bool) error { - swIfIndex, ok := ifindex.LoadAndDelete(ctx, isClient) + swIfIndex, ok := ifindex.Load(ctx, isClient) if !ok { return nil } @@ -152,17 +194,14 @@ func deleteMemif(ctx context.Context, vppConn api.Connection, isClient bool) err return nil } -func create(ctx context.Context, conn *networkservice.Connection, vppConn api.Connection, isClient bool) error { +func create(ctx context.Context, conn *networkservice.Connection, vppConn Connection, isClient bool) error { if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil { // This connection has already been created if _, ok := ifindex.Load(ctx, isClient); ok { return nil } if !isClient { - if err := os.MkdirAll(filepath.Dir(socketFile(conn)), 0700); err != nil { - return errors.Wrapf(err, "failed to create memif socket directory %s", socketFile(conn)) - } - mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.FileScheme, Path: socketFile(conn)}).String()) + mechanism.SetSocketFilename(socketFile(conn)) } mode := memif.MEMIF_MODE_API_IP if conn.GetPayload() == payload.Ethernet { @@ -187,15 +226,39 @@ func del(ctx context.Context, conn *networkservice.Connection, vppConn api.Conne if err := deleteMemifSocket(ctx, vppConn, isClient); err != nil { return err } - if !isClient { - if err := os.RemoveAll(filepath.Dir(socketFile(conn))); err != nil { - return errors.Wrapf(err, "failed to delete %s", filepath.Dir(socketFile(conn))) - } - } } return nil } func socketFile(conn *networkservice.Connection) string { - return filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket") + return "@" + filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket") +} + +func getNamespace(mechanism *memifMech.Mechanism, vppConn Connection) (string, error) { + if mechanism.GetNetNSURL() == netNSURL { + return "", nil + } + + u, err := url.Parse(mechanism.GetNetNSURL()) + if err != nil { + return "", errors.Wrapf(err, "not a valid url %s", mechanism.GetNetNSURL()) + } + if u.Scheme != memifMech.SocketFileScheme { + return "", errors.Errorf("socket file url must have scheme %s, actual %s", memifMech.SocketFileScheme, u.Scheme) + } + + if vppConn.IsExternal() { + return u.Path, nil + } + + targetNetNS, err := netns.GetFromPath(u.Path) + if err != nil { + return "", err + } + defer func() { _ = targetNetNS.Close() }() + + if targetNetNS.Equal(netNS) { + return "", nil + } + return u.Path, nil } diff --git a/pkg/networkservice/mechanisms/memif/memifproxy/client.go b/pkg/networkservice/mechanisms/memif/memifproxy/client.go deleted file mode 100644 index 47825321..00000000 --- a/pkg/networkservice/mechanisms/memif/memifproxy/client.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright (c) 2020-2021 Cisco and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build !windows - -package memifproxy - -import ( - "context" - "net/url" - "os" - "path/filepath" - - "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" - "google.golang.org/grpc" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - memifMech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/postpone" -) - -const ( - memifNetwork = "unixpacket" - maxFDCount = 1 - bufferSize = 128 -) - -type memifProxyClient struct{} - -// New - create a new memifProxy client chain element -func New() networkservice.NetworkServiceClient { - return &memifProxyClient{} -} - -func (m *memifProxyClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - postponeCtxFunc := postpone.ContextWithValues(ctx) - - if mechanism := memifMech.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil { - if listener, ok := load(ctx, metadata.IsClient(m)); ok { - mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.FileScheme, Path: listener.socketFilename}).String()) - } - } - - conn, err := next.Client(ctx).Request(ctx, request, opts...) - if err != nil { - return nil, err - } - - mechanism := memifMech.ToMechanism(conn.GetMechanism()) - if mechanism == nil { - return conn, nil - } - - // If we are already running a proxy... just keep running it - if _, ok := load(ctx, true); ok { - mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.FileScheme, Path: listenSocketFilename(conn)}).String()) - return conn, nil - } - - if err = os.MkdirAll(filepath.Dir(listenSocketFilename(conn)), 0700); err != nil { - err = errors.Wrapf(err, "unable to mkdir %s", filepath.Dir(listenSocketFilename(conn))) - if closeErr := m.closeOnFailure(postponeCtxFunc, conn, opts); closeErr != nil { - err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error()) - } - return nil, err - } - - listener, err := newProxyListener(mechanism, listenSocketFilename(conn)) - if err != nil { - if closeErr := m.closeOnFailure(postponeCtxFunc, conn, opts); closeErr != nil { - err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error()) - } - return nil, err - } - - store(ctx, metadata.IsClient(m), listener) - - return conn, nil -} - -func (m *memifProxyClient) closeOnFailure(postponeCtxFunc func() (context.Context, context.CancelFunc), conn *networkservice.Connection, opts []grpc.CallOption) error { - closeCtx, cancelClose := postponeCtxFunc() - defer cancelClose() - - _, err := m.Close(closeCtx, conn, opts...) - - return err -} - -func (m *memifProxyClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil { - if listener, ok := load(ctx, metadata.IsClient(m)); ok { - mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.FileScheme, Path: listener.socketFilename}).String()) - } - } - - rv, err := next.Client(ctx).Close(ctx, conn) - if listener, ok := loadAndDelete(ctx, metadata.IsClient(m)); ok { - _ = listener.Close() - } - _ = os.RemoveAll(filepath.Dir(listenSocketFilename(conn))) - return rv, err -} - -func listenSocketFilename(conn *networkservice.Connection) string { - return filepath.Join(os.TempDir(), "memifproxy", conn.GetId(), "memif.socket") -} diff --git a/pkg/networkservice/mechanisms/memif/memifproxy/metadata.go b/pkg/networkservice/mechanisms/memif/memifproxy/metadata.go index 92854905..228212ad 100644 --- a/pkg/networkservice/mechanisms/memif/memifproxy/metadata.go +++ b/pkg/networkservice/mechanisms/memif/memifproxy/metadata.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2021 Cisco and/or its affiliates. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,7 +16,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows +// +build linux package memifproxy @@ -26,30 +28,45 @@ import ( type key struct{} -// store sets the *proxyListener stored in per Connection.Id metadata. -func store(ctx context.Context, isClient bool, listener *proxyListener) { - metadata.Map(ctx, isClient).Store(key{}, listener) +func store(ctx context.Context, cancel context.CancelFunc) { + metadata.Map(ctx, false).Store(key{}, cancel) +} + +func load(ctx context.Context) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, false).Load(key{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok } -// load returns the *proxyListener stored in per Connection.Id metadata, or nil if no -// value is present. -// The ok result indicates whether value was found in the per Connection.Id metadata. -func load(ctx context.Context, isClient bool) (value *proxyListener, ok bool) { - rawValue, ok := metadata.Map(ctx, isClient).Load(key{}) +func loadAndDelete(ctx context.Context) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, false).LoadAndDelete(key{}) if !ok { return } - value, ok = rawValue.(*proxyListener) + value, ok = rawValue.(context.CancelFunc) return value, ok } -// loadAndDelete deletes the *proxyListener stored in per Connection.Id metadata, -// returning the previous value if any. The loaded result reports whether the key was present. -func loadAndDelete(ctx context.Context, isClient bool) (value *proxyListener, ok bool) { - rawValue, ok := metadata.Map(ctx, isClient).LoadAndDelete(key{}) +type infoKey struct{} + +// Info contains client NSURL and SocketFile needed for direct memif +type Info struct { + NSURL, SocketFile string +} + +func storeInfo(ctx context.Context, val *Info) { + metadata.Map(ctx, true).Store(infoKey{}, val) +} + +// LoadInfo loads Info stored in context in metadata +func LoadInfo(ctx context.Context) (value *Info, ok bool) { + rawValue, ok := metadata.Map(ctx, true).Load(infoKey{}) if !ok { return } - value, ok = rawValue.(*proxyListener) + value, ok = rawValue.(*Info) return value, ok } diff --git a/pkg/networkservice/mechanisms/memif/memifproxy/proxy_listener.go b/pkg/networkservice/mechanisms/memif/memifproxy/proxy_listener.go deleted file mode 100644 index fb57bc31..00000000 --- a/pkg/networkservice/mechanisms/memif/memifproxy/proxy_listener.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2020-2021 Cisco and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build !windows - -package memifproxy - -import ( - "net" - "net/url" - - "github.com/hashicorp/go-multierror" - memifMech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" - "github.com/pkg/errors" -) - -type proxyListener struct { - listener net.Listener - socketFilename string - proxyConnections []*proxyConnection -} - -func newProxyListener(mechanism *memifMech.Mechanism, listenSocketFilename string) (*proxyListener, error) { - // Extract the socket filename - u, err := url.Parse(mechanism.GetSocketFileURL()) - if err != nil { - return nil, errors.Wrapf(err, "not a valid url %q", mechanism.GetSocketFileURL()) - } - if u.Scheme != memifMech.FileScheme { - return nil, errors.Errorf("socket file url must have scheme %q, actual %q", memifMech.FileScheme, u.Scheme) - } - p := &proxyListener{ - socketFilename: u.Path, - } - - // Do a trial dial to ensure we can actually proxy - trialConn, err := net.Dial(memifNetwork, u.Path) - if err != nil { - return nil, errors.Wrapf(err, "proxyListener unable to dial %s", p.socketFilename) - } - _ = trialConn.Close() - - p.listener, err = net.Listen(memifNetwork, listenSocketFilename) - if err != nil { - return nil, errors.Wrapf(err, "proxyListener unable to listen on %s", listenSocketFilename) - } - go p.accept() - mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.FileScheme, Path: listenSocketFilename}).String()) - return p, nil -} - -func (p *proxyListener) accept() { - defer func() { _ = p.Close() }() - for { - in, err := p.listener.Accept() - if err != nil { - if optErr, ok := err.(*net.OpError); !ok || !optErr.Temporary() { - // TODO - perhaps log this? - return - } - } - - out, err := net.Dial(memifNetwork, p.socketFilename) - if err != nil { - if optErr, ok := err.(*net.OpError); !ok || !optErr.Temporary() { - _ = in.Close() - // TODO - perhaps log this? - return - } - } - - proxyConn, err := newProxyConnection(in, out) - if err != nil { - _ = in.Close() - _ = out.Close() - // TODO - perhaps log this? - return - } - - // TODO - clean up - while 99% of the time this won't be an issue because we will have exactly one thing - // in this list... in principle it could leak memory - p.proxyConnections = append(p.proxyConnections, proxyConn) - } -} - -func (p *proxyListener) Close() error { - if p == nil { - return nil - } - err := p.listener.Close() - for _, proxyConn := range p.proxyConnections { - err = multierror.Append(err, proxyConn.Close()) - } - return err -} diff --git a/pkg/networkservice/mechanisms/memif/memifproxy/server.go b/pkg/networkservice/mechanisms/memif/memifproxy/server.go new file mode 100644 index 00000000..771cf9ff --- /dev/null +++ b/pkg/networkservice/mechanisms/memif/memifproxy/server.go @@ -0,0 +1,120 @@ +// Copyright (c) 2020-2021 Cisco and/or its affiliates. +// +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package memifproxy + +import ( + "context" + "os" + "path/filepath" + + "github.com/golang/protobuf/ptypes/empty" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + memifMech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" + "github.com/pkg/errors" + + "github.com/networkservicemesh/sdk-vpp/pkg/tools/proxy" +) + +const memifNetwork = "unixpacket" + +type memifProxyServer struct { + chainCtx context.Context +} + +// NewServer - create a new memifProxy server chain element +func NewServer(chainCtx context.Context) networkservice.NetworkServiceServer { + return &memifProxyServer{ + chainCtx: chainCtx, + } +} + +func (m *memifProxyServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + postponeCtxFunc := postpone.ContextWithValues(ctx) + + var mechanism *memifMech.Mechanism + if mechanism = memifMech.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil { + storeInfo(ctx, new(Info)) + } + + conn, err := next.Server(ctx).Request(ctx, request) + if err != nil { + return nil, err + } + + // If it is NOT a direct memif case, do nothing. + info, _ := LoadInfo(ctx) + if info.SocketFile == "" { + return conn, nil + } + + if err = create(ctx, m.chainCtx, conn, info); err != nil { + closeCtx, cancelClose := postponeCtxFunc() + defer cancelClose() + + if _, closeErr := m.Close(closeCtx, conn); closeErr != nil { + err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error()) + } + + return nil, err + } + + return conn, nil +} + +func create(ctx, chainCtx context.Context, conn *networkservice.Connection, info *Info) error { + if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil { + // This connection has already been created + if _, ok := load(ctx); ok { + return nil + } + + proxyCtx, cancelProxy := context.WithCancel(chainCtx) + + if err := proxy.Start( + proxyCtx, memifNetwork, + mechanism.GetNetNSURL(), listenSocketFilename(conn), + info.NSURL, info.SocketFile, + ); err != nil { + cancelProxy() + return err + } + store(ctx, cancelProxy) + + mechanism.SetSocketFilename(listenSocketFilename(conn)) + } + return nil +} + +func (m *memifProxyServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil { + if cancelProxy, ok := loadAndDelete(ctx); ok { + cancelProxy() + } + } + return next.Server(ctx).Close(ctx, conn) +} + +func listenSocketFilename(conn *networkservice.Connection) string { + return "@" + filepath.Join(os.TempDir(), "memifproxy", conn.GetId(), "memif.socket") +} diff --git a/pkg/networkservice/mechanisms/memif/metatdata.go b/pkg/networkservice/mechanisms/memif/metadata.go similarity index 66% rename from pkg/networkservice/mechanisms/memif/metatdata.go rename to pkg/networkservice/mechanisms/memif/metadata.go index ac952aac..4bfcf19e 100644 --- a/pkg/networkservice/mechanisms/memif/metatdata.go +++ b/pkg/networkservice/mechanisms/memif/metadata.go @@ -25,34 +25,16 @@ import ( ) type key struct{} -type directMemifKey struct{} -func store(ctx context.Context, isClient bool, socket *memif.MemifSocketFilenameAddDel) { +func store(ctx context.Context, isClient bool, socket *memif.MemifSocketFilenameAddDelV2) { metadata.Map(ctx, isClient).Store(key{}, socket) } -func load(ctx context.Context, isClient bool) (value *memif.MemifSocketFilenameAddDel, ok bool) { +func load(ctx context.Context, isClient bool) (value *memif.MemifSocketFilenameAddDelV2, ok bool) { rawValue, ok := metadata.Map(ctx, isClient).Load(key{}) if !ok { return } - value, ok = rawValue.(*memif.MemifSocketFilenameAddDel) + value, ok = rawValue.(*memif.MemifSocketFilenameAddDelV2) return value, ok } - -func storeDirectMemifInfo(ctx context.Context, val directMemifInfo) { - metadata.Map(ctx, true).Store(directMemifKey{}, val) -} - -func loadDirectMemifInfo(ctx context.Context) (value directMemifInfo, ok bool) { - rawValue, ok := metadata.Map(ctx, true).Load(directMemifKey{}) - if !ok { - return - } - value, ok = rawValue.(directMemifInfo) - return value, ok -} - -type directMemifInfo struct { - socketURL string -} diff --git a/pkg/networkservice/mechanisms/memif/option.go b/pkg/networkservice/mechanisms/memif/option.go index 16327dc4..23c46198 100644 --- a/pkg/networkservice/mechanisms/memif/option.go +++ b/pkg/networkservice/mechanisms/memif/option.go @@ -16,12 +16,24 @@ package memif +type memifOptions struct { + directMemifEnabled bool + changeNetNS bool +} + // Option is an option for the connect server -type Option func(s *memifServer) +type Option func(o *memifOptions) // WithDirectMemif turn on direct memif logic func WithDirectMemif() Option { - return func(s *memifServer) { - s.directMemifEnabled = true + return func(o *memifOptions) { + o.directMemifEnabled = true + } +} + +// WithChangeNetNS sets if memif client/server should change net NS instead of using own one for creating socket +func WithChangeNetNS() Option { + return func(o *memifOptions) { + o.changeNetNS = true } } diff --git a/pkg/networkservice/mechanisms/memif/server.go b/pkg/networkservice/mechanisms/memif/server.go index c2e97024..7e15cbc2 100644 --- a/pkg/networkservice/mechanisms/memif/server.go +++ b/pkg/networkservice/mechanisms/memif/server.go @@ -14,48 +14,59 @@ // See the License for the specific language governing permissions and // limitations under the License. +//+build linux + package memif import ( "context" - "git.fd.io/govpp.git/api" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "github.com/networkservicemesh/api/pkg/api/networkservice" - memifMech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" - + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/postpone" + + "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/memif/memifproxy" ) type memifServer struct { - vppConn api.Connection - directMemifEnabled bool + vppConn Connection + changeNetNS bool } // NewServer provides a NetworkServiceServer chain elements that support the memif Mechanism -func NewServer(vppConn api.Connection, options ...Option) networkservice.NetworkServiceServer { - m := &memifServer{ - vppConn: vppConn, - directMemifEnabled: false, +func NewServer(chainCtx context.Context, vppConn Connection, options ...Option) networkservice.NetworkServiceServer { + opts := new(memifOptions) + for _, o := range options { + o(opts) } - for _, opt := range options { - opt(m) + memifProxyServer := null.NewServer() + if opts.directMemifEnabled { + memifProxyServer = memifproxy.NewServer(chainCtx) } - return m + return chain.NewNetworkServiceServer( + memifProxyServer, + &memifServer{ + vppConn: vppConn, + changeNetNS: opts.changeNetNS, + }, + ) } func (m *memifServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { postponeCtxFunc := postpone.ContextWithValues(ctx) - // if direct memif we need pass mechanism to client further in request chain - if mechanism := memifMech.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil && m.directMemifEnabled { - storeDirectMemifInfo(ctx, directMemifInfo{}) + if mechanism := memif.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil && !m.changeNetNS { + mechanism.GetParameters()[common.InodeURL] = netNSURL } conn, err := next.Server(ctx).Request(ctx, request) @@ -63,33 +74,25 @@ func (m *memifServer) Request(ctx context.Context, request *networkservice.Netwo return nil, err } - // if direct memif case - just set memif socket name in connection.Mechanism - // if not direct memif case - create memif as always - dirMemifInfo, ok := loadDirectMemifInfo(ctx) - if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil && ok && len(dirMemifInfo.socketURL) > 0 { - mechanism.SetSocketFileURL(dirMemifInfo.socketURL) + // In direct memif case do nothing. + if info, ok := memifproxy.LoadInfo(ctx); ok && info.SocketFile != "" { return conn, nil } - if err := create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil { - if closeErr := m.closeOnFailure(postponeCtxFunc, conn); closeErr != nil { + if err = create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil { + closeCtx, cancelClose := postponeCtxFunc() + defer cancelClose() + + if _, closeErr := m.Close(closeCtx, conn); closeErr != nil { err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error()) } + return nil, err } return conn, nil } -func (m *memifServer) closeOnFailure(postponeCtxFunc func() (context.Context, context.CancelFunc), conn *networkservice.Connection) error { - closeCtx, cancelClose := postponeCtxFunc() - defer cancelClose() - - _, err := m.Close(closeCtx, conn) - - return err -} - func (m *memifServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { _ = del(ctx, conn, m.vppConn, metadata.IsClient(m)) return next.Server(ctx).Close(ctx, conn) diff --git a/pkg/networkservice/mechanisms/memif/memifproxy/proxy_connection.go b/pkg/tools/proxy/proxy_connection.go similarity index 94% rename from pkg/networkservice/mechanisms/memif/memifproxy/proxy_connection.go rename to pkg/tools/proxy/proxy_connection.go index fa4a1dc9..6ba37f64 100644 --- a/pkg/networkservice/mechanisms/memif/memifproxy/proxy_connection.go +++ b/pkg/tools/proxy/proxy_connection.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2021 Cisco and/or its affiliates. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,9 +16,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows +// +build linux -package memifproxy +package proxy import ( "net" @@ -26,6 +28,11 @@ import ( "github.com/pkg/errors" ) +const ( + maxFDCount = 1 + bufferSize = 128 +) + type proxyConnection struct { in net.Conn out net.Conn diff --git a/pkg/tools/proxy/proxy_listener.go b/pkg/tools/proxy/proxy_listener.go new file mode 100644 index 00000000..8bc102bf --- /dev/null +++ b/pkg/tools/proxy/proxy_listener.go @@ -0,0 +1,158 @@ +// Copyright (c) 2020-2021 Cisco and/or its affiliates. +// +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +// Package proxy provides method for proxying socket from one net NS to other +package proxy + +import ( + "context" + "net" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + "github.com/vishvananda/netns" + + "github.com/networkservicemesh/sdk-kernel/pkg/kernel/tools/nshandle" + + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type proxyListener struct { + network string + + targetNSHandle netns.NsHandle + targetSocketFilename string + + listener net.Listener + + proxyConnections []*proxyConnection +} + +// Start starts proxying {targetNSURL, targetSocketFilename} socket via {nsURL, socketFilename} socket +func Start( + ctx context.Context, network string, + nsURL, socketFilename string, + targetNSURL, targetSocketFilename string, +) (err error) { + p := &proxyListener{ + network: network, + targetNSHandle: netns.None(), + targetSocketFilename: targetSocketFilename, + } + defer func() { + if err != nil { + _ = p.Close() + } + }() + + nsHandle, err := nshandle.FromURL(nsURL) + if err != nil { + return errors.Wrap(err, "failed to get server net NS handle") + } + defer func() { _ = nsHandle.Close() }() + + p.listener, err = listen(p.network, socketFilename, nsHandle) + if err != nil { + return errors.Wrap(err, "failed to start listening") + } + + p.targetNSHandle, err = nshandle.FromURL(targetNSURL) + if err != nil { + return errors.Wrap(err, "failed to get server net NS handle") + } + + // Do a trial dial to ensure we can actually proxy + trialConn, err := dial(p.network, p.targetSocketFilename, p.targetNSHandle) + if err != nil { + return errors.Wrapf(err, "unable to dial %s", targetSocketFilename) + } + _ = trialConn.Close() + + go p.accept(ctx) + + return nil +} + +func (p *proxyListener) Close() error { + if p == nil { + return nil + } + + var err error + if p.targetNSHandle.IsOpen() { + err = multierror.Append(err, p.targetNSHandle.Close()) + } + if p.listener != nil { + err = multierror.Append(err, p.listener.Close()) + } + for _, proxyConn := range p.proxyConnections { + err = multierror.Append(err, proxyConn.Close()) + } + + return err +} + +func (p *proxyListener) accept(ctx context.Context) { + logger := log.FromContext(ctx). + WithField("proxy.proxyListener", "accept"). + WithField("proxy", p.listener.Addr().String()). + WithField("target", p.targetSocketFilename) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + <-ctx.Done() + _ = p.Close() + }() + + for { + in, err := p.listener.Accept() + if err != nil { + if optErr, ok := err.(*net.OpError); !ok || !optErr.Temporary() { + logger.Warnf("failed to accept: %s", err.Error()) + return + } + } + + out, err := dial(p.network, p.targetSocketFilename, p.targetNSHandle) + if err != nil { + if optErr, ok := err.(*net.OpError); !ok || !optErr.Temporary() { + logger.Warnf("failed to dial: %s", err.Error()) + _ = in.Close() + return + } + } + + proxyConn, err := newProxyConnection(in, out) + if err != nil { + logger.Warnf("failed to copy data: %s", err.Error()) + _ = in.Close() + _ = out.Close() + return + } + + logger.Debug("established a new connection") + + // TODO - clean up - while 99% of the time this won't be an issue because we will have exactly one thing + // in this list... in principle it could leak memory + p.proxyConnections = append(p.proxyConnections, proxyConn) + } +} diff --git a/pkg/tools/proxy/proxy_test.go b/pkg/tools/proxy/proxy_test.go new file mode 100644 index 00000000..dc781669 --- /dev/null +++ b/pkg/tools/proxy/proxy_test.go @@ -0,0 +1,23 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import "testing" + +func TestStart(t *testing.T) { + // FIXME: add tests +} diff --git a/pkg/tools/proxy/utils.go b/pkg/tools/proxy/utils.go new file mode 100644 index 00000000..e8a7ca3b --- /dev/null +++ b/pkg/tools/proxy/utils.go @@ -0,0 +1,60 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package proxy + +import ( + "net" + + "github.com/pkg/errors" + "github.com/vishvananda/netns" + + "github.com/networkservicemesh/sdk-kernel/pkg/kernel/tools/nshandle" +) + +func listen(network, address string, nsHandle netns.NsHandle) (ln net.Listener, err error) { + current, err := nshandle.Current() + if err != nil { + return nil, errors.Wrap(err, "failed to get current net NS") + } + defer func() { _ = current.Close() }() + + err = nshandle.RunIn(current, nsHandle, func() error { + var listenErr error + ln, listenErr = net.Listen(network, address) + return listenErr + }) + + return ln, err +} + +func dial(network, address string, nsHandle netns.NsHandle) (conn net.Conn, err error) { + current, err := nshandle.Current() + if err != nil { + return nil, errors.Wrap(err, "failed to get current net NS") + } + defer func() { _ = current.Close() }() + + err = nshandle.RunIn(current, nsHandle, func() error { + var dialErr error + conn, dialErr = net.Dial(network, address) + return dialErr + }) + + return conn, err +} From 0f5bf22de5cf7f7a8aa2c17eeeb94da96a596d4c Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Wed, 27 Oct 2021 15:28:52 +0700 Subject: [PATCH 2/6] Add tests for proxy Signed-off-by: Vladimir Popov --- .github/workflows/ci.yaml | 4 + go.mod | 1 + pkg/tools/proxy/proxy_listener.go | 3 +- pkg/tools/proxy/proxy_test.go | 139 +++++++++++++++++++++++++++++- 4 files changed, 143 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 33abb3ac..5e40bee8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -34,6 +34,10 @@ jobs: - name: Build run: go build -race ./... - name: Test + if: matrix.os != 'windows-latest' + run: sudo -E PATH="$PATH" bash -c "go test -race ./..." + - name: Test + if: matrix.os == 'windows-latest' run: go test -race ./... golangci-lint: name: golangci-lint diff --git a/go.mod b/go.mod index 768b8b95..a2fdcf66 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/thanhpk/randstr v1.0.4 github.com/vishvananda/netlink v1.1.0 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae + go.uber.org/goleak v1.1.10 golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b google.golang.org/grpc v1.35.0 diff --git a/pkg/tools/proxy/proxy_listener.go b/pkg/tools/proxy/proxy_listener.go index 8bc102bf..d593829f 100644 --- a/pkg/tools/proxy/proxy_listener.go +++ b/pkg/tools/proxy/proxy_listener.go @@ -120,9 +120,10 @@ func (p *proxyListener) accept(ctx context.Context) { go func() { <-ctx.Done() - _ = p.Close() + _ = p.listener.Close() }() + defer func() { _ = p.Close() }() for { in, err := p.listener.Accept() if err != nil { diff --git a/pkg/tools/proxy/proxy_test.go b/pkg/tools/proxy/proxy_test.go index dc781669..3202ff0a 100644 --- a/pkg/tools/proxy/proxy_test.go +++ b/pkg/tools/proxy/proxy_test.go @@ -14,10 +14,143 @@ // See the License for the specific language governing permissions and // limitations under the License. -package proxy +// +build linux -import "testing" +package proxy_test + +import ( + "context" + "io" + "net" + "path" + "runtime" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "github.com/vishvananda/netns" + "go.uber.org/goleak" + + "github.com/networkservicemesh/sdk-vpp/pkg/tools/proxy" +) + +const ( + network = "unix" + ping = "ping" + pong = "pong" +) func TestStart(t *testing.T) { - // FIXME: add tests + t.Cleanup(func() { goleak.VerifyNone(t) }) + + tempDir := t.TempDir() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + defaultNS, err := netns.Get() + require.NoError(t, err) + defer func() { _ = defaultNS.Close() }() + + proxyNS, err := netns.NewNamed("proxy") + require.NoError(t, err) + defer func() { _ = proxyNS.Close() }() + + targetNS, err := netns.NewNamed("target") + require.NoError(t, err) + defer func() { _ = targetNS.Close() }() + + // 1. Start listening in target net NS. + + require.NoError(t, netns.Set(targetNS)) + + targetFile := "@" + path.Join(tempDir, "target") + + l, err := net.Listen("unix", targetFile) + require.NoError(t, err) + go func() { + <-ctx.Done() + _ = l.Close() + }() + + pongCh := doPong(l) + + // 2. Create proxy from default net NS. + + require.NoError(t, netns.Set(defaultNS)) + + proxyFile := "@" + path.Join(tempDir, "proxy") + + require.NoError(t, proxy.Start(ctx, network, nsURL("proxy"), proxyFile, nsURL("target"), targetFile)) + + // 3. Dial proxy from proxy net NS. + + require.NoError(t, netns.Set(proxyNS)) + + conn, err := net.Dial(network, proxyFile) + require.NoError(t, err) + go func() { + <-ctx.Done() + _ = l.Close() + }() + + doPing(t, conn) + require.NoError(t, <-pongCh) +} + +func doPong(l net.Listener) <-chan error { + ch := make(chan error, 1) + go func() { + defer close(ch) + for { + conn, err := l.Accept() + if err != nil { + ch <- err + return + } + defer func() { _ = conn.Close() }() + + buff := make([]byte, 10) + n, err := conn.Read(buff) + if err == io.EOF { + // test dial + continue + } + if err != nil { + ch <- err + return + } + if msg := string(buff[:n]); msg != ping { + ch <- errors.Errorf("expected %s, actual %s", ping, msg) + return + } + + _, err = conn.Write([]byte(pong)) + if err != nil { + ch <- err + } + return + } + }() + return ch +} + +func doPing(t *testing.T, conn io.ReadWriter) { + _, err := conn.Write([]byte(ping)) + require.NoError(t, err) + + buff := make([]byte, 10) + n, err := conn.Read(buff) + if err != nil { + require.EqualError(t, err, io.EOF.Error()) + } + require.Equal(t, pong, string(buff[:n])) +} + +func nsURL(name string) string { + return "file://" + path.Join("/run/netns", name) } From e87be57e8b8890829d927c57e70f1ed6d951253f Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Wed, 3 Nov 2021 11:26:16 +0700 Subject: [PATCH 3/6] Rework memif to new api, add WithExternalVPP option Signed-off-by: Vladimir Popov --- pkg/networkservice/chains/forwarder/server.go | 8 ++--- pkg/networkservice/mechanisms/memif/client.go | 18 ++++++----- pkg/networkservice/mechanisms/memif/common.go | 31 +++++++------------ pkg/networkservice/mechanisms/memif/option.go | 10 +++++- pkg/networkservice/mechanisms/memif/server.go | 14 ++++++--- 5 files changed, 45 insertions(+), 36 deletions(-) diff --git a/pkg/networkservice/chains/forwarder/server.go b/pkg/networkservice/chains/forwarder/server.go index 60b4b077..17f1284e 100644 --- a/pkg/networkservice/chains/forwarder/server.go +++ b/pkg/networkservice/chains/forwarder/server.go @@ -63,8 +63,6 @@ import ( // Connection aggregates the api.Connection and api.ChannelProvider interfaces type Connection interface { - IsExternal() bool - api.Connection api.ChannelProvider } @@ -97,7 +95,8 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ memif.MECHANISM: memif.NewServer(ctx, vppConn, memif.WithDirectMemif(), - memif.WithChangeNetNS()), + memif.WithChangeNetNS(), + memif.WithExternalVPP()), kernel.MECHANISM: kernel.NewServer(vppConn), vxlan.MECHANISM: vxlan.NewServer(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)), wireguard.MECHANISM: wireguard.NewServer(vppConn, tunnelIP), @@ -118,7 +117,8 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw tag.NewClient(ctx, vppConn), // mechanisms memif.NewClient(vppConn, - memif.WithChangeNetNS()), + memif.WithChangeNetNS(), + memif.WithExternalVPP()), kernel.NewClient(vppConn), vxlan.NewClient(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)), wireguard.NewClient(vppConn, tunnelIP), diff --git a/pkg/networkservice/mechanisms/memif/client.go b/pkg/networkservice/mechanisms/memif/client.go index 880a1168..d3439c55 100644 --- a/pkg/networkservice/mechanisms/memif/client.go +++ b/pkg/networkservice/mechanisms/memif/client.go @@ -23,6 +23,7 @@ package memif import ( "context" + "git.fd.io/govpp.git/api" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "google.golang.org/grpc" @@ -38,12 +39,12 @@ import ( ) type memifClient struct { - vppConn Connection + vppConn *vppConnection changeNetNs bool } // NewClient provides a NetworkServiceClient chain elements that support the memif Mechanism -func NewClient(vppConn Connection, options ...Option) networkservice.NetworkServiceClient { +func NewClient(vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient { opts := &memifOptions{} for _, o := range options { o(opts) @@ -51,7 +52,10 @@ func NewClient(vppConn Connection, options ...Option) networkservice.NetworkServ return chain.NewNetworkServiceClient( &memifClient{ - vppConn: vppConn, + vppConn: &vppConnection{ + isExternal: opts.isVPPExternal, + Connection: vppConn, + }, changeNetNs: opts.changeNetNS, }, ) @@ -68,11 +72,11 @@ func mechanismsContain(list []*networkservice.Mechanism, t string) bool { func (m *memifClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { if !mechanismsContain(request.MechanismPreferences, memif.MECHANISM) { - var u string - if !m.changeNetNs { - u = netNSURL + mechanism := memif.ToMechanism(memif.NewAbstract(netNSPath)) + if m.changeNetNs { + mechanism.SetNetNSURL("") } - request.MechanismPreferences = append(request.MechanismPreferences, memif.New(u)) + request.MechanismPreferences = append(request.MechanismPreferences, mechanism.Mechanism) } postponeCtxFunc := postpone.ContextWithValues(ctx) diff --git a/pkg/networkservice/mechanisms/memif/common.go b/pkg/networkservice/mechanisms/memif/common.go index 71e5181a..3744a5b7 100644 --- a/pkg/networkservice/mechanisms/memif/common.go +++ b/pkg/networkservice/mechanisms/memif/common.go @@ -43,16 +43,15 @@ import ( "github.com/networkservicemesh/sdk-vpp/pkg/tools/ifindex" ) -// Connection is an api.Connection with IsExternal method -type Connection interface { - IsExternal() bool +type vppConnection struct { + isExternal bool api.Connection } var ( - netNS netns.NsHandle - netNSURL string + netNS netns.NsHandle + netNSPath string ) // nolint:gochecknoinits @@ -64,16 +63,14 @@ func init() { if err != nil { panic("failed to open '/proc/thread-self/ns/net': " + err.Error()) } - path := fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd) + netNSPath = fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd) - if netNS, err = netns.GetFromPath(path); err != nil { + if netNS, err = netns.GetFromPath(netNSPath); err != nil { panic("failed to get current net NS: " + err.Error()) } - - netNSURL = (&url.URL{Scheme: memifMech.SocketFileScheme, Path: path}).String() } -func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn Connection, isClient bool) (socketID uint32, err error) { +func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn *vppConnection, isClient bool) (socketID uint32, err error) { namespace, err := getNamespace(mechanism, vppConn) if err != nil { return 0, err @@ -194,7 +191,7 @@ func deleteMemif(ctx context.Context, vppConn api.Connection, isClient bool) err return nil } -func create(ctx context.Context, conn *networkservice.Connection, vppConn Connection, isClient bool) error { +func create(ctx context.Context, conn *networkservice.Connection, vppConn *vppConnection, isClient bool) error { if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil { // This connection has already been created if _, ok := ifindex.Load(ctx, isClient); ok { @@ -234,20 +231,16 @@ func socketFile(conn *networkservice.Connection) string { return "@" + filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket") } -func getNamespace(mechanism *memifMech.Mechanism, vppConn Connection) (string, error) { - if mechanism.GetNetNSURL() == netNSURL { - return "", nil - } - +func getNamespace(mechanism *memifMech.Mechanism, vppConn *vppConnection) (string, error) { u, err := url.Parse(mechanism.GetNetNSURL()) if err != nil { return "", errors.Wrapf(err, "not a valid url %s", mechanism.GetNetNSURL()) } - if u.Scheme != memifMech.SocketFileScheme { - return "", errors.Errorf("socket file url must have scheme %s, actual %s", memifMech.SocketFileScheme, u.Scheme) + if u.Scheme != memifMech.FileScheme { + return "", errors.Errorf("socket file url must have scheme %s, actual %s", memifMech.FileScheme, u.Scheme) } - if vppConn.IsExternal() { + if vppConn.isExternal { return u.Path, nil } diff --git a/pkg/networkservice/mechanisms/memif/option.go b/pkg/networkservice/mechanisms/memif/option.go index 23c46198..f5b999a4 100644 --- a/pkg/networkservice/mechanisms/memif/option.go +++ b/pkg/networkservice/mechanisms/memif/option.go @@ -19,12 +19,13 @@ package memif type memifOptions struct { directMemifEnabled bool changeNetNS bool + isVPPExternal bool } // Option is an option for the connect server type Option func(o *memifOptions) -// WithDirectMemif turn on direct memif logic +// WithDirectMemif turns on direct memif logic func WithDirectMemif() Option { return func(o *memifOptions) { o.directMemifEnabled = true @@ -37,3 +38,10 @@ func WithChangeNetNS() Option { o.changeNetNS = true } } + +// WithExternalVPP sets if VPP is located in different net NS to the application +func WithExternalVPP() Option { + return func(o *memifOptions) { + o.isVPPExternal = true + } +} diff --git a/pkg/networkservice/mechanisms/memif/server.go b/pkg/networkservice/mechanisms/memif/server.go index 7e15cbc2..dcd5688b 100644 --- a/pkg/networkservice/mechanisms/memif/server.go +++ b/pkg/networkservice/mechanisms/memif/server.go @@ -20,12 +20,13 @@ package memif import ( "context" + "net/url" + "git.fd.io/govpp.git/api" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" @@ -37,12 +38,12 @@ import ( ) type memifServer struct { - vppConn Connection + vppConn *vppConnection changeNetNS bool } // NewServer provides a NetworkServiceServer chain elements that support the memif Mechanism -func NewServer(chainCtx context.Context, vppConn Connection, options ...Option) networkservice.NetworkServiceServer { +func NewServer(chainCtx context.Context, vppConn api.Connection, options ...Option) networkservice.NetworkServiceServer { opts := new(memifOptions) for _, o := range options { o(opts) @@ -56,7 +57,10 @@ func NewServer(chainCtx context.Context, vppConn Connection, options ...Option) return chain.NewNetworkServiceServer( memifProxyServer, &memifServer{ - vppConn: vppConn, + vppConn: &vppConnection{ + isExternal: opts.isVPPExternal, + Connection: vppConn, + }, changeNetNS: opts.changeNetNS, }, ) @@ -66,7 +70,7 @@ func (m *memifServer) Request(ctx context.Context, request *networkservice.Netwo postponeCtxFunc := postpone.ContextWithValues(ctx) if mechanism := memif.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil && !m.changeNetNS { - mechanism.GetParameters()[common.InodeURL] = netNSURL + mechanism.SetNetNSURL((&url.URL{Scheme: memif.FileScheme, Path: netNSPath}).String()) } conn, err := next.Server(ctx).Request(ctx, request) From e6df795802b34ad44b76e13d139162a50a25cea6 Mon Sep 17 00:00:00 2001 From: Mikhail Avramenko Date: Thu, 16 Dec 2021 12:15:01 +0700 Subject: [PATCH 4/6] refactoring of init() Signed-off-by: Mikhail Avramenko --- pkg/networkservice/mechanisms/memif/client.go | 2 ++ pkg/networkservice/mechanisms/memif/common.go | 5 +++-- pkg/networkservice/mechanisms/memif/server.go | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/networkservice/mechanisms/memif/client.go b/pkg/networkservice/mechanisms/memif/client.go index d3439c55..4bedff07 100644 --- a/pkg/networkservice/mechanisms/memif/client.go +++ b/pkg/networkservice/mechanisms/memif/client.go @@ -45,6 +45,8 @@ type memifClient struct { // NewClient provides a NetworkServiceClient chain elements that support the memif Mechanism func NewClient(vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient { + once.Do(setupNetNS) + opts := &memifOptions{} for _, o := range options { o(opts) diff --git a/pkg/networkservice/mechanisms/memif/common.go b/pkg/networkservice/mechanisms/memif/common.go index 3744a5b7..39079c64 100644 --- a/pkg/networkservice/mechanisms/memif/common.go +++ b/pkg/networkservice/mechanisms/memif/common.go @@ -25,6 +25,7 @@ import ( "os" "path/filepath" "runtime" + "sync" "time" "git.fd.io/govpp.git/api" @@ -52,10 +53,10 @@ type vppConnection struct { var ( netNS netns.NsHandle netNSPath string + once sync.Once ) -// nolint:gochecknoinits -func init() { +func setupNetNS() { runtime.LockOSThread() defer runtime.UnlockOSThread() diff --git a/pkg/networkservice/mechanisms/memif/server.go b/pkg/networkservice/mechanisms/memif/server.go index dcd5688b..dbba8949 100644 --- a/pkg/networkservice/mechanisms/memif/server.go +++ b/pkg/networkservice/mechanisms/memif/server.go @@ -44,6 +44,8 @@ type memifServer struct { // NewServer provides a NetworkServiceServer chain elements that support the memif Mechanism func NewServer(chainCtx context.Context, vppConn api.Connection, options ...Option) networkservice.NetworkServiceServer { + once.Do(setupNetNS) + opts := new(memifOptions) for _, o := range options { o(opts) From 3c68e618d7871a91b017b4bfb6c3be5898ccb32c Mon Sep 17 00:00:00 2001 From: Mikhail Avramenko Date: Fri, 17 Dec 2021 11:54:24 +0700 Subject: [PATCH 5/6] refactored to remove global variables Signed-off-by: Mikhail Avramenko --- pkg/networkservice/chains/forwarder/server.go | 8 +++-- pkg/networkservice/mechanisms/memif/client.go | 10 +++---- .../mechanisms/memif/client_test.go | 4 +-- pkg/networkservice/mechanisms/memif/common.go | 30 +++++++++++-------- pkg/networkservice/mechanisms/memif/server.go | 10 +++---- 5 files changed, 35 insertions(+), 27 deletions(-) diff --git a/pkg/networkservice/chains/forwarder/server.go b/pkg/networkservice/chains/forwarder/server.go index 17f1284e..d18e4cb4 100644 --- a/pkg/networkservice/chains/forwarder/server.go +++ b/pkg/networkservice/chains/forwarder/server.go @@ -79,6 +79,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw ) nsClient := registryclient.NewNetworkServiceRegistryClient(ctx, clientURL, registryclient.WithDialOptions(clientDialOptions...)) + netNsInfo := memif.NewNetNSInfo() rv := &xconnectNSServer{} additionalFunctionality := []networkservice.NetworkServiceServer{ recvfd.NewServer(), @@ -93,7 +94,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw tag.NewServer(ctx, vppConn), mtu.NewServer(vppConn), mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ - memif.MECHANISM: memif.NewServer(ctx, vppConn, + memif.MECHANISM: memif.NewServer(ctx, vppConn, netNsInfo, memif.WithDirectMemif(), memif.WithChangeNetNS(), memif.WithExternalVPP()), @@ -116,9 +117,10 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw mtu.NewClient(vppConn), tag.NewClient(ctx, vppConn), // mechanisms - memif.NewClient(vppConn, + memif.NewClient(vppConn, netNsInfo, memif.WithChangeNetNS(), - memif.WithExternalVPP()), + memif.WithExternalVPP(), + ), kernel.NewClient(vppConn), vxlan.NewClient(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)), wireguard.NewClient(vppConn, tunnelIP), diff --git a/pkg/networkservice/mechanisms/memif/client.go b/pkg/networkservice/mechanisms/memif/client.go index 4bedff07..bcc2da86 100644 --- a/pkg/networkservice/mechanisms/memif/client.go +++ b/pkg/networkservice/mechanisms/memif/client.go @@ -41,12 +41,11 @@ import ( type memifClient struct { vppConn *vppConnection changeNetNs bool + nsInfo NetNSInfo } // NewClient provides a NetworkServiceClient chain elements that support the memif Mechanism -func NewClient(vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient { - once.Do(setupNetNS) - +func NewClient(vppConn api.Connection, nsInfo NetNSInfo, options ...Option) networkservice.NetworkServiceClient { opts := &memifOptions{} for _, o := range options { o(opts) @@ -59,6 +58,7 @@ func NewClient(vppConn api.Connection, options ...Option) networkservice.Network Connection: vppConn, }, changeNetNs: opts.changeNetNS, + nsInfo: nsInfo, }, ) } @@ -74,7 +74,7 @@ func mechanismsContain(list []*networkservice.Mechanism, t string) bool { func (m *memifClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { if !mechanismsContain(request.MechanismPreferences, memif.MECHANISM) { - mechanism := memif.ToMechanism(memif.NewAbstract(netNSPath)) + mechanism := memif.ToMechanism(memif.NewAbstract(m.nsInfo.netNSPath)) if m.changeNetNs { mechanism.SetNetNSURL("") } @@ -97,7 +97,7 @@ func (m *memifClient) Request(ctx context.Context, request *networkservice.Netwo } } - if err = create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil { + if err = create(ctx, conn, m.vppConn, metadata.IsClient(m), m.nsInfo.netNS); err != nil { closeCtx, cancelClose := postponeCtxFunc() defer cancelClose() diff --git a/pkg/networkservice/mechanisms/memif/client_test.go b/pkg/networkservice/mechanisms/memif/client_test.go index a2c52d81..e4156fee 100644 --- a/pkg/networkservice/mechanisms/memif/client_test.go +++ b/pkg/networkservice/mechanisms/memif/client_test.go @@ -33,7 +33,7 @@ import ( ) func Test_MemifClient_ShouldAppendMechanismIfMemifMechanismMissed(t *testing.T) { - c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil)) + c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil, memif.NetNSInfo{})) req := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{}, @@ -55,7 +55,7 @@ func Test_MemifClient_ShouldAppendMechanismIfMemifMechanismMissed(t *testing.T) } func Test_MemifClient_ShouldNotDuplicateMechanisms(t *testing.T) { - c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil)) + c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil, memif.NetNSInfo{})) req := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ diff --git a/pkg/networkservice/mechanisms/memif/common.go b/pkg/networkservice/mechanisms/memif/common.go index 39079c64..dc39fea3 100644 --- a/pkg/networkservice/mechanisms/memif/common.go +++ b/pkg/networkservice/mechanisms/memif/common.go @@ -25,7 +25,6 @@ import ( "os" "path/filepath" "runtime" - "sync" "time" "git.fd.io/govpp.git/api" @@ -50,13 +49,14 @@ type vppConnection struct { api.Connection } -var ( +// NetNSInfo contains shared info for server and client +type NetNSInfo struct { netNS netns.NsHandle netNSPath string - once sync.Once -) +} -func setupNetNS() { +// NewNetNSInfo should be called only once for single chain +func NewNetNSInfo() NetNSInfo { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -64,15 +64,21 @@ func setupNetNS() { if err != nil { panic("failed to open '/proc/thread-self/ns/net': " + err.Error()) } - netNSPath = fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd) + netNSPath := fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd) - if netNS, err = netns.GetFromPath(netNSPath); err != nil { + netNS, err := netns.GetFromPath(netNSPath) + if err != nil { panic("failed to get current net NS: " + err.Error()) } + + return NetNSInfo{ + netNSPath: netNSPath, + netNS: netNS, + } } -func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn *vppConnection, isClient bool) (socketID uint32, err error) { - namespace, err := getNamespace(mechanism, vppConn) +func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn *vppConnection, isClient bool, netNS netns.NsHandle) (socketID uint32, err error) { + namespace, err := getNamespace(mechanism, vppConn, netNS) if err != nil { return 0, err } @@ -192,7 +198,7 @@ func deleteMemif(ctx context.Context, vppConn api.Connection, isClient bool) err return nil } -func create(ctx context.Context, conn *networkservice.Connection, vppConn *vppConnection, isClient bool) error { +func create(ctx context.Context, conn *networkservice.Connection, vppConn *vppConnection, isClient bool, netNS netns.NsHandle) error { if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil { // This connection has already been created if _, ok := ifindex.Load(ctx, isClient); ok { @@ -205,7 +211,7 @@ func create(ctx context.Context, conn *networkservice.Connection, vppConn *vppCo if conn.GetPayload() == payload.Ethernet { mode = memif.MEMIF_MODE_API_ETHERNET } - socketID, err := createMemifSocket(ctx, mechanism, vppConn, isClient) + socketID, err := createMemifSocket(ctx, mechanism, vppConn, isClient, netNS) if err != nil { return err } @@ -232,7 +238,7 @@ func socketFile(conn *networkservice.Connection) string { return "@" + filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket") } -func getNamespace(mechanism *memifMech.Mechanism, vppConn *vppConnection) (string, error) { +func getNamespace(mechanism *memifMech.Mechanism, vppConn *vppConnection, netNS netns.NsHandle) (string, error) { u, err := url.Parse(mechanism.GetNetNSURL()) if err != nil { return "", errors.Wrapf(err, "not a valid url %s", mechanism.GetNetNSURL()) diff --git a/pkg/networkservice/mechanisms/memif/server.go b/pkg/networkservice/mechanisms/memif/server.go index dbba8949..fc3447c7 100644 --- a/pkg/networkservice/mechanisms/memif/server.go +++ b/pkg/networkservice/mechanisms/memif/server.go @@ -40,12 +40,11 @@ import ( type memifServer struct { vppConn *vppConnection changeNetNS bool + nsInfo NetNSInfo } // NewServer provides a NetworkServiceServer chain elements that support the memif Mechanism -func NewServer(chainCtx context.Context, vppConn api.Connection, options ...Option) networkservice.NetworkServiceServer { - once.Do(setupNetNS) - +func NewServer(chainCtx context.Context, vppConn api.Connection, nsInfo NetNSInfo, options ...Option) networkservice.NetworkServiceServer { opts := new(memifOptions) for _, o := range options { o(opts) @@ -64,6 +63,7 @@ func NewServer(chainCtx context.Context, vppConn api.Connection, options ...Opti Connection: vppConn, }, changeNetNS: opts.changeNetNS, + nsInfo: nsInfo, }, ) } @@ -72,7 +72,7 @@ func (m *memifServer) Request(ctx context.Context, request *networkservice.Netwo postponeCtxFunc := postpone.ContextWithValues(ctx) if mechanism := memif.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil && !m.changeNetNS { - mechanism.SetNetNSURL((&url.URL{Scheme: memif.FileScheme, Path: netNSPath}).String()) + mechanism.SetNetNSURL((&url.URL{Scheme: memif.FileScheme, Path: m.nsInfo.netNSPath}).String()) } conn, err := next.Server(ctx).Request(ctx, request) @@ -85,7 +85,7 @@ func (m *memifServer) Request(ctx context.Context, request *networkservice.Netwo return conn, nil } - if err = create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil { + if err = create(ctx, conn, m.vppConn, metadata.IsClient(m), m.nsInfo.netNS); err != nil { closeCtx, cancelClose := postponeCtxFunc() defer cancelClose() From 78e713daa17a28734fd2cabbb7b359b1759e23a8 Mon Sep 17 00:00:00 2001 From: Mikhail Avramenko Date: Mon, 20 Dec 2021 16:46:33 +0700 Subject: [PATCH 6/6] refactoring for not using shared variables Signed-off-by: Mikhail Avramenko --- pkg/networkservice/chains/forwarder/server.go | 5 ++--- pkg/networkservice/mechanisms/memif/client.go | 4 ++-- pkg/networkservice/mechanisms/memif/client_test.go | 4 ++-- pkg/networkservice/mechanisms/memif/common.go | 2 +- pkg/networkservice/mechanisms/memif/server.go | 4 ++-- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/networkservice/chains/forwarder/server.go b/pkg/networkservice/chains/forwarder/server.go index d18e4cb4..d9c5cc58 100644 --- a/pkg/networkservice/chains/forwarder/server.go +++ b/pkg/networkservice/chains/forwarder/server.go @@ -79,7 +79,6 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw ) nsClient := registryclient.NewNetworkServiceRegistryClient(ctx, clientURL, registryclient.WithDialOptions(clientDialOptions...)) - netNsInfo := memif.NewNetNSInfo() rv := &xconnectNSServer{} additionalFunctionality := []networkservice.NetworkServiceServer{ recvfd.NewServer(), @@ -94,7 +93,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw tag.NewServer(ctx, vppConn), mtu.NewServer(vppConn), mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ - memif.MECHANISM: memif.NewServer(ctx, vppConn, netNsInfo, + memif.MECHANISM: memif.NewServer(ctx, vppConn, memif.WithDirectMemif(), memif.WithChangeNetNS(), memif.WithExternalVPP()), @@ -117,7 +116,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw mtu.NewClient(vppConn), tag.NewClient(ctx, vppConn), // mechanisms - memif.NewClient(vppConn, netNsInfo, + memif.NewClient(vppConn, memif.WithChangeNetNS(), memif.WithExternalVPP(), ), diff --git a/pkg/networkservice/mechanisms/memif/client.go b/pkg/networkservice/mechanisms/memif/client.go index bcc2da86..21be5c73 100644 --- a/pkg/networkservice/mechanisms/memif/client.go +++ b/pkg/networkservice/mechanisms/memif/client.go @@ -45,7 +45,7 @@ type memifClient struct { } // NewClient provides a NetworkServiceClient chain elements that support the memif Mechanism -func NewClient(vppConn api.Connection, nsInfo NetNSInfo, options ...Option) networkservice.NetworkServiceClient { +func NewClient(vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient { opts := &memifOptions{} for _, o := range options { o(opts) @@ -58,7 +58,7 @@ func NewClient(vppConn api.Connection, nsInfo NetNSInfo, options ...Option) netw Connection: vppConn, }, changeNetNs: opts.changeNetNS, - nsInfo: nsInfo, + nsInfo: newNetNSInfo(), }, ) } diff --git a/pkg/networkservice/mechanisms/memif/client_test.go b/pkg/networkservice/mechanisms/memif/client_test.go index e4156fee..a2c52d81 100644 --- a/pkg/networkservice/mechanisms/memif/client_test.go +++ b/pkg/networkservice/mechanisms/memif/client_test.go @@ -33,7 +33,7 @@ import ( ) func Test_MemifClient_ShouldAppendMechanismIfMemifMechanismMissed(t *testing.T) { - c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil, memif.NetNSInfo{})) + c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil)) req := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{}, @@ -55,7 +55,7 @@ func Test_MemifClient_ShouldAppendMechanismIfMemifMechanismMissed(t *testing.T) } func Test_MemifClient_ShouldNotDuplicateMechanisms(t *testing.T) { - c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil, memif.NetNSInfo{})) + c := chain.NewNetworkServiceClient(metadata.NewClient(), memif.NewClient(nil)) req := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ diff --git a/pkg/networkservice/mechanisms/memif/common.go b/pkg/networkservice/mechanisms/memif/common.go index dc39fea3..a093619e 100644 --- a/pkg/networkservice/mechanisms/memif/common.go +++ b/pkg/networkservice/mechanisms/memif/common.go @@ -56,7 +56,7 @@ type NetNSInfo struct { } // NewNetNSInfo should be called only once for single chain -func NewNetNSInfo() NetNSInfo { +func newNetNSInfo() NetNSInfo { runtime.LockOSThread() defer runtime.UnlockOSThread() diff --git a/pkg/networkservice/mechanisms/memif/server.go b/pkg/networkservice/mechanisms/memif/server.go index fc3447c7..63916f39 100644 --- a/pkg/networkservice/mechanisms/memif/server.go +++ b/pkg/networkservice/mechanisms/memif/server.go @@ -44,7 +44,7 @@ type memifServer struct { } // NewServer provides a NetworkServiceServer chain elements that support the memif Mechanism -func NewServer(chainCtx context.Context, vppConn api.Connection, nsInfo NetNSInfo, options ...Option) networkservice.NetworkServiceServer { +func NewServer(chainCtx context.Context, vppConn api.Connection, options ...Option) networkservice.NetworkServiceServer { opts := new(memifOptions) for _, o := range options { o(opts) @@ -63,7 +63,7 @@ func NewServer(chainCtx context.Context, vppConn api.Connection, nsInfo NetNSInf Connection: vppConn, }, changeNetNS: opts.changeNetNS, - nsInfo: nsInfo, + nsInfo: newNetNSInfo(), }, ) }