Skip to content

Commit 7f337db

Browse files
authored
feat(web): add file transfer support (#62)
1 parent a2e0b42 commit 7f337db

File tree

15 files changed

+1540
-761
lines changed

15 files changed

+1540
-761
lines changed

Diff for: cmd/wasm/main.go

+276-10
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ import (
66
"bytes"
77
"context"
88
"fmt"
9+
"io"
910
"log"
1011
"log/slog"
1112
"net"
13+
"net/http"
14+
"strings"
1215
"syscall/js"
1316
"time"
1417

@@ -52,13 +55,8 @@ func main() {
5255
<-make(chan struct{}, 0)
5356
}
5457

55-
func newWush(jsConfig js.Value) map[string]any {
58+
func newWush(cfg js.Value) map[string]any {
5659
ctx := context.Background()
57-
var authKey string
58-
if jsAuthKey := jsConfig.Get("authKey"); jsAuthKey.Type() == js.TypeString {
59-
authKey = jsAuthKey.String()
60-
}
61-
6260
logger := slog.New(slog.NewTextHandler(jsConsoleWriter{}, nil))
6361
hlog := func(format string, args ...any) {
6462
fmt.Printf(format+"\n", args...)
@@ -68,18 +66,19 @@ func newWush(jsConfig js.Value) map[string]any {
6866
panic(err)
6967
}
7068

71-
send := overlay.NewSendOverlay(logger, dm)
72-
err = send.Auth.Parse(authKey)
69+
ov := overlay.NewWasmOverlay(log.Printf, dm, cfg.Get("onNewPeer"))
70+
71+
err = ov.PickDERPHome(ctx)
7372
if err != nil {
7473
panic(err)
7574
}
7675

77-
s, err := tsserver.NewServer(ctx, logger, send, dm)
76+
s, err := tsserver.NewServer(ctx, logger, ov, dm)
7877
if err != nil {
7978
panic(err)
8079
}
8180

82-
go send.ListenOverlayDERP(ctx)
81+
go ov.ListenOverlayDERP(ctx)
8382
go s.ListenAndServe(ctx)
8483
netns.SetDialerOverride(s.Dialer())
8584

@@ -94,12 +93,40 @@ func newWush(jsConfig js.Value) map[string]any {
9493
}
9594
hlog("WireGuard is ready")
9695

96+
cpListener, err := ts.Listen("tcp", ":4444")
97+
if err != nil {
98+
panic(err)
99+
}
100+
101+
go func() {
102+
err := http.Serve(cpListener, http.HandlerFunc(cpH(
103+
cfg.Get("onIncomingFile"),
104+
cfg.Get("downloadFile"),
105+
)))
106+
if err != nil {
107+
hlog("File transfer server exited: " + err.Error())
108+
}
109+
}()
110+
97111
return map[string]any{
112+
"auth_info": js.FuncOf(func(this js.Value, args []js.Value) any {
113+
if len(args) != 0 {
114+
log.Printf("Usage: auth_info()")
115+
return nil
116+
}
117+
118+
return map[string]any{
119+
"derp_id": ov.DerpRegionID,
120+
"derp_name": ov.DerpMap.Regions[int(ov.DerpRegionID)].RegionName,
121+
"auth_key": ov.ClientAuth().AuthKey(),
122+
}
123+
}),
98124
"stop": js.FuncOf(func(this js.Value, args []js.Value) any {
99125
if len(args) != 0 {
100126
log.Printf("Usage: stop()")
101127
return nil
102128
}
129+
cpListener.Close()
103130
ts.Close()
104131
return nil
105132
}),
@@ -127,6 +154,157 @@ func newWush(jsConfig js.Value) map[string]any {
127154
}),
128155
}
129156
}),
157+
"connect": js.FuncOf(func(this js.Value, args []js.Value) any {
158+
handler := js.FuncOf(func(this js.Value, promiseArgs []js.Value) any {
159+
resolve := promiseArgs[0]
160+
reject := promiseArgs[1]
161+
162+
go func() {
163+
if len(args) != 1 {
164+
errorConstructor := js.Global().Get("Error")
165+
errorObject := errorConstructor.New("Usage: connect(authKey)")
166+
reject.Invoke(errorObject)
167+
return
168+
}
169+
170+
var authKey string
171+
if args[0].Type() == js.TypeString {
172+
authKey = args[0].String()
173+
} else {
174+
errorConstructor := js.Global().Get("Error")
175+
errorObject := errorConstructor.New("Usage: connect(authKey)")
176+
reject.Invoke(errorObject)
177+
return
178+
}
179+
180+
var ca overlay.ClientAuth
181+
err := ca.Parse(authKey)
182+
if err != nil {
183+
errorConstructor := js.Global().Get("Error")
184+
errorObject := errorConstructor.New(fmt.Errorf("parse authkey: %w", err).Error())
185+
reject.Invoke(errorObject)
186+
return
187+
}
188+
189+
ctx, cancel := context.WithCancel(context.Background())
190+
peer, err := ov.Connect(ctx, ca)
191+
if err != nil {
192+
cancel()
193+
errorConstructor := js.Global().Get("Error")
194+
errorObject := errorConstructor.New(fmt.Errorf("parse authkey: %w", err).Error())
195+
reject.Invoke(errorObject)
196+
return
197+
}
198+
199+
resolve.Invoke(map[string]any{
200+
"id": js.ValueOf(peer.ID),
201+
"name": js.ValueOf(peer.Name),
202+
"ip": js.ValueOf(peer.IP.String()),
203+
"cancel": js.FuncOf(func(this js.Value, args []js.Value) any {
204+
cancel()
205+
return nil
206+
}),
207+
})
208+
}()
209+
210+
return nil
211+
})
212+
213+
promiseConstructor := js.Global().Get("Promise")
214+
return promiseConstructor.New(handler)
215+
}),
216+
"transfer": js.FuncOf(func(this js.Value, args []js.Value) any {
217+
handler := js.FuncOf(func(this js.Value, promiseArgs []js.Value) any {
218+
resolve := promiseArgs[0]
219+
reject := promiseArgs[1]
220+
221+
if len(args) != 5 {
222+
errorConstructor := js.Global().Get("Error")
223+
errorObject := errorConstructor.New("Usage: transfer(peer, file)")
224+
reject.Invoke(errorObject)
225+
return nil
226+
}
227+
228+
peer := args[0]
229+
ip := peer.Get("ip").String()
230+
fileName := args[1].String()
231+
sizeBytes := args[2].Int()
232+
stream := args[3]
233+
streamHelper := args[4]
234+
235+
pr, pw := io.Pipe()
236+
237+
goCallback := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
238+
promiseConstructor := js.Global().Get("Promise")
239+
return promiseConstructor.New(js.FuncOf(func(this js.Value, promiseArgs []js.Value) any {
240+
resolve := promiseArgs[0]
241+
_ = promiseArgs[1]
242+
go func() {
243+
if len(args) == 0 || args[0].IsNull() || args[0].IsUndefined() {
244+
pw.Close()
245+
resolve.Invoke()
246+
return
247+
}
248+
249+
fmt.Println("in go callback")
250+
// Convert the JavaScript Uint8Array to a Go byte slice
251+
uint8Array := args[0]
252+
fmt.Println("type is", uint8Array.Type().String())
253+
length := uint8Array.Get("length").Int()
254+
buf := make([]byte, length)
255+
js.CopyBytesToGo(buf, uint8Array)
256+
257+
fmt.Println("sending data to channel")
258+
// Send the data to the channel
259+
if _, err := pw.Write(buf); err != nil {
260+
pw.CloseWithError(err)
261+
}
262+
fmt.Println("callback finished")
263+
264+
// Resolve the promise
265+
resolve.Invoke()
266+
}()
267+
return nil
268+
}))
269+
})
270+
271+
go func() {
272+
defer goCallback.Release()
273+
274+
streamHelper.Invoke(stream, goCallback)
275+
276+
hc := ts.HTTPClient()
277+
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:4444/%s", ip, fileName), pr)
278+
if err != nil {
279+
errorConstructor := js.Global().Get("Error")
280+
errorObject := errorConstructor.New(err.Error())
281+
reject.Invoke(errorObject)
282+
return
283+
}
284+
req.ContentLength = int64(sizeBytes)
285+
286+
res, err := hc.Do(req)
287+
if err != nil {
288+
errorConstructor := js.Global().Get("Error")
289+
errorObject := errorConstructor.New(err.Error())
290+
reject.Invoke(errorObject)
291+
return
292+
}
293+
defer res.Body.Close()
294+
295+
bod := bytes.NewBuffer(nil)
296+
_, _ = io.Copy(bod, res.Body)
297+
298+
fmt.Println(bod.String())
299+
resolve.Invoke()
300+
}()
301+
302+
return nil
303+
})
304+
305+
promiseConstructor := js.Global().Get("Promise")
306+
return promiseConstructor.New(handler)
307+
}),
130308
}
131309
}
132310

@@ -306,3 +484,91 @@ func newTSNet(direction string) (*tsnet.Server, error) {
306484

307485
return srv, nil
308486
}
487+
488+
func cpH(onIncomingFile js.Value, downloadFile js.Value) http.HandlerFunc {
489+
return func(w http.ResponseWriter, r *http.Request) {
490+
if r.Method != "POST" {
491+
w.WriteHeader(http.StatusOK)
492+
w.Write([]byte("OK"))
493+
return
494+
}
495+
496+
fiName := strings.TrimPrefix(r.URL.Path, "/")
497+
498+
// TODO: impl
499+
peer := map[string]any{
500+
"id": js.ValueOf(0),
501+
"name": js.ValueOf(""),
502+
"ip": js.ValueOf(""),
503+
"cancel": js.FuncOf(func(this js.Value, args []js.Value) any {
504+
return nil
505+
}),
506+
}
507+
508+
allow := onIncomingFile.Invoke(peer, fiName, r.ContentLength).Bool()
509+
if !allow {
510+
w.WriteHeader(http.StatusForbidden)
511+
w.Write([]byte("File transfer was denied"))
512+
r.Body.Close()
513+
return
514+
}
515+
516+
underlyingSource := map[string]interface{}{
517+
// start method
518+
"start": js.FuncOf(func(this js.Value, args []js.Value) interface{} {
519+
// The first and only arg is the controller object
520+
controller := args[0]
521+
522+
// Process the stream in yet another background goroutine,
523+
// because we can't block on a goroutine invoked by JS in Wasm
524+
// that is dealing with HTTP requests
525+
go func() {
526+
// Close the response body at the end of this method
527+
defer r.Body.Close()
528+
529+
// Read the entire stream and pass it to JavaScript
530+
for {
531+
// Read up to 16KB at a time
532+
buf := make([]byte, 16384)
533+
n, err := r.Body.Read(buf)
534+
if err != nil && err != io.EOF {
535+
// Tell the controller we have an error
536+
// We're ignoring "EOF" however, which means the stream was done
537+
errorConstructor := js.Global().Get("Error")
538+
errorObject := errorConstructor.New(err.Error())
539+
controller.Call("error", errorObject)
540+
return
541+
}
542+
if n > 0 {
543+
// If we read anything, send it to JavaScript using the "enqueue" method on the controller
544+
// We need to convert it to a Uint8Array first
545+
arrayConstructor := js.Global().Get("Uint8Array")
546+
dataJS := arrayConstructor.New(n)
547+
js.CopyBytesToJS(dataJS, buf[0:n])
548+
controller.Call("enqueue", dataJS)
549+
}
550+
if err == io.EOF {
551+
// Stream is done, so call the "close" method on the controller
552+
controller.Call("close")
553+
return
554+
}
555+
}
556+
}()
557+
558+
return nil
559+
}),
560+
// cancel method
561+
"cancel": js.FuncOf(func(this js.Value, args []js.Value) interface{} {
562+
// If the request is canceled, just close the body
563+
r.Body.Close()
564+
565+
return nil
566+
}),
567+
}
568+
569+
readableStreamConstructor := js.Global().Get("ReadableStream")
570+
readableStream := readableStreamConstructor.New(underlyingSource)
571+
572+
downloadFile.Invoke(peer, fiName, r.ContentLength, readableStream)
573+
}
574+
}

0 commit comments

Comments
 (0)