From b390dfa51a02f514f00ffda4d83118b05666f8b1 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 15:15:51 -0400 Subject: [PATCH 01/10] docs: Add an example of receiving data --- bsconfig.json | 27 ++++++++-------------- examples/receiver.re | 54 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 17 deletions(-) create mode 100644 examples/receiver.re diff --git a/bsconfig.json b/bsconfig.json index 8389234..fa9c137 100644 --- a/bsconfig.json +++ b/bsconfig.json @@ -1,21 +1,14 @@ { + "bs-dependencies": [], "name": "bs-amqp-connection-manager", - "version": "0.1.0", - "sources": { - "dir" : "src", - "subdirs" : true - }, - "package-specs": { - "module": "commonjs", - "in-source": false - }, - "suffix": ".bs.js", - "bs-dependencies": [ - - ], - "warnings": { - "error" : "+101" - }, "namespace": false, - "refmt": 3 + "package-specs": {"in-source": false, "module": "commonjs"}, + "refmt": 3, + "sources": [ + {"dir": "src", "subdirs": true}, + {"dir": "examples", "type": "dev"} + ], + "suffix": ".bs.js", + "version": "0.1.0", + "warnings": {"error": "+101"} } diff --git a/examples/receiver.re b/examples/receiver.re new file mode 100644 index 0000000..0c31976 --- /dev/null +++ b/examples/receiver.re @@ -0,0 +1,54 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/receiver.js + * */ + +module Amqp = AmqpConnectionManager; + +let queue_name = "amqp-connection-manager-sample1"; + +// Handle an incomming message. +let onMessage = (channel, msg: Amqp.Queue.message) => { + let message = msg.content->Node.Buffer.toString->Js.Json.parseExn; + Js.Console.log2("receiver: got message", message); + Amqp.Channel.ack(channel, msg); +}; + +// Create a connetion manager +let connection = + Amqp.connect( + [|"amqp://my-amqp"|], + ~options={"heartbeatIntervalInSeconds": 5}, + (), + ); + +Amqp.AmqpConnectionManager.on( + connection, + `disconnect(err => Js.Console.error(err)), +) +|> ignore; + +Amqp.AmqpConnectionManager.on( + connection, + `connect(_ => Js.Console.info("connected!")), +) +|> ignore; + +// Set up a channel listening for messages in the queue. +let channelWrapper = + Amqp.AmqpConnectionManager.createChannel( + connection, + { + "setup": channel => + // `channel` here is a regular amqplib `ConfirmChannel`. + Js.Promise.( + all([| + Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) + |> then_(_ => resolve()), + Amqp.Channel.prefetch(channel, 1), + Amqp.Channel.consume(channel, queue_name, onMessage(channel)), + |]) + |> then_(_ => resolve()) + ), + }, + ); From 63b59165bd3e17f3cc8f77bbf51ae73a234887e2 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:02:12 -0400 Subject: [PATCH 02/10] docs: Add an example of sending data --- examples/sender.re | 69 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 examples/sender.re diff --git a/examples/sender.re b/examples/sender.re new file mode 100644 index 0000000..1e04a0a --- /dev/null +++ b/examples/sender.re @@ -0,0 +1,69 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/sender.js + * */ + +module Amqp = AmqpConnectionManager; +[@bs.val] external setTimeout: (unit => unit, int) => int = "setTimeout"; + +let queue_name = "amqp-connection-manager-sample1"; + +// Create a connetion manager +let connection = Amqp.connect([|"amqp://localhost"|], ()); + +Amqp.AmqpConnectionManager.on( + connection, + `disconnect(err => Js.Console.error(err)), +) +|> ignore; + +Amqp.AmqpConnectionManager.on( + connection, + `connect(_ => Js.Console.info("connected!")), +) +|> ignore; + +// Set up a channel listening for messages in the queue. +let channelWrapper = + Amqp.AmqpConnectionManager.createChannel( + connection, + { + "json": true, + "setup": channel => + // `channel` here is a regular amqplib `ConfirmChannel`. + Js.Promise.( + all([| + Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) + |> then_(_ => resolve()), + |]) + |> then_(_ => resolve()) + ), + }, + ); + +// Send messages until someone hits CTRL-C or something goes wrong... +let rec sendMessage = () => { + Amqp.ChannelWrapper.sendToQueue( + channelWrapper, + queue_name, + {"time": Js.Date.now()}, + Js.Obj.empty(), + ) + |> Js.Promise.then_(msg => { + Js.Console.info("Message sent"); + Js.Promise.make((~resolve, ~reject as _) => + setTimeout(() => resolve(. msg), 1000) |> ignore + ); + }) + |> Js.Promise.then_(_ => sendMessage()) + |> Js.Promise.catch(err => { + Js.Console.error(err); + Amqp.ChannelWrapper.close(channelWrapper); + Amqp.AmqpConnectionManager.close(connection); + + Js.Promise.resolve(); + }); +}; + +Js.Console.info("Sending messages..."); +sendMessage(); From 144732d222b04460b44c1599d2344064595750f3 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:02:27 -0400 Subject: [PATCH 03/10] docs: Add an example of pubsub-publisher --- examples/pubsub_publisher.re | 64 ++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 examples/pubsub_publisher.re diff --git a/examples/pubsub_publisher.re b/examples/pubsub_publisher.re new file mode 100644 index 0000000..6750d12 --- /dev/null +++ b/examples/pubsub_publisher.re @@ -0,0 +1,64 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-publisher.js */ + +module Amqp = AmqpConnectionManager; +[@bs.val] external setTimeout: (unit => unit, int) => int = "setTimeout"; + +let exchange_name = "amqp-connection-manager-sample2-ex"; + +// Create a connetion manager +let connection = Amqp.connect([|"amqp://localhost"|], ()); +Amqp.AmqpConnectionManager.on( + connection, + `connect(_ => Js.Console.info("Connected!")), +); +Amqp.AmqpConnectionManager.on( + connection, + `disconnect(err => Js.Console.error(err)), +); + +// Create a channel wrapper +let channelWrapper = + Amqp.AmqpConnectionManager.createChannel( + connection, + { + "json": true, + "setup": channel => + Amqp.Channel.assertExchange( + channel, + exchange_name, + "topic", + Js.Obj.empty(), + ) + |> Js.Promise.then_(_ => Js.Promise.resolve()), + }, + ); + +// Send messages until someone hits CTRL-C or something goes wrong... +let rec sendMessage = () => { + Amqp.ChannelWrapper.publish( + channelWrapper, + exchange_name, + "", + {"time": Js.Date.now()}, + Js.Obj.empty(), + ) + |> Js.Promise.then_(msg => { + Js.Console.info("Message sent"); + Js.Promise.make((~resolve, ~reject as _) => + setTimeout(() => resolve(. msg), 1000) |> ignore + ); + }) + |> Js.Promise.then_(_ => sendMessage()) + |> Js.Promise.catch(err => { + Js.Console.error(err); + Amqp.ChannelWrapper.close(channelWrapper); + Amqp.AmqpConnectionManager.close(connection); + + Js.Promise.resolve(); + }); +}; + +Js.Console.info("Sending messages..."); +sendMessage(); From 932b15da28692163473265e13c307bf527516237 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:04:00 -0400 Subject: [PATCH 04/10] docs: remove optional `connect` options --- examples/receiver.re | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/examples/receiver.re b/examples/receiver.re index 0c31976..33fb9d9 100644 --- a/examples/receiver.re +++ b/examples/receiver.re @@ -15,12 +15,7 @@ let onMessage = (channel, msg: Amqp.Queue.message) => { }; // Create a connetion manager -let connection = - Amqp.connect( - [|"amqp://my-amqp"|], - ~options={"heartbeatIntervalInSeconds": 5}, - (), - ); +let connection = Amqp.connect([|"amqp://localhost"|], ()); Amqp.AmqpConnectionManager.on( connection, From 9ae93886ddf963474d10b33877f6d9454501ce14 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:15:23 -0400 Subject: [PATCH 05/10] docs: Add an example of pubsub_subscriber --- examples/pubsub_publisher.re | 2 +- examples/pubsub_subscriber.re | 63 +++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 examples/pubsub_subscriber.re diff --git a/examples/pubsub_publisher.re b/examples/pubsub_publisher.re index 6750d12..0bed663 100644 --- a/examples/pubsub_publisher.re +++ b/examples/pubsub_publisher.re @@ -42,7 +42,7 @@ let rec sendMessage = () => { exchange_name, "", {"time": Js.Date.now()}, - Js.Obj.empty(), + {"contentType": "application/json", "persistent": true}, ) |> Js.Promise.then_(msg => { Js.Console.info("Message sent"); diff --git a/examples/pubsub_subscriber.re b/examples/pubsub_subscriber.re new file mode 100644 index 0000000..c1bf066 --- /dev/null +++ b/examples/pubsub_subscriber.re @@ -0,0 +1,63 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-subscriber.js */ + +module Amqp = AmqpConnectionManager; + +let queue_name = "amqp-connection-manager-sample2"; +let exchange_name = "amqp-connection-manager-sample2-ex"; + +// Handle an incomming message. +let onMessage = (channel, msg: Amqp.Queue.message) => { + let message = msg.content->Node.Buffer.toString->Js.Json.parseExn; + Js.Console.log2("receiver: got message", message); + Amqp.Channel.ack(channel, msg); +}; + +// Create a connetion manager +let connection = Amqp.connect([|"amqp://localhost"|], ()); +Amqp.AmqpConnectionManager.on( + connection, + `connect(_ => Js.Console.info("Connected!")), +); +Amqp.AmqpConnectionManager.on( + connection, + `disconnect(err => Js.Console.error(err)), +); + +// Set up a channel listening for messages in the queue. +let channelWrapper = + Amqp.AmqpConnectionManager.createChannel( + connection, + { + "setup": channel => + // `channel` here is a regular amqplib `ConfirmChannel`. + Js.Promise.( + all([| + Amqp.Channel.assertQueue( + channel, + queue_name, + {"exclusive": true, "autoDelete": true, "durable": false}, + ) + |> then_(_ => resolve()), + Amqp.Channel.assertExchange( + channel, + exchange_name, + "topic", + Js.Obj.empty(), + ) + |> then_(_ => resolve()), + Amqp.Channel.prefetch(channel, 1), + Amqp.Channel.bindQueue(channel, queue_name, exchange_name, ""), + Amqp.Channel.consume(channel, queue_name, onMessage(channel)), + |]) + |> then_(_ => resolve()) + ), + }, + ); + +Amqp.ChannelWrapper.waitForConnect(channelWrapper) +|> Js.Promise.then_(_ => { + Js.Console.info("Listening for messages"); + Js.Promise.resolve(); + }); From 0ebdfe35a798fd749d2b5d9523e1d8ae07918627 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:17:10 -0400 Subject: [PATCH 06/10] docs: Add a some comment docs to bindings --- src/AmqpConnectionManager.rei | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/AmqpConnectionManager.rei b/src/AmqpConnectionManager.rei index b647556..a957ff7 100644 --- a/src/AmqpConnectionManager.rei +++ b/src/AmqpConnectionManager.rei @@ -77,8 +77,13 @@ module AmqpConnectionManager: { type t; module Options: {type t('connectionOptions) = Js.t('connectionOptions);}; + /** Returns true if the AmqpConnectionManager is connected to a broker, false + * otherwise. */ let isConnected: t => bool; + + /** Close this AmqpConnectionManager and free all associated resources. */ let close: t => unit; + let on: ( t, @@ -96,9 +101,15 @@ module AmqpConnectionManager: { ) => t; - let createChannel: (t, Channel.Config.t) => ChannelWrapper.t; + /** Create a new ChannelWrapper. This is a proxy for the actual channel (which + * may or may not exist at any moment, depending on whether or not we are + * currently connected.) */ + let createChannel: (t, Channel.Config.t('a)) => ChannelWrapper.t; }; +/** Creates a new AmqpConnectionManager, which will connect to one of the URLs + * provided in `urls`. If a broker is unreachable or dies, then + * AmqpConnectionManager will try the next available broker, round-robin. */ let connect: (urls, ~options: AmqpConnectionManager.Options.t('a)=?, unit) => AmqpConnectionManager.t; From feba6066514b838d89d2c77d8c1de014009beafd Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:17:57 -0400 Subject: [PATCH 07/10] fix(types): Add type for `name`s --- src/AmqpConnectionManager.re | 6 +++--- src/AmqpConnectionManager.rei | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/AmqpConnectionManager.re b/src/AmqpConnectionManager.re index 285dacc..ac2d7ae 100644 --- a/src/AmqpConnectionManager.re +++ b/src/AmqpConnectionManager.re @@ -9,12 +9,12 @@ module Queue = { }; module Exchange = { - type name; + type name = string; }; module Channel = { type t; - type name; + type name = string; type ack = Queue.message => unit; type nack = Queue.message => unit; @@ -59,7 +59,7 @@ module Channel = { module ChannelWrapper = { type t; - type name; + type name = string; type routingKey = string; type ack = Queue.message => unit; type nack = Queue.message => unit; diff --git a/src/AmqpConnectionManager.rei b/src/AmqpConnectionManager.rei index a957ff7..0afced5 100644 --- a/src/AmqpConnectionManager.rei +++ b/src/AmqpConnectionManager.rei @@ -4,15 +4,15 @@ type urls = array(url); exception ConnectionError(Js.Exn.t); module Queue: { - type name; + type name = string; type message = {content: Node.Buffer.t}; }; -module Exchange: {type name;}; +module Exchange: {type name = string;}; module Channel: { type t; - type name; + type name = string; type ack = Queue.message => unit; type nack = Queue.message => unit; @@ -44,7 +44,7 @@ module Channel: { module ChannelWrapper: { type t; - type name; + type name = string; type routingKey = string; type ack = Queue.message => unit; type nack = Queue.message => unit; From aba40e7001933a47bfb0d884f83e55225367e4e7 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:18:50 -0400 Subject: [PATCH 08/10] feat(ChannelWrapper): Add `waitForConnect` binding --- src/AmqpConnectionManager.re | 2 ++ src/AmqpConnectionManager.rei | 1 + 2 files changed, 3 insertions(+) diff --git a/src/AmqpConnectionManager.re b/src/AmqpConnectionManager.re index ac2d7ae..32a2477 100644 --- a/src/AmqpConnectionManager.re +++ b/src/AmqpConnectionManager.re @@ -68,6 +68,8 @@ module ChannelWrapper = { [@bs.send] external nack: (t, Queue.message) => unit = "nack"; [@bs.send] external queueLength: t => int = "queueLength"; [@bs.send] external close: t => unit = "close"; + [@bs.send] + external waitForConnect: t => Js.Promise.t(unit) = "waitForConnect"; [@bs.send] external on: diff --git a/src/AmqpConnectionManager.rei b/src/AmqpConnectionManager.rei index 0afced5..7ca6dba 100644 --- a/src/AmqpConnectionManager.rei +++ b/src/AmqpConnectionManager.rei @@ -53,6 +53,7 @@ module ChannelWrapper: { let nack: t => nack; let queueLength: t => int; let close: t => unit; + let waitForConnect: t => Js.Promise.t(unit); let on: ( From 19c97e37c372f4fc60037fb4ab725ba46c1ba228 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:20:28 -0400 Subject: [PATCH 09/10] refactor: Don't require `Channel.Config.t`'s `"json"` prop --- src/AmqpConnectionManager.re | 8 ++------ src/AmqpConnectionManager.rei | 6 +----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/AmqpConnectionManager.re b/src/AmqpConnectionManager.re index 32a2477..f78aafa 100644 --- a/src/AmqpConnectionManager.re +++ b/src/AmqpConnectionManager.re @@ -22,11 +22,7 @@ module Channel = { [@bs.send] external nack: (t, Queue.message) => unit = "nack"; module Config = { - type nonrec t = { - . - "json": bool, - "setup": t => Js.Promise.t(unit), - }; + type nonrec t('a) = {.. "setup": t => Js.Promise.t(unit)} as 'a; }; [@bs.send] @@ -130,7 +126,7 @@ module AmqpConnectionManager = { "on"; [@bs.send] - external createChannel: (t, Channel.Config.t) => ChannelWrapper.t = + external createChannel: (t, Channel.Config.t('a)) => ChannelWrapper.t = "createChannel"; }; diff --git a/src/AmqpConnectionManager.rei b/src/AmqpConnectionManager.rei index 7ca6dba..c100423 100644 --- a/src/AmqpConnectionManager.rei +++ b/src/AmqpConnectionManager.rei @@ -20,11 +20,7 @@ module Channel: { let nack: t => nack; module Config: { - type nonrec t = { - . - "json": bool, - "setup": t => Js.Promise.t(unit), - }; + type nonrec t('a) = {.. "setup": t => Js.Promise.t(unit)} as 'a; }; let assertExchange: From 003b83c3186be564c5e0c3ce2833aa496f0a9a83 Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Tue, 17 Mar 2020 16:21:12 -0400 Subject: [PATCH 10/10] feat(publish): Less strictness regarding outgoing message types --- src/AmqpConnectionManager.re | 4 ++-- src/AmqpConnectionManager.rei | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/AmqpConnectionManager.re b/src/AmqpConnectionManager.re index f78aafa..69bf7b6 100644 --- a/src/AmqpConnectionManager.re +++ b/src/AmqpConnectionManager.re @@ -82,7 +82,7 @@ module ChannelWrapper = { [@bs.send] external publish': - (t, Exchange.name, routingKey, Js.Json.t, Js.t('options)) => + (t, Exchange.name, routingKey, 'message, Js.t('options)) => Js.Promise.t(unit) = "publish"; @@ -91,7 +91,7 @@ module ChannelWrapper = { [@bs.send] external sendToQueue': - (t, Queue.name, Js.Json.t, Js.t('options)) => Js.Promise.t(unit) = + (t, Queue.name, 'message, Js.t('options)) => Js.Promise.t(unit) = "sendToQueue"; let sendToQueue = (t, q, m, o) => diff --git a/src/AmqpConnectionManager.rei b/src/AmqpConnectionManager.rei index c100423..fb21d8c 100644 --- a/src/AmqpConnectionManager.rei +++ b/src/AmqpConnectionManager.rei @@ -63,11 +63,11 @@ module ChannelWrapper: { t; let publish: - (t, Exchange.name, routingKey, Js.Json.t, Js.t('options)) => - Js.Promise.t(Js.Json.t); + (t, Exchange.name, routingKey, 'message, Js.t('options)) => + Js.Promise.t('message); let sendToQueue: - (t, Queue.name, Js.Json.t, Js.t('options)) => Js.Promise.t(Js.Json.t); + (t, Queue.name, 'message, Js.t('options)) => Js.Promise.t('message); }; module AmqpConnectionManager: {