diff --git a/src/core/conduit.ml b/src/core/conduit.ml index 89f4a78a..212a51bf 100644 --- a/src/core/conduit.ml +++ b/src/core/conduit.ml @@ -1,7 +1,5 @@ module Endpoint = Endpoint -module Sigs = Sigs - -type ('a, 'b) refl = Refl : ('a, 'a) refl +include Conduit_intf let strf = Format.asprintf @@ -17,8 +15,6 @@ type _ resolver = } -> ('edn * 's) resolver -type ('a, 'b) value = Value : 'b -> ('a, 'b) value - let reword_error f = function Ok x -> Ok x | Error err -> Error (f err) let msgf fmt = Fmt.kstrf (fun err -> `Msg err) fmt @@ -52,139 +48,6 @@ type resolvers = Map.t let empty = Map.empty -module type S = sig - module Endpoint : module type of Endpoint - - type input - - type output - - type +'a io - - type scheduler - - type flow = private .. - - type error = [ `Msg of string | `Not_found ] - - val pp_error : error Fmt.t - - val recv : - flow -> input -> ([ `Input of int | `End_of_flow ], [> error ]) result io - - val send : flow -> output -> (int, [> error ]) result io - - val close : flow -> (unit, [> error ]) result io - - module type FLOW = - Sigs.FLOW - with type input = input - and type output = output - and type +'a io = 'a io - - module type PROTOCOL = - Sigs.PROTOCOL - with type input = input - and type output = output - and type +'a io = 'a io - - type ('edn, 'flow) impl = - (module PROTOCOL with type endpoint = 'edn and type flow = 'flow) - - type ('edn, 'flow) protocol - - val register : protocol:('edn, 'flow) impl -> ('edn, 'flow) protocol - - module type REPR = sig - type t - - type flow += T of t - end - - val repr : ('edn, 'v) protocol -> (module REPR with type t = ('edn, 'v) value) - - type unpack = Flow : 'flow * (module FLOW with type flow = 'flow) -> unpack - - val unpack : flow -> unpack - - val impl : - ('edn, 'flow) protocol -> - (module PROTOCOL with type endpoint = 'edn and type flow = 'flow) - - val cast : flow -> ('edn, 'flow) protocol -> 'flow option - - val pack : ('edn, 'v) protocol -> 'v -> flow - - type 'edn resolver = Endpoint.t -> 'edn option io - - type nonrec resolvers = resolvers - - val empty : resolvers - - val add : - ('edn, 'flow) protocol -> - ?priority:int -> - 'edn resolver -> - resolvers -> - resolvers - - val resolve : - resolvers -> - ?protocol:('edn, 'v) protocol -> - Endpoint.t -> - (flow, [> error ]) result io - - val connect : 'edn -> ('edn, _) protocol -> (flow, [> error ]) result io - - module type SERVICE = Sigs.SERVICE with type +'a io = 'a io - - module Service : sig - type ('cfg, 't, 'flow) impl = - (module SERVICE - with type configuration = 'cfg - and type t = 't - and type flow = 'flow) - - type ('cfg, 't, 'flow) service - - val equal : - ('cfg0, 't0, 'flow0) service -> - ('cfg1, 't1, 'flow1) service -> - (('cfg0, 'cfg1) refl * ('t0, 't1) refl * ('flow0, 'flow1) refl) option - - val register : - service:('cfg, 't, 'v) impl -> - protocol:(_, 'v) protocol -> - ('cfg, 't, 'v) service - - type error = [ `Msg of string ] - - val pp_error : error Fmt.t - - val init : - 'cfg -> service:('cfg, 't, 'v) service -> ('t, [> error ]) result io - - val accept : - service:('cfg, 't, 'v) service -> 't -> (flow, [> error ]) result io - - val close : - service:('cfg, 't, 'v) service -> 't -> (unit, [> error ]) result io - - val pack : (_, _, 'v) service -> 'v -> flow - - val impl : - ('cfg, 't, 'v) service -> - (module SERVICE - with type configuration = 'cfg - and type t = 't - and type flow = 'v) - end -end - -module type IO = Sigs.IO - -module type BUFFER = Sigs.BUFFER - module type BIJECTION = sig type +'a s @@ -241,13 +104,13 @@ module Make (IO : IO) (Input : BUFFER) (Output : BUFFER) : type output = Output.t module type PROTOCOL = - Sigs.PROTOCOL + PROTOCOL with type input = input and type output = output and type +'a io = 'a io module type FLOW = - Sigs.FLOW + FLOW with type input = input and type output = output and type +'a io = 'a io @@ -486,7 +349,7 @@ module Make (IO : IO) (Input : BUFFER) (Output : BUFFER) : | Some (Value flow) -> Some flow | None -> None - module type SERVICE = Sigs.SERVICE with type +'a io = 'a io + module type SERVICE = SERVICE with type +'a io = 'a io module Service = struct type ('cfg, 't, 'flow) impl = diff --git a/src/core/conduit.mli b/src/core/conduit.mli index 3fa4e853..45d7ebf5 100644 --- a/src/core/conduit.mli +++ b/src/core/conduit.mli @@ -1,427 +1,21 @@ module Endpoint = Endpoint -type ('a, 'b) refl = Refl : ('a, 'a) refl - type resolvers (** Type for resolvers map. *) val empty : resolvers (** [empty] is an empty {!resolvers} map. *) -type ('edn, 'flow) value = Value : 'flow -> ('edn, 'flow) value - -module type S = sig - module Endpoint : module type of Endpoint - - type input - (** The type for payload inputs. *) - - type output - (** The type for payload outputs. *) - - type +'a io - (** The type for I/O effects. *) - - type scheduler - (** The type of I/O monads. *) - - (** {2:client Client-side conduits.} *) - - type flow = private .. - (** The type for generic flows. {!PROTOCOL} implementations are extending (via - {!register}) this type. It allows users to extract the underlying flow - implementation: - - {[ - Conduit.connect domain_name >>? function - | Conduit_lwt_unix_tcp.T Conduit.(Value (file_descr : Lwt_unix.file_descr)) -> ... - | Conduit_lwt_unix_tls.T Conduit.(Value (fd, (tls : Tls.Engine.state))) -> ... - | _ -> ... (* use flow functions for the default case *) - ]} - *) - - type error = [ `Msg of string | `Not_found ] - - val pp_error : error Fmt.t - - val recv : - flow -> input -> ([ `Input of int | `End_of_flow ], [> error ]) result io - (** [recv flow input] is [Ok (`Input len)] iff [n] bytes of data has been - received from the flow [flow] and copied in [input]. *) - - val send : flow -> output -> (int, [> error ]) result io - (** [send flow output] is [Ok n] iff [n] bytes of date from [output] has been - sent over the flow [flow]. *) - - val close : flow -> (unit, [> error ]) result io - (** [close flow] closes [flow]. Subsequent calls to {!recv} will return - [Ok `End_of_flow]. Subsequent calls to {!send} will return an [Error]. *) - - (** {2:registration Protocol registration.} *) - - (** A flow is a system that allows entities to transmit {i payloads}. These - entities do not have to care about the underlying transport mechanism. - flows simply deal with routing and delivering of these payloads. That - abstraction allows these protocols to compose. - - For example, the Transmission Control Protocol (TCP) is representable as a - flow, because it is able to encapsulate some {i payloads} without - interpreting it. A counter-example is the Simple Mail Transfer Protocol - (SMTP) which needs an interpretation of its {i payloads}: tokens such as - [EHLO] or [QUIT] have a direct incidence over the life-cycle of the - connection. - - An other protocol representable as a flow is the Transport Layer Security - (TLS), as it deals only with privacy and data integrity. [Conduit] is able - to compose flows together like [TCP ∘ TLS] to make a new flow. Higher-level - protocols can be built in top of these abstract flows: for instance, Secure - Simple Mail Transfer Protocol (SSMTP) or HyperText Transfer Protocol Secure - (HTTPS) can be defined on top of both TCP and TLS. Using [Conduit], these - can be abstracted to work over any flow implementations. *) - module type FLOW = - Sigs.FLOW - with type input = input - and type output = output - and type +'a io = 'a io - - (** A protocol is a {!FLOW} plus [connect]. *) - module type PROTOCOL = - Sigs.PROTOCOL - with type input = input - and type output = output - and type +'a io = 'a io - - type ('edn, 'flow) impl = - (module PROTOCOL with type endpoint = 'edn and type flow = 'flow) - (** The type to represent a module {!PROTOCOL}. *) - - type ('edn, 'flow) protocol - (** The type for client protocols. ['edn] is the type for endpoint parameters. - ['flow] is the type for underlying flows. - - Endpoints allow users to create flows by either connecting directly to a - remote server or by resolving domain names (with {!connect}). *) - - val register : protocol:('edn, 'flow) impl -> ('edn, 'flow) protocol - (** [register ~protocol] is the protocol using the implementation [protocol]. - [protocol] must provide a [connect] function to allow client flows to be - created. - - For instance, on Unix, [Conduit] clients will use [Unix.sockaddr] as flow - endpoints, while [Unix.file_descr] would be used for the flow transport. - - {[ - module Conduit_tcp : sig - val t : (Unix.sockaddr, Unix.file_descr) protocol - end = struct - let t = register ~protocol:(module TCP) - end - ]} - - Client endpoints can of course be more complex, for instance to hold TLS - credentials, and [Conduit] allows all these kinds of flow to be used - transparently: - - {[ - module Conduit_tcp_tls : sig - val t : (Unix.sockaddr * Tls.Config.client, Unix.file_descr) protocol - end = struct - let t = register ~protocol:(module TLS) - end - ]} - - As a protocol implementer, you must {i register} your implementation and - expose the {i witness} of it. Then, users will be able to use it. *) - - (** {2 Injection and Extraction.} - - The goal of [Conduit] is to provide: - {ul - {- A way to manipulate a fully-abstract [flow].} - {- A way to manipulate a concrete and well-know [flow].}} - - [Conduit] provides several mechanisms to be able to manipulate our abstract - type {!flow} and destruct it to a concrete value such as a [Unix.file_descr]. - [Conduit] can assert one assumption: from a given abstracted [flow], it exists - one and only one {!FLOW} implementation. - - As [Conduit] determines this implementation, the user can determine the used - implementation when he wants to {!send} or {!recv} datas. - - So [Conduit] uses or extracts uniqely the implementation registered before - with {!register} and no layer can tweak or update this assertion. - - {!repr}, {!flow}, {!impl} and {!is} can extracts in differents ways the - abstracted {!flow}: - {ul - {- with the {i pattern-matching}} - {- with {i first-class module}} - {- with the function {!is}}} - *) - - module type REPR = sig - type t - - type flow += T of t - end - - val repr : ('edn, 'v) protocol -> (module REPR with type t = ('edn, 'v) value) - (** As a protocol implementer, you should expose the concrete type of your - flow (to be able users to {i destruct} {!flow}). [repr] returns a module - which contains extension of {!flow} from your [protocol] such as: - - {[ - module Conduit_tcp : sig - type t = (Unix.sockaddr, Unix.file_descr) Conduit.value - type Conduit.flow += T of t - val t : (Unix.sockaddr, Unix.file_descr) protocol - end = struct - let t = register ~protocol:(module TCP) - include (val (Conduit.repr t)) - end - ]} - - With this interface, users are able to {i destruct} {!flow} to your - concrete type: - - {[ - Conduit.connect domain_name >>? function - | Conduit_tcp.T (Conduit.Value file_descr) -> ... - | _ -> ... - ]} - *) - - type unpack = Flow : 'flow * (module FLOW with type flow = 'flow) -> unpack - - val unpack : flow -> unpack - (** [pack flow] projects the module implementation associated to the given - abstract [flow] such as: +type ('edn, 'flow) value = ('edn, 'flow) Conduit_intf.value = + | Value : 'flow -> ('edn, 'flow) value - {[ - Conduit.connect edn >>= fun flow -> - let Conduit.Flow (flow, (module Flow)) = Conduit.unpack flow in - Flow.send flow "Hello World!" - ]} - *) - - val impl : - ('edn, 'flow) protocol -> - (module PROTOCOL with type endpoint = 'edn and type flow = 'flow) - (** [impl protocol] is [protocol]'s implementation. *) - - val cast : flow -> (_, 'flow) protocol -> 'flow option - (** [cast flow protocol] tries to {i cast} the given [flow] to the concrete - type described by the given [protocol]. - - {[ - match Conduit.is flow Conduit_tcp.t with - | Some (file_descr : Unix.file_descr) -> Some (Unix.getpeername file_descr) - | None -> None - ]} - *) - - val pack : (_, 'v) protocol -> 'v -> flow - (** [pack protocol concrete_flow] abstracts the given [flow] into the - {!flow} type from a given [protocol]. It permits to use [Conduit] with a - concrete value created by the user. - - {[ - let socket = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - let flow = Conduit.pack Conduit_tcp.t socket in - Conduit.send flow "Hello World!" - ]} - *) - - (** {2:resolution Domain name resolvers.} *) - - type 'edn resolver = Endpoint.t -> 'edn option io - (** The type for resolver functions, which resolve domain names to endpoints. - For instance, the DNS resolver function is: - - {[ - let http_resolver : Unix.sockaddr resolver = function - | IP ip -> Some (Ipaddr_unix.to_inet_addr ip, 80) - | Domain domain_name -> match Unix.gethostbyname (Domain_name.to_string domain_name) with - | { Unix.h_addr_list; _ } -> - if Array.length h_addr_list > 0 - then Some (Unix.ADDR_INET (h_addr_list.(0), 80)) - else None - | exception _ -> None - ]} - *) - - type nonrec resolvers = resolvers - - val empty : resolvers - - val add : - ('edn, _) protocol -> - ?priority:int -> - 'edn resolver -> - resolvers -> - resolvers - (** [add protocol ?priority resolver resolvers] adds a new resolver function - [resolver] to [resolvers]. - - When the [resolver] is able to resolve the given domain name, it will try - to connect to the specified client endpoint. Resolvers are iterated in - priority order (lower to higher). - - {[ - let http_resolver = ... - let https_resolver = ... (* deal with client-side certificates here. *) - - let resolvers = - empty - |> add Conduit_tcp.t http_resolver - |> add Conduit_tcp_tls.t https_resolver ~priority:10 - |> add Conduit_tcp_ssl.t https_resolver ~priority:20 - ]} *) - - val resolve : - resolvers -> - ?protocol:('edn, 'v) protocol -> - Endpoint.t -> - (flow, [> error ]) result io - (** [resolve resolvers domain_name] is the flow created by connecting to the - domain name [domain_name], using the resolvers [resolvers]. Each resolver - tries to resolve the given domain-name (they are ordered by the given - priority). The first which connects successfully wins. - - The resolver result is a flow connect to that winning endpoint. - - {[ - let mirage_io = domain_name_exn "mirage.io" - - val resolver_on_my_private_network : Unix.sockaddr resolver - val resolver_on_internet : Unix.sockaddr resolver - val resolver_with_tls : (Unix.sockaddr * Tls.Config.client) resolver - - let resolvers = - empty - |> add tls ~priority:0 resolver_with_tls - |> add tcp ~priority:10 resolver_on_my_private_network - |> add tcp ~priority:20 resolver_on_internet - - let () = Conduit.resolve resolvers (Conduit.Endpoint.domain mirage_io) >>? function - | TCP.T (Conduit.Value file_descr) as flow -> - let peer = Unix.getpeername file_descr in - ignore @@ Conduit.send flow ("Hello " ^ string_of_sockaddr peer) - | flow -> - ignore @@ Conduit.send flow "Hello World!" - ]} - *) - - val connect : 'edn -> ('edn, _) protocol -> (flow, [> error ]) result io - - (** {2:service Server-side conduits.} *) - - module type SERVICE = Sigs.SERVICE with type +'a io = 'a io - - module Service : sig - type ('cfg, 't, 'flow) impl = - (module SERVICE - with type configuration = 'cfg - and type t = 't - and type flow = 'flow) - - type ('cfg, 't, 'flow) service - (** The type for services, e.g. service-side protocols. ['cfg] is the type - for configuration, ['t] is the type for state states. ['flow] is the type - for underlying flows. *) - - val equal : - ('cfg0, 't0, 'flow0) service -> - ('cfg1, 't1, 'flow1) service -> - (('cfg0, 'cfg1) refl * ('t0, 't1) refl * ('flow0, 'flow1) refl) option - (** [equal svc0 svc1 ] proves that [svc0] and [svc1] are - physically the same. For instance, [Conduit] asserts: - - {[ - let service = Service.register ~service:(module V) ;; - - let () = match Service.equal service service with - | Some (Refl, Refl, Refl) -> ... - | _ -> assert false - ]} *) - - val register : - service:('cfg, 't, 'v) impl -> - protocol:(_, 'v) protocol -> - ('cfg, 't, 'v) service - (** [register ~service ~protocool] is the service using the implementation [service] - bound with implementation of a [protocol]. [service] must define [make] and [accept] - function to be able to create server-side flows. - - For instance: - - {[ - module TCP_service : SERVICE with type configuration = Unix.sockaddr - and type t = Unix.file_descr - and type flow = Unix.file_descr - - let tcp_protocol = Conduit.register ~protocol:(module TCP_protocol) - let tcp_service : (Unix.sockaddr, Unix.file_descr, Unix.file_descr) Service.service = - Conduit.Service.register ~service:(module TCP_service) ~protocol:tcp_protocol - ]} - *) - - type error = [ `Msg of string ] - - val pp_error : error Fmt.t - - val init : - 'cfg -> service:('cfg, 't, 'v) service -> ('t, [> error ]) result io - (** [init cfg ~service] initialises the service with the - configuration [cfg]. *) - - val accept : - service:('cfg, 't, 'v) service -> 't -> (flow, [> error ]) result io - (** [accept service t] waits for a connection on the service [t]. The result - is a {i flow} connected to the client. *) - - val close : - service:('cfg, 't, 'v) service -> 't -> (unit, [> error ]) result io - (** [close ~service t] releases the resources associated to the server [t]. *) - - val pack : (_, _, 'v) service -> 'v -> flow - (** [pack service v] returns the abstracted value [v] as {!pack} does - for a given protocol {i witness} (bound with the given [service]). - It serves to abstract the flow created (and initialised) by the - service to a {!flow}. - - {[ - let handler (flow : Conduit.flow) = - Conduit.send flow "Hello World!" >>= fun _ -> - ... - - let run ~service cfg = - let module Service = Conduit.Service.impl service in - Service.init cfg >>? fun t -> - let rec loop t = - Service.accept t >>? fun flow -> - let flow = Conduit.Service.pack service flow in - async (fun () -> handler flow) ; loop t in - loop t - - let () = run ~service:tcp_service (localhost, 8080) - let () = run ~service:tls_service (certs, (localhost, 8080)) - ]} *) - - val impl : - ('cfg, 't, 'v) service -> - (module SERVICE - with type configuration = 'cfg - and type t = 't - and type flow = 'v) - (** [impl service] is [service]'s underlying implementation. *) - end -end +module type S = Conduit_intf.S +(** @inline *) -module type IO = Sigs.IO +module type IO = Conduit_intf.IO (** @inline *) -module type BUFFER = Sigs.BUFFER +module type BUFFER = Conduit_intf.BUFFER (** @inline *) module Make (IO : IO) (Input : BUFFER) (Output : BUFFER) : diff --git a/src/core/conduit_intf.ml b/src/core/conduit_intf.ml new file mode 100644 index 00000000..2c21478b --- /dev/null +++ b/src/core/conduit_intf.ml @@ -0,0 +1,521 @@ +type 'x or_end_of_flow = [ `End_of_flow | `Input of 'x ] + +module type FLOW = sig + (** [FLOW] is the signature for flow clients. + + A [flow] is an abstract value over which I/O functions such as {!send}, + {!recv} and {!close} can be used. + + {[ + type input = bytes and output = string + type +'a s = 'a + + let process flow = + let buf = Bytes.create 0x1000 in + match Flow.recv flow buf with + | Ok (`Input len) -> + let str = Bytes.sub_string buf 0 len in + ignore (Flow.send flow str) + | _ -> failwith "Flow.recv" + ]} + + The given flow can be more complex than a simple TCP flow for example. It + can be wrapped into a TLS layer. However, the goal is to be able to implement + a protocol without such complexity. + *) + + type +'a io + + type flow + + (** {3 Input & Output.} + + Depending on the I/O model, the type for inputs and outputs can differ ; + for instance they could allow users the ability to define capabilities on + them such as {i read} or {i write} capabilities. + + However, in most of the current [Conduit] backends: + + {[ + type input = Cstruct.t + type output = Cstruct.t + ]} + *) + + type input + + and output + + (** {3 Errors.} *) + + type error + (** The type for errors. *) + + val pp_error : error Fmt.t + (** [pp_error] is the pretty-printer for {!error}. *) + + val recv : flow -> input -> (int or_end_of_flow, error) result io + (** [recv flow input] is [Ok (`Input len)] iff [len] bytes of data has been received from + the flow [flow] and copied in [input]. *) + + val send : flow -> output -> (int, error) result io + (** [send t output] is [Ok len] iff [len] bytes of data from [output] has been + sent over the flow [flow]. *) + + val close : flow -> (unit, error) result io + (** [close flow] closes [flow]. Subsequent calls to {!recv} on [flow] will + return [`End_of_flow]. Subsequent calls to {!send} on [t] will return an + [Error]. *) +end + +module type PROTOCOL = sig + include FLOW + + type endpoint + + val connect : endpoint -> (flow, error) result io +end + +module type SERVICE = sig + type +'a io + + type flow + + type t + + type error + + type configuration + + val init : configuration -> (t, error) result io + + val pp_error : error Fmt.t + + val accept : t -> (flow, error) result io + + val close : t -> (unit, error) result io +end + +module type IO = sig + type +'a t + + val bind : 'a t -> ('a -> 'b t) -> 'b t + + val return : 'a -> 'a t +end + +module type BUFFER = sig + type t +end + +type ('a, 'b) refl = Refl : ('a, 'a) refl + +type ('edn, 'flow) value = Value : 'flow -> ('edn, 'flow) value + +module type S = sig + module Endpoint : module type of Endpoint + + type input + (** The type for payload inputs. *) + + type output + (** The type for payload outputs. *) + + type +'a io + (** The type for I/O effects. *) + + type scheduler + (** The type of I/O monads. *) + + (** {2:client Client-side conduits.} *) + + type flow = private .. + (** The type for generic flows. {!PROTOCOL} implementations are extending (via + {!register}) this type. It allows users to extract the underlying flow + implementation: + + {[ + Conduit.connect domain_name >>? function + | Conduit_lwt_unix_tcp.T Conduit.(Value (file_descr : Lwt_unix.file_descr)) -> ... + | Conduit_lwt_unix_tls.T Conduit.(Value (fd, (tls : Tls.Engine.state))) -> ... + | _ -> ... (* use flow functions for the default case *) + ]} + *) + + type error = [ `Msg of string | `Not_found ] + + val pp_error : error Fmt.t + + val recv : + flow -> input -> ([ `Input of int | `End_of_flow ], [> error ]) result io + (** [recv flow input] is [Ok (`Input len)] iff [n] bytes of data has been + received from the flow [flow] and copied in [input]. *) + + val send : flow -> output -> (int, [> error ]) result io + (** [send flow output] is [Ok n] iff [n] bytes of date from [output] has been + sent over the flow [flow]. *) + + val close : flow -> (unit, [> error ]) result io + (** [close flow] closes [flow]. Subsequent calls to {!recv} will return + [Ok `End_of_flow]. Subsequent calls to {!send} will return an [Error]. *) + + (** {2:registration Protocol registration.} *) + + (** A flow is a system that allows entities to transmit {i payloads}. These + entities do not have to care about the underlying transport mechanism. + flows simply deal with routing and delivering of these payloads. That + abstraction allows these protocols to compose. + + For example, the Transmission Control Protocol (TCP) is representable as a + flow, because it is able to encapsulate some {i payloads} without + interpreting it. A counter-example is the Simple Mail Transfer Protocol + (SMTP) which needs an interpretation of its {i payloads}: tokens such as + [EHLO] or [QUIT] have a direct incidence over the life-cycle of the + connection. + + An other protocol representable as a flow is the Transport Layer Security + (TLS), as it deals only with privacy and data integrity. [Conduit] is able + to compose flows together like [TCP ∘ TLS] to make a new flow. Higher-level + protocols can be built in top of these abstract flows: for instance, Secure + Simple Mail Transfer Protocol (SSMTP) or HyperText Transfer Protocol Secure + (HTTPS) can be defined on top of both TCP and TLS. Using [Conduit], these + can be abstracted to work over any flow implementations. *) + module type FLOW = + FLOW + with type input = input + and type output = output + and type +'a io = 'a io + + (** A protocol is a {!FLOW} plus [connect]. *) + module type PROTOCOL = + PROTOCOL + with type input = input + and type output = output + and type +'a io = 'a io + + type ('edn, 'flow) impl = + (module PROTOCOL with type endpoint = 'edn and type flow = 'flow) + (** The type to represent a module {!PROTOCOL}. *) + + type ('edn, 'flow) protocol + (** The type for client protocols. ['edn] is the type for endpoint parameters. + ['flow] is the type for underlying flows. + + Endpoints allow users to create flows by either connecting directly to a + remote server or by resolving domain names (with {!connect}). *) + + val register : protocol:('edn, 'flow) impl -> ('edn, 'flow) protocol + (** [register ~protocol] is the protocol using the implementation [protocol]. + [protocol] must provide a [connect] function to allow client flows to be + created. + + For instance, on Unix, [Conduit] clients will use [Unix.sockaddr] as flow + endpoints, while [Unix.file_descr] would be used for the flow transport. + + {[ + module Conduit_tcp : sig + val t : (Unix.sockaddr, Unix.file_descr) protocol + end = struct + let t = register ~protocol:(module TCP) + end + ]} + + Client endpoints can of course be more complex, for instance to hold TLS + credentials, and [Conduit] allows all these kinds of flow to be used + transparently: + + {[ + module Conduit_tcp_tls : sig + val t : (Unix.sockaddr * Tls.Config.client, Unix.file_descr) protocol + end = struct + let t = register ~protocol:(module TLS) + end + ]} + + As a protocol implementer, you must {i register} your implementation and + expose the {i witness} of it. Then, users will be able to use it. *) + + (** {2 Injection and Extraction.} + + The goal of [Conduit] is to provide: + {ul + {- A way to manipulate a fully-abstract [flow].} + {- A way to manipulate a concrete and well-know [flow].}} + + [Conduit] provides several mechanisms to be able to manipulate our abstract + type {!flow} and destruct it to a concrete value such as a [Unix.file_descr]. + [Conduit] can assert one assumption: from a given abstracted [flow], it exists + one and only one {!FLOW} implementation. + + As [Conduit] determines this implementation, the user can determine the used + implementation when he wants to {!send} or {!recv} datas. + + So [Conduit] uses or extracts uniqely the implementation registered before + with {!register} and no layer can tweak or update this assertion. + + {!repr}, {!flow}, {!impl} and {!is} can extracts in differents ways the + abstracted {!flow}: + {ul + {- with the {i pattern-matching}} + {- with {i first-class module}} + {- with the function {!is}}} + *) + + module type REPR = sig + type t + + type flow += T of t + end + + val repr : ('edn, 'v) protocol -> (module REPR with type t = ('edn, 'v) value) + (** As a protocol implementer, you should expose the concrete type of your + flow (to be able users to {i destruct} {!flow}). [repr] returns a module + which contains extension of {!flow} from your [protocol] such as: + + {[ + module Conduit_tcp : sig + type t = (Unix.sockaddr, Unix.file_descr) Conduit.value + type Conduit.flow += T of t + val t : (Unix.sockaddr, Unix.file_descr) protocol + end = struct + let t = register ~protocol:(module TCP) + include (val (Conduit.repr t)) + end + ]} + + With this interface, users are able to {i destruct} {!flow} to your + concrete type: + + {[ + Conduit.connect domain_name >>? function + | Conduit_tcp.T (Conduit.Value file_descr) -> ... + | _ -> ... + ]} + *) + + type unpack = Flow : 'flow * (module FLOW with type flow = 'flow) -> unpack + + val unpack : flow -> unpack + (** [pack flow] projects the module implementation associated to the given + abstract [flow] such as: + + {[ + Conduit.connect edn >>= fun flow -> + let Conduit.Flow (flow, (module Flow)) = Conduit.unpack flow in + Flow.send flow "Hello World!" + ]} + *) + + val impl : + ('edn, 'flow) protocol -> + (module PROTOCOL with type endpoint = 'edn and type flow = 'flow) + (** [impl protocol] is [protocol]'s implementation. *) + + val cast : flow -> (_, 'flow) protocol -> 'flow option + (** [cast flow protocol] tries to {i cast} the given [flow] to the concrete + type described by the given [protocol]. + + {[ + match Conduit.is flow Conduit_tcp.t with + | Some (file_descr : Unix.file_descr) -> Some (Unix.getpeername file_descr) + | None -> None + ]} + *) + + val pack : (_, 'v) protocol -> 'v -> flow + (** [pack protocol concrete_flow] abstracts the given [flow] into the + {!flow} type from a given [protocol]. It permits to use [Conduit] with a + concrete value created by the user. + + {[ + let socket = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + let flow = Conduit.pack Conduit_tcp.t socket in + Conduit.send flow "Hello World!" + ]} + *) + + (** {2:resolution Domain name resolvers.} *) + + type 'edn resolver = Endpoint.t -> 'edn option io + (** The type for resolver functions, which resolve domain names to endpoints. + For instance, the DNS resolver function is: + + {[ + let http_resolver : Unix.sockaddr resolver = function + | IP ip -> Some (Ipaddr_unix.to_inet_addr ip, 80) + | Domain domain_name -> match Unix.gethostbyname (Domain_name.to_string domain_name) with + | { Unix.h_addr_list; _ } -> + if Array.length h_addr_list > 0 + then Some (Unix.ADDR_INET (h_addr_list.(0), 80)) + else None + | exception _ -> None + ]} + *) + + type resolvers + + val empty : resolvers + + val add : + ('edn, _) protocol -> + ?priority:int -> + 'edn resolver -> + resolvers -> + resolvers + (** [add protocol ?priority resolver resolvers] adds a new resolver function + [resolver] to [resolvers]. + + When the [resolver] is able to resolve the given domain name, it will try + to connect to the specified client endpoint. Resolvers are iterated in + priority order (lower to higher). + + {[ + let http_resolver = ... + let https_resolver = ... (* deal with client-side certificates here. *) + + let resolvers = + empty + |> add Conduit_tcp.t http_resolver + |> add Conduit_tcp_tls.t https_resolver ~priority:10 + |> add Conduit_tcp_ssl.t https_resolver ~priority:20 + ]} *) + + val resolve : + resolvers -> + ?protocol:('edn, 'v) protocol -> + Endpoint.t -> + (flow, [> error ]) result io + (** [resolve resolvers domain_name] is the flow created by connecting to the + domain name [domain_name], using the resolvers [resolvers]. Each resolver + tries to resolve the given domain-name (they are ordered by the given + priority). The first which connects successfully wins. + + The resolver result is a flow connect to that winning endpoint. + + {[ + let mirage_io = domain_name_exn "mirage.io" + + val resolver_on_my_private_network : Unix.sockaddr resolver + val resolver_on_internet : Unix.sockaddr resolver + val resolver_with_tls : (Unix.sockaddr * Tls.Config.client) resolver + + let resolvers = + empty + |> add tls ~priority:0 resolver_with_tls + |> add tcp ~priority:10 resolver_on_my_private_network + |> add tcp ~priority:20 resolver_on_internet + + let () = Conduit.resolve resolvers (Conduit.Endpoint.domain mirage_io) >>? function + | TCP.T (Conduit.Value file_descr) as flow -> + let peer = Unix.getpeername file_descr in + ignore @@ Conduit.send flow ("Hello " ^ string_of_sockaddr peer) + | flow -> + ignore @@ Conduit.send flow "Hello World!" + ]} + *) + + val connect : 'edn -> ('edn, _) protocol -> (flow, [> error ]) result io + + (** {2:service Server-side conduits.} *) + + module type SERVICE = SERVICE with type +'a io = 'a io + + module Service : sig + type ('cfg, 't, 'flow) impl = + (module SERVICE + with type configuration = 'cfg + and type t = 't + and type flow = 'flow) + + type ('cfg, 't, 'flow) service + (** The type for services, e.g. service-side protocols. ['cfg] is the type + for configuration, ['t] is the type for state states. ['flow] is the type + for underlying flows. *) + + val equal : + ('cfg0, 't0, 'flow0) service -> + ('cfg1, 't1, 'flow1) service -> + (('cfg0, 'cfg1) refl * ('t0, 't1) refl * ('flow0, 'flow1) refl) option + (** [equal svc0 svc1 ] proves that [svc0] and [svc1] are + physically the same. For instance, [Conduit] asserts: + + {[ + let service = Service.register ~service:(module V) ;; + + let () = match Service.equal service service with + | Some (Refl, Refl, Refl) -> ... + | _ -> assert false + ]} *) + + val register : + service:('cfg, 't, 'v) impl -> + protocol:(_, 'v) protocol -> + ('cfg, 't, 'v) service + (** [register ~service ~protocool] is the service using the implementation [service] + bound with implementation of a [protocol]. [service] must define [make] and [accept] + function to be able to create server-side flows. + + For instance: + + {[ + module TCP_service : SERVICE with type configuration = Unix.sockaddr + and type t = Unix.file_descr + and type flow = Unix.file_descr + + let tcp_protocol = Conduit.register ~protocol:(module TCP_protocol) + let tcp_service : (Unix.sockaddr, Unix.file_descr, Unix.file_descr) Service.service = + Conduit.Service.register ~service:(module TCP_service) ~protocol:tcp_protocol + ]} + *) + + type error = [ `Msg of string ] + + val pp_error : error Fmt.t + + val init : + 'cfg -> service:('cfg, 't, 'v) service -> ('t, [> error ]) result io + (** [init cfg ~service] initialises the service with the + configuration [cfg]. *) + + val accept : + service:('cfg, 't, 'v) service -> 't -> (flow, [> error ]) result io + (** [accept service t] waits for a connection on the service [t]. The result + is a {i flow} connected to the client. *) + + val close : + service:('cfg, 't, 'v) service -> 't -> (unit, [> error ]) result io + (** [close ~service t] releases the resources associated to the server [t]. *) + + val pack : (_, _, 'v) service -> 'v -> flow + (** [pack service v] returns the abstracted value [v] as {!pack} does + for a given protocol {i witness} (bound with the given [service]). + It serves to abstract the flow created (and initialised) by the + service to a {!flow}. + + {[ + let handler (flow : Conduit.flow) = + Conduit.send flow "Hello World!" >>= fun _ -> + ... + + let run ~service cfg = + let module Service = Conduit.Service.impl service in + Service.init cfg >>? fun t -> + let rec loop t = + Service.accept t >>? fun flow -> + let flow = Conduit.Service.pack service flow in + async (fun () -> handler flow) ; loop t in + loop t + + let () = run ~service:tcp_service (localhost, 8080) + let () = run ~service:tls_service (certs, (localhost, 8080)) + ]} *) + + val impl : + ('cfg, 't, 'v) service -> + (module SERVICE + with type configuration = 'cfg + and type t = 't + and type flow = 'v) + (** [impl service] is [service]'s underlying implementation. *) + end +end diff --git a/src/core/sigs.ml b/src/core/sigs.ml deleted file mode 100644 index f0325550..00000000 --- a/src/core/sigs.ml +++ /dev/null @@ -1,109 +0,0 @@ -type 'x or_end_of_flow = [ `End_of_flow | `Input of 'x ] - -module type FLOW = sig - (** [FLOW] is the signature for flow clients. - - A [flow] is an abstract value over which I/O functions such as {!send}, - {!recv} and {!close} can be used. - - {[ - type input = bytes and output = string - type +'a s = 'a - - let process flow = - let buf = Bytes.create 0x1000 in - match Flow.recv flow buf with - | Ok (`Input len) -> - let str = Bytes.sub_string buf 0 len in - ignore (Flow.send flow str) - | _ -> failwith "Flow.recv" - ]} - - The given flow can be more complex than a simple TCP flow for example. It - can be wrapped into a TLS layer. However, the goal is to be able to implement - a protocol without such complexity. - *) - - type +'a io - - type flow - - (** {3 Input & Output.} - - Depending on the I/O model, the type for inputs and outputs can differ ; - for instance they could allow users the ability to define capabilities on - them such as {i read} or {i write} capabilities. - - However, in most of the current [Conduit] backends: - - {[ - type input = Cstruct.t - type output = Cstruct.t - ]} - *) - - type input - - and output - - (** {3 Errors.} *) - - type error - (** The type for errors. *) - - val pp_error : error Fmt.t - (** [pp_error] is the pretty-printer for {!error}. *) - - val recv : flow -> input -> (int or_end_of_flow, error) result io - (** [recv flow input] is [Ok (`Input len)] iff [len] bytes of data has been received from - the flow [flow] and copied in [input]. *) - - val send : flow -> output -> (int, error) result io - (** [send t output] is [Ok len] iff [len] bytes of data from [output] has been - sent over the flow [flow]. *) - - val close : flow -> (unit, error) result io - (** [close flow] closes [flow]. Subsequent calls to {!recv} on [flow] will - return [`End_of_flow]. Subsequent calls to {!send} on [t] will return an - [Error]. *) -end - -module type PROTOCOL = sig - include FLOW - - type endpoint - - val connect : endpoint -> (flow, error) result io -end - -module type SERVICE = sig - type +'a io - - type flow - - type t - - type error - - type configuration - - val init : configuration -> (t, error) result io - - val pp_error : error Fmt.t - - val accept : t -> (flow, error) result io - - val close : t -> (unit, error) result io -end - -module type IO = sig - type +'a t - - val bind : 'a t -> ('a -> 'b t) -> 'b t - - val return : 'a -> 'a t -end - -module type BUFFER = sig - type t -end diff --git a/tests/ping-pong/with_async.ml b/tests/ping-pong/with_async.ml index 6a2346a5..ae12793d 100644 --- a/tests/ping-pong/with_async.ml +++ b/tests/ping-pong/with_async.ml @@ -38,9 +38,10 @@ let resolve_tls_ping_pong = Conduit_async_tls.TCP.resolve ~port:9000 ~config let resolvers = - Conduit.empty - |> Conduit_async.add ~priority:10 tls_protocol resolve_tls_ping_pong - |> Conduit_async.add ~priority:20 tcp_protocol resolve_ping_pong + let open Conduit_async in + empty + |> add ~priority:10 tls_protocol resolve_tls_ping_pong + |> add ~priority:20 tcp_protocol resolve_ping_pong let localhost = Domain_name.(host_exn (of_string_exn "localhost")) diff --git a/tests/ping-pong/with_lwt.ml b/tests/ping-pong/with_lwt.ml index 40777470..4ae38ea3 100644 --- a/tests/ping-pong/with_lwt.ml +++ b/tests/ping-pong/with_lwt.ml @@ -33,9 +33,10 @@ let resolve_tls_ping_pong = Conduit_lwt_tls.TCP.resolve ~port:8000 ~config let resolvers = - Conduit.empty - |> Conduit_lwt.add ~priority:20 Conduit_lwt.TCP.protocol resolve_ping_pong - |> Conduit_lwt.add ~priority:10 tls_protocol resolve_tls_ping_pong + let open Conduit_lwt in + empty + |> add ~priority:20 TCP.protocol resolve_ping_pong + |> add ~priority:10 tls_protocol resolve_tls_ping_pong (* Run *)