Skip to content

Commit 2fc50e2

Browse files
committed
Add eio backend
1 parent 1b604cc commit 2fc50e2

12 files changed

+421
-6
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
_build
22
**/*.merlin
3-
*.install
3+
*.install
4+
.vscode

async/dune

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
(library
22
(name websocket_async)
3-
(public_name websocket-async)
4-
(modules websocket_async)
5-
(optional)
6-
(libraries websocket logs-async cohttp-async))
3+
(public_name websocket-async)
4+
(modules websocket_async)
5+
(optional)
6+
(libraries websocket logs-async cohttp-async))
77

88
(executable
99
(name wscat)

core/websocket.mli

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ val upgrade_present : Cohttp.Header.t -> bool
2121

2222
exception Protocol_error of string
2323

24+
val proto_error : ('b, Format.formatter, unit, 'a) format4 -> 'b
25+
2426
module Rng : sig
2527
val init : ?state:Random.State.t -> unit -> int -> string
2628
(** [init ?state ()] is a function that returns a string of random
@@ -40,6 +42,9 @@ module Frame : sig
4042
| Nonctrl of int
4143

4244
val to_string : t -> string
45+
val to_enum : t -> int
46+
val of_enum : int -> t
47+
val is_ctrl : t -> bool
4348
val pp : Format.formatter -> t -> unit
4449
end
4550

@@ -57,6 +62,7 @@ module Frame : sig
5762
t
5863

5964
val close : int -> t
65+
val of_bytes : ?opcode:Opcode.t -> ?extension:int -> ?final:bool -> bytes -> t
6066
end
6167

6268
val check_origin :

dune

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
(vendored_dirs ocaml-cohttp)

dune-project

+25
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,28 @@
8585
(websocket (= :version))
8686
(lwt_log (>= 1.1.1))
8787
(cohttp-lwt-unix (>= 5.0.0))))
88+
89+
(package
90+
(name websocket-eio)
91+
(synopsis "Websocket library (Eio)")
92+
(description
93+
"\| The WebSocket Protocol enables two-way communication between a client
94+
"\| running untrusted code in a controlled environment to a remote host
95+
"\| that has opted-in to communications from that code.
96+
"\|
97+
"\| The security model used for this is the origin-based security model
98+
"\| commonly used by web browsers. The protocol consists of an opening
99+
"\| handshake followed by basic message framing, layered over TCP.
100+
"\|
101+
"\| The goal of this technology is to provide a mechanism for
102+
"\| browser-based applications that need two-way communication with
103+
"\| servers that does not rely on opening multiple HTTP connections (e.g.,
104+
"\| using XMLHttpRequest or <iframe>s and long polling).
105+
)
106+
(tags (org:mirage org:xapi-project))
107+
(depends
108+
(ocaml (>= 5.0.0))
109+
(websocket (= :version))
110+
eio
111+
cohttp-eio
112+
(eio_main :with-test)))

eio/dune

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
(library
2+
(name websocket_eio)
3+
(public_name websocket-eio)
4+
(libraries websocket cohttp-eio))

eio/io.ml

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
open Websocket
2+
open Astring
3+
open Eio
4+
5+
type mode = Client of (int -> string) | Server
6+
7+
let is_client mode = mode <> Server
8+
9+
let xor mask msg =
10+
for i = 0 to Bytes.length msg - 1 do
11+
(* masking msg to send *)
12+
Bytes.set msg i
13+
Char.(to_int mask.[i mod 4] lxor to_int (Bytes.get msg i) |> of_byte)
14+
done
15+
16+
let is_bit_set idx v = (v lsr idx) land 1 = 1
17+
let set_bit v idx b = if b then v lor (1 lsl idx) else v land lnot (1 lsl idx)
18+
let int_value shift len v = (v lsr shift) land ((1 lsl len) - 1)
19+
20+
let read_exactly src remaining =
21+
try
22+
Some (Buf_read.take remaining src)
23+
with End_of_file -> None
24+
25+
let read_uint16 ic =
26+
match read_exactly ic 2 with
27+
| None -> None
28+
| Some s -> Some (EndianString.BigEndian.get_uint16 s 0)
29+
30+
let read_int64 ic =
31+
match read_exactly ic 8 with
32+
| None -> None
33+
| Some s -> Some (Int64.to_int @@ EndianString.BigEndian.get_int64 s 0)
34+
35+
let write_frame_to_buf ~mode buf fr =
36+
let open Frame in
37+
let content = Bytes.unsafe_of_string fr.content in
38+
let len = Bytes.length content in
39+
let opcode = Opcode.to_enum fr.opcode in
40+
let payload_len =
41+
match len with
42+
| n when n < 126 -> len
43+
| n when n < 1 lsl 16 -> 126
44+
| _ -> 127 in
45+
let hdr = set_bit 0 15 fr.final in
46+
(* We do not support extensions for now *)
47+
let hdr = hdr lor (opcode lsl 8) in
48+
let hdr = set_bit hdr 7 (is_client mode) in
49+
let hdr = hdr lor payload_len in
50+
(* Payload len is guaranteed to fit in 7 bits *)
51+
Buf_write.BE.uint16 buf hdr;
52+
( match len with
53+
| n when n < 126 -> ()
54+
| n when n < 1 lsl 16 ->
55+
Buf_write.BE.uint16 buf n
56+
| n ->
57+
Buf_write.BE.uint64 buf Int64.(of_int n);
58+
);
59+
( match mode with
60+
| Server -> ()
61+
| Client random_string ->
62+
let mask = random_string 4 in
63+
Buf_write.string buf mask ;
64+
if len > 0 then xor mask content ) ;
65+
Buf_write.bytes buf content
66+
67+
let close_with_code mode dst code =
68+
write_frame_to_buf ~mode dst @@ Frame.close code
69+
70+
let read_frame ic oc mode hdr =
71+
let hdr_part1 = EndianString.BigEndian.get_int8 hdr 0 in
72+
let hdr_part2 = EndianString.BigEndian.get_int8 hdr 1 in
73+
let final = is_bit_set 7 hdr_part1 in
74+
let extension = int_value 4 3 hdr_part1 in
75+
let opcode = int_value 0 4 hdr_part1 in
76+
let frame_masked = is_bit_set 7 hdr_part2 in
77+
let length = int_value 0 7 hdr_part2 in
78+
let opcode = Frame.Opcode.of_enum opcode in
79+
let payload_len =
80+
match length with
81+
| i when i < 126 -> i
82+
| 126 -> ( match read_uint16 ic with Some i -> i | None -> -1 )
83+
| 127 -> ( match read_int64 ic with Some i -> i | None -> -1 )
84+
| _ -> -1 in
85+
if payload_len = -1 then proto_error "payload len = %d" length
86+
else if extension <> 0 then (
87+
close_with_code mode oc 1002 ;
88+
proto_error "unsupported extension" )
89+
else if Frame.Opcode.is_ctrl opcode && payload_len > 125 then (
90+
close_with_code mode oc 1002 ;
91+
proto_error "control frame too big" )
92+
else
93+
let mask =
94+
if frame_masked then (
95+
match read_exactly ic 4 with
96+
| None -> proto_error "could not read mask"
97+
| Some mask -> mask )
98+
else String.empty in
99+
if payload_len = 0 then Frame.create ~opcode ~extension ~final ()
100+
else (
101+
match read_exactly ic payload_len with
102+
| None -> proto_error "could not read payload (len=%d)" payload_len
103+
| Some payload ->
104+
let payload = Bytes.unsafe_of_string payload in
105+
if frame_masked then xor mask payload ;
106+
let frame = Frame.of_bytes ~opcode ~extension ~final payload in
107+
frame )
108+
109+
let make_read_frame ~mode ic oc () =
110+
match read_exactly ic 2 with
111+
| None -> raise End_of_file
112+
| Some hdr -> read_frame ic oc mode hdr
113+
114+
module Connected_client = struct
115+
type t =
116+
{ buffer: Buf_write.t;
117+
endp: Conduit.endp;
118+
ic: Buf_read.t;
119+
http_request: Cohttp.Request.t;
120+
standard_frame_replies: bool;
121+
read_frame: unit -> Frame.t }
122+
123+
let source {endp; _} = endp
124+
125+
let create http_request endp ic oc =
126+
let read_frame = make_read_frame ~mode:Server ic oc in
127+
{ buffer = oc;
128+
endp;
129+
ic;
130+
http_request;
131+
standard_frame_replies = false;
132+
read_frame }
133+
134+
let send {buffer; _} frame =
135+
write_frame_to_buf ~mode:Server buffer frame
136+
137+
let send_multiple {buffer; _} frames =
138+
List.iter (write_frame_to_buf ~mode:Server buffer) frames
139+
140+
let standard_recv t =
141+
let fr = t.read_frame () in
142+
match fr.Frame.opcode with
143+
| Frame.Opcode.Ping ->
144+
send t @@ Frame.create ~opcode:Frame.Opcode.Pong () ;
145+
fr
146+
| Frame.Opcode.Close ->
147+
(* Immediately echo and pass this last message to the user *)
148+
if String.length fr.Frame.content >= 2 then
149+
send t
150+
@@ Frame.create ~opcode:Frame.Opcode.Close
151+
~content:
152+
String.(sub ~start:0 ~stop:2 fr.Frame.content |> Sub.to_string)
153+
()
154+
else send t @@ Frame.close 1000 ;
155+
fr
156+
| _ -> fr
157+
158+
let recv t =
159+
if t.standard_frame_replies then standard_recv t else t.read_frame ()
160+
161+
let http_request {http_request; _} = http_request
162+
let make_standard t = {t with standard_frame_replies= true}
163+
end

eio/websocket_eio.ml

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
(*
2+
* Copyright (c) 2016-2018 Maciej Wos <[email protected]>
3+
* Copyright (c) 2012-2018 Vincent Bernardoff <[email protected]>
4+
*
5+
* Permission to use, copy, modify, and distribute this software for any
6+
* purpose with or without fee is hereby granted, provided that the above
7+
* copyright notice and this permission notice appear in all copies.
8+
*
9+
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10+
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11+
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12+
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13+
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14+
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15+
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16+
*
17+
*)
18+
open Websocket
19+
module Ws_io = Io
20+
21+
let send_frames stream (oc : Eio.Buf_write.t) =
22+
let rec send_frame stream =
23+
let fr = Eio.Stream.take stream in
24+
Ws_io.write_frame_to_buf ~mode:Server oc fr ;
25+
send_frame stream in
26+
send_frame stream
27+
28+
let read_frames ic oc handler_fn : unit =
29+
let read_frame = Ws_io.make_read_frame ~mode:Server ic oc in
30+
let rec inner () =
31+
handler_fn @@ read_frame () ;
32+
inner () in
33+
inner ()
34+
35+
let upgrade_connection (request : Cohttp_eio.Server.request) incoming_handler =
36+
let request, buf, _ = request in
37+
let headers = Http.Request.headers request in
38+
let key =
39+
match Http.Header.get headers "sec-websocket-key" with
40+
| None ->
41+
invalid_arg "upgrade_connection: missing header `sec-websocket-key`"
42+
| Some key -> key in
43+
let hash = b64_encoded_sha1sum (key ^ websocket_uuid) in
44+
let response_headers =
45+
Http.Header.of_list
46+
[ ("Upgrade", "websocket"); ("Connection", "Upgrade");
47+
("Sec-WebSocket-Accept", hash) ] in
48+
let frames_out_stream = Eio.Stream.create max_int in
49+
let frames_out_fn v = Eio.Stream.add frames_out_stream v in
50+
let f (oc : Eio.Buf_write.t) =
51+
Eio.Fiber.both
52+
(* output: data for the client is written to the output
53+
* channel of the tcp connection *)
54+
(fun () -> send_frames frames_out_stream oc )
55+
(* input: data from the client is read from the input channel
56+
* of the tcp connection; pass it to handler function *)
57+
(fun () -> read_frames buf oc incoming_handler ) in
58+
let resp : Cohttp_eio.Server.response =
59+
( Http.Response.make ~status:`Switching_protocols ~version:`HTTP_1_1
60+
~headers:response_headers (),
61+
Cohttp_eio.Body.(Custom f) ) in
62+
(resp, frames_out_fn)

eio/websocket_eio.mli

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
open Websocket
2+
3+
val upgrade_connection :
4+
Cohttp_eio.Server.request ->
5+
(Frame.t -> unit) ->
6+
Cohttp_eio.Server.response * (Frame.t -> unit)
7+
(** [upgrade_connection req incoming_handler] takes [req], a
8+
connection request, and [incoming_handler], a function that will
9+
process incoming websocket frames, and returns ([response_action],
10+
[push_frame]) where [response_action] is used to produce a
11+
{!Cohttp_lwt.Server.t} and [push_frame] is used to send websocket
12+
frames to the client. *)

test/dune

+6-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,9 @@
66
(executable
77
(name upgrade_connection)
88
(modules upgrade_connection)
9-
(libraries logs.fmt logs.lwt websocket_cohttp_lwt))
9+
(libraries lwt.unix logs.fmt logs.lwt websocket_cohttp_lwt))
10+
11+
(executable
12+
(name eio_upgrade_connection)
13+
(modules eio_upgrade_connection)
14+
(libraries eio_main websocket_eio))

0 commit comments

Comments
 (0)