Skip to content

Commit e338ca7

Browse files
authored
Merge pull request #295 from stgraber/keepalive
Add keepalive support in the client
2 parents 82231b1 + d824898 commit e338ca7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+3993
-2996
lines changed

cmd/incus/remote.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"net/url"
1111
"os"
12+
"runtime"
1213
"sort"
1314
"strings"
1415

@@ -48,6 +49,12 @@ func (c *cmdRemote) Command() *cobra.Command {
4849
remoteListCmd := cmdRemoteList{global: c.global, remote: c}
4950
cmd.AddCommand(remoteListCmd.Command())
5051

52+
if runtime.GOOS != "windows" {
53+
// Proxy
54+
remoteProxyCmd := cmdRemoteProxy{global: c.global, remote: c}
55+
cmd.AddCommand(remoteProxyCmd.Command())
56+
}
57+
5158
// Rename
5259
remoteRenameCmd := cmdRemoteRename{global: c.global, remote: c}
5360
cmd.AddCommand(remoteRenameCmd.Command())

cmd/incus/remote_unix.go

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
//go:build !windows
2+
3+
package main
4+
5+
import (
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"net"
10+
"net/http"
11+
"net/http/httputil"
12+
"net/url"
13+
"os"
14+
"strings"
15+
"sync"
16+
"time"
17+
18+
"github.com/spf13/cobra"
19+
20+
"github.com/lxc/incus/client"
21+
cli "github.com/lxc/incus/internal/cmd"
22+
"github.com/lxc/incus/internal/i18n"
23+
"github.com/lxc/incus/shared/api"
24+
)
25+
26+
type cmdRemoteProxy struct {
27+
global *cmdGlobal
28+
remote *cmdRemote
29+
30+
flagTimeout int
31+
}
32+
33+
func (c *cmdRemoteProxy) Command() *cobra.Command {
34+
cmd := &cobra.Command{}
35+
cmd.Use = usage("proxy", i18n.G("<remote>: <path>"))
36+
cmd.Short = i18n.G("Run a local API proxy")
37+
cmd.Long = cli.FormatSection(i18n.G("Description"), i18n.G(
38+
`Run a local API proxy for the remote`))
39+
40+
cmd.RunE = c.Run
41+
42+
cmd.Flags().IntVar(&c.flagTimeout, "timeout", 0, i18n.G("Proxy timeout (exits when no connections)")+"``")
43+
44+
return cmd
45+
}
46+
47+
func (c *cmdRemoteProxy) Run(cmd *cobra.Command, args []string) error {
48+
// Quick checks.
49+
exit, err := c.global.CheckArgs(cmd, args, 2, 2)
50+
if exit {
51+
return err
52+
}
53+
54+
// Detect remote name.
55+
remoteName := args[0]
56+
if !strings.HasSuffix(remoteName, ":") {
57+
remoteName = remoteName + ":"
58+
}
59+
60+
path := args[1]
61+
62+
remote := c.global.conf.Remotes[strings.TrimSuffix(remoteName, ":")]
63+
remote.KeepAlive = 0
64+
c.global.conf.Remotes[strings.TrimSuffix(remoteName, ":")] = remote
65+
66+
resources, err := c.global.ParseServers(remoteName)
67+
if err != nil {
68+
return err
69+
}
70+
71+
s := resources[0].server
72+
73+
// Create proxy socket.
74+
err = os.Remove(path)
75+
if err != nil && !errors.Is(err, os.ErrNotExist) {
76+
return fmt.Errorf("Failed to delete pre-existing unix socket: %w", err)
77+
}
78+
79+
unixAddr, err := net.ResolveUnixAddr("unix", path)
80+
if err != nil {
81+
return fmt.Errorf("Unable to resolve unix socket: %w", err)
82+
}
83+
84+
server, err := net.ListenUnix("unix", unixAddr)
85+
if err != nil {
86+
return fmt.Errorf("Unable to setup unix socket: %w", err)
87+
}
88+
89+
err = os.Chmod(path, 0600)
90+
if err != nil {
91+
return fmt.Errorf("Unable to set socket permissions: %w", err)
92+
}
93+
94+
// Get the connection info.
95+
info, err := s.GetConnectionInfo()
96+
if err != nil {
97+
return err
98+
}
99+
100+
uri, err := url.Parse(info.URL)
101+
if err != nil {
102+
return err
103+
}
104+
105+
// Enable keep-alive for proxied connections.
106+
httpClient, err := s.GetHTTPClient()
107+
if err != nil {
108+
return err
109+
}
110+
111+
httpTransport, ok := httpClient.Transport.(*http.Transport)
112+
if ok {
113+
httpTransport.DisableKeepAlives = false
114+
}
115+
116+
// Get server info.
117+
api10, api10Etag, err := s.GetServer()
118+
if err != nil {
119+
return err
120+
}
121+
122+
// Handle inbound connections.
123+
transport := remoteProxyTransport{
124+
s: s,
125+
baseURL: uri,
126+
}
127+
128+
connections := uint64(0)
129+
transactions := uint64(0)
130+
131+
handler := remoteProxyHandler{
132+
s: s,
133+
transport: transport,
134+
api10: api10,
135+
api10Etag: api10Etag,
136+
137+
mu: &sync.RWMutex{},
138+
connections: &connections,
139+
transactions: &transactions,
140+
}
141+
142+
// Handle the timeout.
143+
if c.flagTimeout > 0 {
144+
go func() {
145+
for {
146+
time.Sleep(time.Duration(c.flagTimeout) * time.Second)
147+
148+
// Check for active connections.
149+
handler.mu.RLock()
150+
if *handler.connections > 0 {
151+
handler.mu.RUnlock()
152+
continue
153+
}
154+
155+
// Look for recent activity
156+
oldCount := uint64(*handler.transactions)
157+
handler.mu.RUnlock()
158+
159+
time.Sleep(5 * time.Second)
160+
161+
handler.mu.RLock()
162+
if oldCount == *handler.transactions {
163+
handler.mu.RUnlock()
164+
165+
// Daemon has been inactive for 10s, exit.
166+
os.Exit(0)
167+
}
168+
169+
handler.mu.RUnlock()
170+
}
171+
}()
172+
}
173+
174+
// Start the server.
175+
err = http.Serve(server, handler)
176+
if err != nil {
177+
return err
178+
}
179+
180+
return nil
181+
}
182+
183+
type remoteProxyTransport struct {
184+
s incus.InstanceServer
185+
186+
baseURL *url.URL
187+
}
188+
189+
func (t remoteProxyTransport) RoundTrip(r *http.Request) (*http.Response, error) {
190+
// Fix the request.
191+
r.URL.Scheme = t.baseURL.Scheme
192+
r.URL.Host = t.baseURL.Host
193+
r.RequestURI = ""
194+
195+
return t.s.DoHTTP(r)
196+
}
197+
198+
type remoteProxyHandler struct {
199+
s incus.InstanceServer
200+
transport http.RoundTripper
201+
202+
mu *sync.RWMutex
203+
connections *uint64
204+
transactions *uint64
205+
206+
api10 *api.Server
207+
api10Etag string
208+
}
209+
210+
func (h remoteProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
211+
// Increase counters.
212+
defer func() {
213+
h.mu.Lock()
214+
*h.connections -= 1
215+
h.mu.Unlock()
216+
}()
217+
218+
h.mu.Lock()
219+
*h.transactions += 1
220+
*h.connections += 1
221+
h.mu.Unlock()
222+
223+
// Handle /1.0 internally (saves a round-trip).
224+
if r.RequestURI == "/1.0" || strings.HasPrefix(r.RequestURI, "/1.0?project=") {
225+
// Parse query URL.
226+
values, err := url.ParseQuery(r.URL.RawQuery)
227+
if err != nil {
228+
return
229+
}
230+
231+
// Update project name to match.
232+
projectName := values.Get("project")
233+
if projectName == "" {
234+
projectName = api.ProjectDefaultName
235+
}
236+
237+
api10 := api.Server(*h.api10)
238+
api10.Environment.Project = projectName
239+
240+
// Set the request headers.
241+
w.Header().Set("Content-Type", "application/json")
242+
w.Header().Set("ETag", h.api10Etag)
243+
w.WriteHeader(http.StatusOK)
244+
245+
// Generate a body from the cached data.
246+
serverBody, err := json.Marshal(api10)
247+
if err != nil {
248+
return
249+
}
250+
251+
apiResponse := api.Response{
252+
Type: "sync",
253+
Status: "success",
254+
StatusCode: 200,
255+
Metadata: serverBody,
256+
}
257+
258+
body, err := json.Marshal(apiResponse)
259+
if err != nil {
260+
return
261+
}
262+
263+
_, _ = w.Write(body)
264+
265+
return
266+
}
267+
268+
// Forward everything else.
269+
proxy := httputil.ReverseProxy{
270+
Transport: h.transport,
271+
Director: func(*http.Request) {},
272+
}
273+
274+
proxy.ServeHTTP(w, r)
275+
}

cmd/incus/remote_windows.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//go:build windows
2+
3+
package main
4+
5+
import (
6+
"github.com/spf13/cobra"
7+
)
8+
9+
type cmdRemoteProxy struct {
10+
global *cmdGlobal
11+
remote *cmdRemote
12+
}
13+
14+
func (c *cmdRemoteProxy) Command() *cobra.Command {
15+
return nil
16+
}

0 commit comments

Comments
 (0)