diff --git a/README.md b/README.md index 8240417..9af3a3d 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,14 @@ tilt up First start of wormhole will be really slow - it compiles the go code inside the container. Subsequent starts will be faster, as the go build cache is preserved in PVC. +The development environment deploys a server, two clients and a mock service, that you can use to test the tunnels. + +``` +kubectl annotate --overwrite svc --namespace nginx nginx wormhole.glothriel.github.com/exposed=yes +``` + +The additional services should be immediately created. Please note, that all three workloads are deployed on the same (and by extension are monitoring the same services for annotations), so the nginx will be exposed 4 times - client1 to server, client2 to server, server to client1 and server to client2. + ### Integration tests ``` diff --git a/pkg/cmd/client.go b/pkg/cmd/client.go index f8c770a..066c648 100644 --- a/pkg/cmd/client.go +++ b/pkg/cmd/client.go @@ -45,18 +45,12 @@ var joinCommand *cli.Command = &cli.Command{ } startPrometheusServer(c) - localListenerRegistry := listeners.NewApps(nginx.NewNginxExposer( - c.String(nginxExposerConfdPathFlag.Name), - "local", - nginx.NewDefaultReloader(), - nginx.NewRangePortAllocator(20000, 25000), - )) - remoteNginxExposer := nginx.NewNginxExposer( c.String(nginxExposerConfdPathFlag.Name), "remote", nginx.NewDefaultReloader(), nginx.NewRangePortAllocator(25001, 30000), + nginx.NewAllAcceptWireguardListener(), ) var effectiveExposer listeners.Exposer = remoteNginxExposer @@ -113,6 +107,13 @@ var joinCommand *cli.Command = &cli.Command{ } break } + localListenerRegistry := listeners.NewApps(nginx.NewNginxExposer( + c.String(nginxExposerConfdPathFlag.Name), + "local", + nginx.NewDefaultReloader(), + nginx.NewRangePortAllocator(20000, 25000), + nginx.NewOnlyGivenAddressListener(pairingResponse.AssignedIP), + )) logrus.Infof("Paired with server, assigned IP: %s", pairingResponse.AssignedIP) go localListenerRegistry.Watch(getAppStateChangeGenerator(c).Changes(), make(chan bool)) diff --git a/pkg/cmd/server.go b/pkg/cmd/server.go index 3b9d88e..8859cf9 100644 --- a/pkg/cmd/server.go +++ b/pkg/cmd/server.go @@ -81,6 +81,7 @@ var listenCommand *cli.Command = &cli.Command{ "local", nginx.NewDefaultReloader(), nginx.NewRangePortAllocator(20000, 25000), + nginx.NewOnlyGivenAddressListener(c.String(wgAddressFlag.Name)), )) remoteNginxExposer := nginx.NewNginxExposer( @@ -88,6 +89,7 @@ var listenCommand *cli.Command = &cli.Command{ "remote", nginx.NewDefaultReloader(), nginx.NewRangePortAllocator(25001, 30000), + nginx.NewAllAcceptWireguardListener(), ) var effectiveExposer listeners.Exposer = remoteNginxExposer diff --git a/pkg/nginx/exposer.go b/pkg/nginx/exposer.go index 6946197..81f17c7 100644 --- a/pkg/nginx/exposer.go +++ b/pkg/nginx/exposer.go @@ -15,9 +15,10 @@ import ( // Exposer is an Exposer implementation that uses NGINX as a proxy server type Exposer struct { - prefix string - path string - fs afero.Fs + prefix string + path string + fs afero.Fs + listener Listener reloader Reloader ports PortAllocator @@ -35,17 +36,24 @@ func (n *Exposer) Add(app peers.App) (peers.App, error) { File: nginxConfigPath(n.prefix, app), App: app, } - + listenBlock := "" + listenAddrs, addrsErr := n.listener.Addrs(port) + if addrsErr != nil { + return peers.App{}, fmt.Errorf("Could not get listener addresses: %v", addrsErr) + } + for _, addr := range listenAddrs { + listenBlock += fmt.Sprintf(" listen %s;\n", addr) + } if writeErr := afero.WriteFile(n.fs, path.Join(n.path, server.File), []byte(fmt.Sprintf(` # [%s] %s server { - listen %d; +%s proxy_pass %s; } `, server.App.Peer, server.App.Name, - server.ListenPort, + listenBlock, server.ProxyPass, )), 0644); writeErr != nil { logrus.Errorf("Could not write NGINX config file: %v", writeErr) @@ -117,7 +125,9 @@ func (n *Exposer) WithdrawAll() error { } // NewNginxExposer creates a new NGINX exposer -func NewNginxExposer(path, confPrefix string, reloader Reloader, allocator PortAllocator) listeners.Exposer { +func NewNginxExposer( + path, confPrefix string, reloader Reloader, allocator PortAllocator, listener Listener, +) listeners.Exposer { fs := afero.NewOsFs() cg := &Exposer{ path: path, @@ -126,6 +136,7 @@ func NewNginxExposer(path, confPrefix string, reloader Reloader, allocator PortA reloader: reloader, ports: allocator, + listener: listener, } createErr := fs.MkdirAll(path, 0755) if createErr != nil && createErr != afero.ErrDestinationExists { diff --git a/pkg/nginx/listener.go b/pkg/nginx/listener.go new file mode 100644 index 0000000..111a6d1 --- /dev/null +++ b/pkg/nginx/listener.go @@ -0,0 +1,115 @@ +package nginx + +import ( + "errors" + "fmt" + "net" + "strings" +) + +const wg0InterfaceName = "wg0" + +// Listener is an interface for NGINX listeners +type Listener interface { + // Addrs returns a list of addresses that the listener is listening on + Addrs(portNumber int) ([]string, error) +} + +type networkInterface struct { + name string + addresses []string +} + +type networkInterfaceLister interface { + Interfaces() ([]networkInterface, error) +} + +type allAcceptWg0Listener struct { + lister networkInterfaceLister +} + +// Addrs implements Listener +func (a *allAcceptWg0Listener) Addrs(portNumber int) ([]string, error) { + interfaces, interfacesErr := a.lister.Interfaces() + if interfacesErr != nil { + return []string{}, interfacesErr + } + + var allAddrs []string + + for _, iface := range interfaces { + if iface.name == wg0InterfaceName { + continue + } + + for _, addr := range iface.addresses { + // Ignore IPv6 + if !strings.Contains(addr, "::") { + allAddrs = append(allAddrs, fmt.Sprintf("%s:%d", addr, portNumber)) + } + } + } + + if len(allAddrs) == 0 { + return []string{}, errors.New("No network interfaces matching conditions found") + } + + return allAddrs, nil +} + +type givenAddressOnlyListener struct { + address string +} + +// Addrs implements Listener +func (l *givenAddressOnlyListener) Addrs(portNumber int) ([]string, error) { + return []string{ + fmt.Sprintf("%s:%d", l.address, portNumber), + }, nil +} + +// NewAllAcceptWireguardListener creates a new Listener that listens on all interfaces accept wg0 +func NewAllAcceptWireguardListener() Listener { + return &allAcceptWg0Listener{ + lister: &defaultNetworkInterfaceLister{}, + } +} + +// NewOnlyGivenAddressListener creates a new Listener that listens only on given address +func NewOnlyGivenAddressListener(address string) Listener { + return &givenAddressOnlyListener{ + address: address, + } +} + +type defaultNetworkInterfaceLister struct { +} + +// Interfaces implements networkinterfacelister +func (l *defaultNetworkInterfaceLister) Interfaces() ([]networkInterface, error) { + ifaces, err := net.Interfaces() + if err != nil { + return nil, err + } + var result []networkInterface + for _, iface := range ifaces { + netAddrs, err := iface.Addrs() + if err != nil { + return nil, err + } + var addrs []string + for _, addr := range netAddrs { + switch v := addr.(type) { + case *net.IPNet: + addrs = append(addrs, v.IP.String()) + case *net.IPAddr: + addrs = append(addrs, v.IP.String()) + } + } + result = append(result, networkInterface{ + name: iface.Name, + addresses: addrs, + }) + } + return result, nil +} diff --git a/pkg/nginx/listener_test.go b/pkg/nginx/listener_test.go new file mode 100644 index 0000000..a4d7dd9 --- /dev/null +++ b/pkg/nginx/listener_test.go @@ -0,0 +1,102 @@ +package nginx + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +type mockLister struct { + interfaces []networkInterface + err error +} + +func (m *mockLister) Interfaces() ([]networkInterface, error) { + return m.interfaces, m.err +} + +func TestAllAcceptWgListener(t *testing.T) { + // given + lister := &mockLister{ + interfaces: []networkInterface{ + { + name: "eth0", + addresses: []string{"127.0.0.1"}, + }, + { + name: "wg0", + addresses: []string{"10.178.2.1"}, + }, + }, + } + listenerIf := NewAllAcceptWireguardListener() + listener := listenerIf.(*allAcceptWg0Listener) + listener.lister = lister + + // when + addrs, err := listener.Addrs(80) + + // then + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"127.0.0.1:80"}, addrs) +} +func TestAllAcceptWgListenerErrors(t *testing.T) { + tests := []struct { + name string + interfaces []networkInterface + err error + expected []string + }{ + { + name: "Nonempty interface list with error", + interfaces: []networkInterface{ + + { + name: "eth0", + addresses: []string{"127.0.0.1"}, + }, + { + name: "wg0", + addresses: []string{"10.178.2.1"}, + }, + }, + err: errors.New("Blabla"), + expected: nil, + }, + { + name: "Empty interface list without error", + interfaces: []networkInterface{}, + err: nil, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lister := &mockLister{ + interfaces: tt.interfaces, + err: tt.err, + } + listenerIf := NewAllAcceptWireguardListener() + listener := listenerIf.(*allAcceptWg0Listener) + listener.lister = lister + + _, err := listener.Addrs(80) + + assert.Error(t, err) + }) + } +} + +func TestGivenAddressOnlyListener(t *testing.T) { + // given + listener := NewOnlyGivenAddressListener("127.0.0.1") + + // when + addrs, err := listener.Addrs(80) + + // then + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"127.0.0.1:80"}, addrs) +} diff --git a/tests/conftest.py b/tests/conftest.py index 7cfb1c5..96a2df2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -235,6 +235,7 @@ def k8s_server( { "server.enabled": True, "server.wg.publicHost": "wormhole-server-server.server.svc.cluster.local", + "server.service.type": "ClusterIP", "docker.image": wormhole_image.split(":")[0], "docker.version": wormhole_image.split(":")[1], "docker.wgImage": wireguard_image.split(":")[0], diff --git a/tests/fixtures.py b/tests/fixtures.py index 9633a8a..4501b4d 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -465,3 +465,14 @@ def do(self, key, value): "--overwrite" ] ) + + +class Services: + + @classmethod + def count(cls, kubectl, namespace): + return len(kubectl.json(["get", "svc", "-n", namespace])["items"]) + + @classmethod + def names(cls, kubectl, namespace): + return [item["metadata"]["name"] for item in kubectl.json(["get", "svc", "-n", namespace])["items"]] diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 1996aa9..364c360 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -1,7 +1,7 @@ import pytest from retry import retry -from .fixtures import Annotator +from .fixtures import Annotator, Services def test_changing_annotation_causes_creating_proxy_service( @@ -12,15 +12,13 @@ def test_changing_annotation_causes_creating_proxy_service( ): annotator = Annotator(mock_server, kubectl) - amount_of_services_before_annotation = len( - kubectl.json(["-n", "server", "get", "svc"])["items"] - ) + amount_of_services_before_annotation = Services.count(kubectl, "server") annotator.do("wormhole.glothriel.github.com/exposed", "yes") @retry(tries=60, delay=1) def _ensure_that_proxied_service_is_created(): assert ( - len(kubectl.json(["-n", "server", "get", "svc"])["items"]) + Services.count(kubectl, "server") == amount_of_services_before_annotation + 1 ) _ensure_that_proxied_service_is_created() @@ -29,7 +27,7 @@ def _ensure_that_proxied_service_is_created(): @retry(tries=60, delay=1) def _ensure_that_proxied_service_is_deleted(): assert ( - len(kubectl.json(["-n", "server", "get", "svc"])["items"]) + Services.count(kubectl, "server") == amount_of_services_before_annotation ) @@ -48,12 +46,8 @@ def test_annotating_with_custom_name_correctly_sets_remote_name( @retry(tries=60, delay=1) def _ensure_that_proxied_service_is_created(): - assert 'client-huehue-one-two-three' in [ - svc['metadata']['name'] for svc in kubectl.json(["-n", "server", "get", "svc"])["items"] - ] - assert 'server-huehue-one-two-three' in [ - svc['metadata']['name'] for svc in kubectl.json(["-n", "client", "get", "svc"])["items"] - ] + assert 'client-huehue-one-two-three' in Services.names(kubectl, namespace="server") + assert 'server-huehue-one-two-three' in Services.names(kubectl, namespace="client") _ensure_that_proxied_service_is_created() @@ -61,12 +55,8 @@ def _ensure_that_proxied_service_is_created(): @retry(tries=60, delay=1) def _ensure_that_proxied_service_is_deleted(): - assert 'client-huehue-one-two-three' not in [ - svc['metadata']['name'] for svc in kubectl.json(["-n", "server", "get", "svc"])["items"] - ] - assert 'server-huehue-one-two-three' not in [ - svc['metadata']['name'] for svc in kubectl.json(["-n", "client", "get", "svc"])["items"] - ] + assert 'client-huehue-one-two-three' not in Services.names(kubectl, namespace="server") + assert 'server-huehue-one-two-three' not in Services.names(kubectl, namespace="client") _ensure_that_proxied_service_is_deleted() @@ -201,4 +191,42 @@ def _ensure_that_proxied_service_is_deleted(): if 'client-custom' in [ svc['metadata']['name'] for svc in kubectl.json(["-n", "server", "get", "svc"])["items"] ]: - pytest.skip("The orphaned service should be removed, but it's not critical, so skipping for now") + pytest.skip( + "The orphaned service should be removed, but it's not critical, so skipping for now" + ) + + +def test_connection_via_the_tunnel( + kubectl, + k8s_server, + k8s_client, + mock_server, +): + + annotator = Annotator(mock_server, kubectl) + amount_of_services_before_annotation = Services.count(kubectl, "server") + annotator.do("wormhole.glothriel.github.com/exposed", "yes") + + @retry(tries=60, delay=1) + def _ensure_that_proxied_service_is_created(): + assert ( + Services.count(kubectl, "server") + == amount_of_services_before_annotation + 1 + ) + _ensure_that_proxied_service_is_created() + + @retry(tries=60, delay=1) + def _ensure_that_proxied_service_is_reachable(): + kubectl.run( + [ + '-n', + 'nginx', + 'exec', + 'deployment/nginx', + '--', + 'curl', + 'server-nginx-nginx.client.svc.cluster.local' + ] + ) + + _ensure_that_proxied_service_is_reachable()