Skip to content

Commit

Permalink
Sync metadata (#94)
Browse files Browse the repository at this point in the history
* Added initial version of sync metadata, querable by API

* Added integration test

* Fixed linter issues
  • Loading branch information
glothriel authored Feb 12, 2025
1 parent 4320441 commit 692cc24
Show file tree
Hide file tree
Showing 19 changed files with 473 additions and 106 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ No body or query parameters are required.
|200 Ok | Returned when request was successful |
|500 Internal server error | Returned when the peers could not be fetched for unknown reasons. |


### GET /api/peers/v2

This endpoint is only available on the server. It returns the list of remote peers that are connected to the server.

#### Request

No body or query parameters are required.

#### Response

| Property | Required | Type | Description |
|:---------|:---------|:-----|:------------|
| **name** | yes | String | Name of the remote peer |
| **metadata** | yes | Object | Key/Value pairs, that were sent with the latest sync from the client |



| Code | Description |
|:-----|:------------|
|200 Ok | Returned when request was successful |
|500 Internal server error | Returned when the peers could not be fetched for unknown reasons. |


### DELETE /api/peers/v1/{name}

This endpoint is only available on the server. It allows removing a peer from the server. The peer will be disconnected and all the apps exposed by the peer will be removed.
Expand Down
5 changes: 3 additions & 2 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ for server in servers:
k8s_yaml(helm("./kubernetes/helm", namespace=server, set=[
"server.enabled=true",
"server.resources.limits.memory=2Gi",
"server.wg.publicHost=wormhole-server-chart.server.svc.cluster.local",
"server.wg.publicHost=wormhole-server.server.svc.cluster.local",
"server.service.type=ClusterIP",
"docker.image=wormhole-controller",
"docker.wgImage=wormhole-wireguard",
Expand All @@ -66,8 +66,9 @@ for client in clients:
k8s_yaml(helm("./kubernetes/helm", namespace=client, name=client, set=[
"client.enabled=true",
"client.name=" + client,
"client.serverDsn=http://wormhole-server-chart.server.svc.cluster.local:8080",
"client.serverDsn=http://wormhole-server.server.svc.cluster.local:8080",
"client.resources.limits.memory=2Gi",
"client.syncMetadata.metadata_key=metadata_value_"+client,
"docker.image=wormhole-controller",
"docker.wgImage=wormhole-wireguard",
"docker.nginxImage=wormhole-nginx",
Expand Down
1 change: 1 addition & 0 deletions kubernetes/helm/templates/client-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ metadata:
namespace: {{ $.Release.Namespace }}
stringData:
INVITE_TOKEN: {{ quote .Values.peering.psk }}
CLIENT_METADATA: {{ $.Values.client.syncMetadata | toJson | quote }}

{{ end }}
2 changes: 1 addition & 1 deletion kubernetes/helm/templates/client-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ rules:
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ template "name-client" . }}
name: {{ template "name-client" . }}-{{ $.Values.client.name }}
labels:
application: {{ template "name-client" . }}
subjects:
Expand Down
1 change: 1 addition & 0 deletions kubernetes/helm/templates/server-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ spec:
- '--wg-public-host={{ $.Values.server.wg.publicHost }}'
- '--wg-subnet-mask={{ $.Values.server.wg.subnetMask }}'
- '--peer-storage-db=/storage/peers.db'
- '--peer-metadata-storage-db=/storage/peers-metadata.db'
- '--key-storage-db=/storage/keys.db'


Expand Down
2 changes: 2 additions & 0 deletions kubernetes/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ client:
debug: false
name: ""

syncMetadata: {}

serverDsn: ""

priorityClassName: ""
Expand Down
42 changes: 39 additions & 3 deletions pkg/api/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package api
import (
"github.com/gin-gonic/gin"
"github.com/glothriel/wormhole/pkg/pairing"
"github.com/glothriel/wormhole/pkg/syncing"
"github.com/glothriel/wormhole/pkg/wg"
)

// PeerController is a controller for managing peers
type PeerController struct {
peers pairing.PeerStorage
wgConfig *wg.Config
watcher *wg.Watcher
peers pairing.PeerStorage
wgConfig *wg.Config
watcher *wg.Watcher

metadata syncing.MetadataStorage
enablePeerDeletion bool
}

Expand All @@ -31,6 +34,12 @@ func (p *PeerController) deletePeer(name string) error {
return nil
}

// PeersV2ListItem is a struct for the v2 peers list
type PeersV2ListItem struct {
Name string `json:"name"`
Metadata syncing.Metadata `json:"metadata"`
}

func (p *PeerController) registerRoutes(r *gin.Engine) {
r.GET("/api/peers/v1", func(c *gin.Context) {
peerList, err := p.peers.List()
Expand All @@ -47,6 +56,31 @@ func (p *PeerController) registerRoutes(r *gin.Engine) {
c.JSON(200, []string{})
})

r.GET("/api/peers/v2", func(c *gin.Context) {
peerList, err := p.peers.List()
if err != nil {
c.JSON(500, gin.H{
"error": err.Error(),
})
return
}
var peerListItems []PeersV2ListItem
for _, peer := range peerList {
metadata, err := p.metadata.Get(peer.Name)
if err != nil {
c.JSON(500, gin.H{
"error": err.Error(),
})
return
}
peerListItems = append(peerListItems, PeersV2ListItem{
Name: peer.Name,
Metadata: metadata,
})
}
c.JSON(200, peerListItems)
})

r.DELETE("/api/peers/v1/:name", func(c *gin.Context) {
if !p.enablePeerDeletion {
c.JSON(403, gin.H{
Expand Down Expand Up @@ -79,12 +113,14 @@ func NewPeersController(
peers pairing.PeerStorage,
wgConfig *wg.Config,
watcher *wg.Watcher,
metadata syncing.MetadataStorage,
settings ...PeerControllerSettings,
) Controller {
theController := &PeerController{
peers: peers,
wgConfig: wgConfig,
watcher: watcher,
metadata: metadata,
// We currently don't have authorization in place, disabling peer deletion
enablePeerDeletion: false,
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package cmd

import (
"encoding/json"
"time"

"github.com/glothriel/wormhole/pkg/api"
Expand Down Expand Up @@ -35,6 +36,7 @@ var clientCommand *cli.Command = &cli.Command{
kubernetesNamespaceFlag,
kubernetesLabelsFlag,
peerNameFlag,
clientMetadataFlag,
enableNetworkPoliciesFlag,
helloRetryIntervalFlag,
nginxExposerConfdPathFlag,
Expand Down Expand Up @@ -153,6 +155,7 @@ var clientCommand *cli.Command = &cli.Command{
),
),
pairingResponse,
syncing.NewStaticMetadataFactory(getClientMetadata(c)),
)
if scErr != nil {
logrus.Fatalf("Failed to create syncing client: %v", scErr)
Expand All @@ -172,3 +175,14 @@ var clientCommand *cli.Command = &cli.Command{
return sc.Start()
},
}

func getClientMetadata(c *cli.Context) syncing.Metadata {
metadata := syncing.Metadata{}
if c.String(clientMetadataFlag.Name) != "" {
unmarshalErr := json.Unmarshal([]byte(c.String(clientMetadataFlag.Name)), &metadata)
if unmarshalErr != nil {
logrus.Fatalf("Failed to unmarshal metadata: %v", unmarshalErr)
}
}
return metadata
}
12 changes: 12 additions & 0 deletions pkg/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ var peerStorageDBFlag *cli.StringFlag = &cli.StringFlag{
Value: "",
}

var peerMetadataStorageDBFlag *cli.StringFlag = &cli.StringFlag{
Name: "peer-metadata-storage-db",
Value: "",
}

var clientMetadataFlag *cli.StringFlag = &cli.StringFlag{
Name: "client-metadata",
Value: "{}",
EnvVars: []string{"CLIENT_METADATA"},
Usage: "JSON-formatted metadata to send to the server with every sync request",
}

var peerControllerEnableDeletionFlag *cli.BoolFlag = &cli.BoolFlag{
Name: "peer-controller-enable-deletion",
Value: false,
Expand Down
6 changes: 5 additions & 1 deletion pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var serverCommand *cli.Command = &cli.Command{
kubernetesLabelsFlag,
enableNetworkPoliciesFlag,
peerStorageDBFlag,
peerMetadataStorageDBFlag,
peerControllerEnableDeletionFlag,
peerNameFlag,
wgAddressFlag,
Expand Down Expand Up @@ -148,13 +149,16 @@ var serverCommand *cli.Command = &cli.Command{
syncing.NewPeerEnrichingAppSource("server", appsExposedHere),
)

metadataStorage := getPeerMetadataStorage(c)

ss := syncing.NewServer(
c.String(peerNameFlag.Name),
remoteNginxAdapter,
appSource,
syncing.NewJSONSyncingEncoder(),
syncTransport,
peerStorage,
metadataStorage,
)
watcher := wg.NewWatcher(c.String(wireguardConfigFilePathFlag.Name))
updateErr := watcher.Update(*wgConfig)
Expand Down Expand Up @@ -196,7 +200,7 @@ var serverCommand *cli.Command = &cli.Command{
}
err := api.NewAdminAPI([]api.Controller{
api.NewAppsController(appsExposedFromRemote),
api.NewPeersController(peerStorage, wgConfig, watcher, peerControllerSettings...),
api.NewPeersController(peerStorage, wgConfig, watcher, metadataStorage, peerControllerSettings...),
}, c.Bool(debugFlag.Name)).Run(":8082")
if err != nil {
logrus.Fatalf("Failed to start admin API: %v", err)
Expand Down
16 changes: 16 additions & 0 deletions pkg/cmd/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cmd

import (
"github.com/glothriel/wormhole/pkg/pairing"
"github.com/glothriel/wormhole/pkg/syncing"
"github.com/glothriel/wormhole/pkg/wg"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

Expand All @@ -19,3 +21,17 @@ func getKeyStorage(c *cli.Context) wg.KeyStorage {
}
return wg.NewBoltKeyStorage(c.String(keyStorageDBFlag.Name))
}

func getPeerMetadataStorage(c *cli.Context) syncing.MetadataStorage {
theStorage := syncing.NewInMemoryMetadataStorage()
if c.String(peerMetadataStorageDBFlag.Name) != "" {
boltStorage, boltMetadataStorage := syncing.NewBoltMetadataStorage(c.String(peerMetadataStorageDBFlag.Name))
if boltMetadataStorage != nil {
logrus.Fatalf("Failed to create metadata storage: %v", boltMetadataStorage)
}
theStorage = syncing.NewCachingMetadataStorage(
boltStorage,
)
}
return theStorage
}
15 changes: 13 additions & 2 deletions pkg/syncing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Client struct {
apps AppSource
transport ClientTransport
failureThreshold int
metadata MetadataFactory
}

// Start starts the syncing client
Expand All @@ -31,9 +32,15 @@ func (c *Client) Start() error {
logrus.Errorf("failed to list apps: %v", listErr)
continue
}
metadata, metadataErr := c.metadata.Get()
if metadataErr != nil {
logrus.Errorf("failed to get metadata: %v", metadataErr)
continue
}
encodedApps, encodeErr := c.encoder.Encode(Message{
Peer: c.myName,
Apps: apps,
Peer: c.myName,
Metadata: metadata,
Apps: apps,
})
if encodeErr != nil {
logrus.Errorf("failed to encode apps: %v", encodeErr)
Expand Down Expand Up @@ -69,6 +76,7 @@ func NewClient(
interval time.Duration,
apps AppSource,
transport ClientTransport,
MetadataFactory MetadataFactory,
) *Client {
return &Client{
myName: myName,
Expand All @@ -78,6 +86,7 @@ func NewClient(
apps: apps,
transport: transport,
failureThreshold: 3,
metadata: MetadataFactory,
}
}

Expand All @@ -89,6 +98,7 @@ func NewHTTPClient(
interval time.Duration,
apps AppSource,
pr pairing.Response,
metadata MetadataFactory,

) (*Client, error) {
syncServerAddress, ok := pr.Metadata["sync_server_address"]
Expand All @@ -103,5 +113,6 @@ func NewHTTPClient(
interval,
apps,
transport,
metadata,
), nil
}
1 change: 1 addition & 0 deletions pkg/syncing/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestClientStartFailsAfterXSyncFailures(t *testing.T) {
&mockSyncClientTransport{
returnError: fmt.Errorf("sync failed"),
},
NewStaticMetadataFactory(Metadata{}),
)

// when
Expand Down
5 changes: 3 additions & 2 deletions pkg/syncing/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (

// Message is a message that contains a list of apps and the peer that sent them
type Message struct {
Peer string
Apps []apps.App
Peer string
Metadata Metadata
Apps []apps.App
}

// Encoder is an interface for encoding and decoding syncing messages
Expand Down
Loading

0 comments on commit 692cc24

Please sign in to comment.