Skip to content

Commit

Permalink
Merge pull request #3 from amsross/docs-examples
Browse files Browse the repository at this point in the history
Docs examples
  • Loading branch information
amsross authored Mar 17, 2020
2 parents 53aeef6 + 003b83c commit f94a46d
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 41 deletions.
27 changes: 10 additions & 17 deletions bsconfig.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
{
"bs-dependencies": [],
"name": "bs-amqp-connection-manager",
"version": "0.1.0",
"sources": {
"dir" : "src",
"subdirs" : true
},
"package-specs": {
"module": "commonjs",
"in-source": false
},
"suffix": ".bs.js",
"bs-dependencies": [

],
"warnings": {
"error" : "+101"
},
"namespace": false,
"refmt": 3
"package-specs": {"in-source": false, "module": "commonjs"},
"refmt": 3,
"sources": [
{"dir": "src", "subdirs": true},
{"dir": "examples", "type": "dev"}
],
"suffix": ".bs.js",
"version": "0.1.0",
"warnings": {"error": "+101"}
}
64 changes: 64 additions & 0 deletions examples/pubsub_publisher.re
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* this is pretty much the same as the examples provided in
* node-amqp-connection-manager
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-publisher.js */

module Amqp = AmqpConnectionManager;
[@bs.val] external setTimeout: (unit => unit, int) => int = "setTimeout";

let exchange_name = "amqp-connection-manager-sample2-ex";

// Create a connetion manager
let connection = Amqp.connect([|"amqp://localhost"|], ());
Amqp.AmqpConnectionManager.on(
connection,
`connect(_ => Js.Console.info("Connected!")),
);
Amqp.AmqpConnectionManager.on(
connection,
`disconnect(err => Js.Console.error(err)),
);

// Create a channel wrapper
let channelWrapper =
Amqp.AmqpConnectionManager.createChannel(
connection,
{
"json": true,
"setup": channel =>
Amqp.Channel.assertExchange(
channel,
exchange_name,
"topic",
Js.Obj.empty(),
)
|> Js.Promise.then_(_ => Js.Promise.resolve()),
},
);

// Send messages until someone hits CTRL-C or something goes wrong...
let rec sendMessage = () => {
Amqp.ChannelWrapper.publish(
channelWrapper,
exchange_name,
"",
{"time": Js.Date.now()},
{"contentType": "application/json", "persistent": true},
)
|> Js.Promise.then_(msg => {
Js.Console.info("Message sent");
Js.Promise.make((~resolve, ~reject as _) =>
setTimeout(() => resolve(. msg), 1000) |> ignore
);
})
|> Js.Promise.then_(_ => sendMessage())
|> Js.Promise.catch(err => {
Js.Console.error(err);
Amqp.ChannelWrapper.close(channelWrapper);
Amqp.AmqpConnectionManager.close(connection);

Js.Promise.resolve();
});
};

Js.Console.info("Sending messages...");
sendMessage();
63 changes: 63 additions & 0 deletions examples/pubsub_subscriber.re
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/* this is pretty much the same as the examples provided in
* node-amqp-connection-manager
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-subscriber.js */

module Amqp = AmqpConnectionManager;

let queue_name = "amqp-connection-manager-sample2";
let exchange_name = "amqp-connection-manager-sample2-ex";

// Handle an incomming message.
let onMessage = (channel, msg: Amqp.Queue.message) => {
let message = msg.content->Node.Buffer.toString->Js.Json.parseExn;
Js.Console.log2("receiver: got message", message);
Amqp.Channel.ack(channel, msg);
};

// Create a connetion manager
let connection = Amqp.connect([|"amqp://localhost"|], ());
Amqp.AmqpConnectionManager.on(
connection,
`connect(_ => Js.Console.info("Connected!")),
);
Amqp.AmqpConnectionManager.on(
connection,
`disconnect(err => Js.Console.error(err)),
);

// Set up a channel listening for messages in the queue.
let channelWrapper =
Amqp.AmqpConnectionManager.createChannel(
connection,
{
"setup": channel =>
// `channel` here is a regular amqplib `ConfirmChannel`.
Js.Promise.(
all([|
Amqp.Channel.assertQueue(
channel,
queue_name,
{"exclusive": true, "autoDelete": true, "durable": false},
)
|> then_(_ => resolve()),
Amqp.Channel.assertExchange(
channel,
exchange_name,
"topic",
Js.Obj.empty(),
)
|> then_(_ => resolve()),
Amqp.Channel.prefetch(channel, 1),
Amqp.Channel.bindQueue(channel, queue_name, exchange_name, ""),
Amqp.Channel.consume(channel, queue_name, onMessage(channel)),
|])
|> then_(_ => resolve())
),
},
);

Amqp.ChannelWrapper.waitForConnect(channelWrapper)
|> Js.Promise.then_(_ => {
Js.Console.info("Listening for messages");
Js.Promise.resolve();
});
49 changes: 49 additions & 0 deletions examples/receiver.re
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* this is pretty much the same as the examples provided in
* node-amqp-connection-manager
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/receiver.js
* */

module Amqp = AmqpConnectionManager;

let queue_name = "amqp-connection-manager-sample1";

// Handle an incomming message.
let onMessage = (channel, msg: Amqp.Queue.message) => {
let message = msg.content->Node.Buffer.toString->Js.Json.parseExn;
Js.Console.log2("receiver: got message", message);
Amqp.Channel.ack(channel, msg);
};

// Create a connetion manager
let connection = Amqp.connect([|"amqp://localhost"|], ());

Amqp.AmqpConnectionManager.on(
connection,
`disconnect(err => Js.Console.error(err)),
)
|> ignore;

Amqp.AmqpConnectionManager.on(
connection,
`connect(_ => Js.Console.info("connected!")),
)
|> ignore;

// Set up a channel listening for messages in the queue.
let channelWrapper =
Amqp.AmqpConnectionManager.createChannel(
connection,
{
"setup": channel =>
// `channel` here is a regular amqplib `ConfirmChannel`.
Js.Promise.(
all([|
Amqp.Channel.assertQueue(channel, queue_name, {"durable": true})
|> then_(_ => resolve()),
Amqp.Channel.prefetch(channel, 1),
Amqp.Channel.consume(channel, queue_name, onMessage(channel)),
|])
|> then_(_ => resolve())
),
},
);
69 changes: 69 additions & 0 deletions examples/sender.re
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* this is pretty much the same as the examples provided in
* node-amqp-connection-manager
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/sender.js
* */

module Amqp = AmqpConnectionManager;
[@bs.val] external setTimeout: (unit => unit, int) => int = "setTimeout";

let queue_name = "amqp-connection-manager-sample1";

// Create a connetion manager
let connection = Amqp.connect([|"amqp://localhost"|], ());

Amqp.AmqpConnectionManager.on(
connection,
`disconnect(err => Js.Console.error(err)),
)
|> ignore;

Amqp.AmqpConnectionManager.on(
connection,
`connect(_ => Js.Console.info("connected!")),
)
|> ignore;

// Set up a channel listening for messages in the queue.
let channelWrapper =
Amqp.AmqpConnectionManager.createChannel(
connection,
{
"json": true,
"setup": channel =>
// `channel` here is a regular amqplib `ConfirmChannel`.
Js.Promise.(
all([|
Amqp.Channel.assertQueue(channel, queue_name, {"durable": true})
|> then_(_ => resolve()),
|])
|> then_(_ => resolve())
),
},
);

// Send messages until someone hits CTRL-C or something goes wrong...
let rec sendMessage = () => {
Amqp.ChannelWrapper.sendToQueue(
channelWrapper,
queue_name,
{"time": Js.Date.now()},
Js.Obj.empty(),
)
|> Js.Promise.then_(msg => {
Js.Console.info("Message sent");
Js.Promise.make((~resolve, ~reject as _) =>
setTimeout(() => resolve(. msg), 1000) |> ignore
);
})
|> Js.Promise.then_(_ => sendMessage())
|> Js.Promise.catch(err => {
Js.Console.error(err);
Amqp.ChannelWrapper.close(channelWrapper);
Amqp.AmqpConnectionManager.close(connection);

Js.Promise.resolve();
});
};

Js.Console.info("Sending messages...");
sendMessage();
20 changes: 9 additions & 11 deletions src/AmqpConnectionManager.re
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,20 @@ module Queue = {
};

module Exchange = {
type name;
type name = string;
};

module Channel = {
type t;
type name;
type name = string;
type ack = Queue.message => unit;
type nack = Queue.message => unit;

[@bs.send] external ack: (t, Queue.message) => unit = "ack";
[@bs.send] external nack: (t, Queue.message) => unit = "nack";

module Config = {
type nonrec t = {
.
"json": bool,
"setup": t => Js.Promise.t(unit),
};
type nonrec t('a) = {.. "setup": t => Js.Promise.t(unit)} as 'a;
};

[@bs.send]
Expand Down Expand Up @@ -59,7 +55,7 @@ module Channel = {

module ChannelWrapper = {
type t;
type name;
type name = string;
type routingKey = string;
type ack = Queue.message => unit;
type nack = Queue.message => unit;
Expand All @@ -68,6 +64,8 @@ module ChannelWrapper = {
[@bs.send] external nack: (t, Queue.message) => unit = "nack";
[@bs.send] external queueLength: t => int = "queueLength";
[@bs.send] external close: t => unit = "close";
[@bs.send]
external waitForConnect: t => Js.Promise.t(unit) = "waitForConnect";

[@bs.send]
external on:
Expand All @@ -84,7 +82,7 @@ module ChannelWrapper = {

[@bs.send]
external publish':
(t, Exchange.name, routingKey, Js.Json.t, Js.t('options)) =>
(t, Exchange.name, routingKey, 'message, Js.t('options)) =>
Js.Promise.t(unit) =
"publish";

Expand All @@ -93,7 +91,7 @@ module ChannelWrapper = {

[@bs.send]
external sendToQueue':
(t, Queue.name, Js.Json.t, Js.t('options)) => Js.Promise.t(unit) =
(t, Queue.name, 'message, Js.t('options)) => Js.Promise.t(unit) =
"sendToQueue";

let sendToQueue = (t, q, m, o) =>
Expand Down Expand Up @@ -128,7 +126,7 @@ module AmqpConnectionManager = {
"on";

[@bs.send]
external createChannel: (t, Channel.Config.t) => ChannelWrapper.t =
external createChannel: (t, Channel.Config.t('a)) => ChannelWrapper.t =
"createChannel";
};

Expand Down
Loading

0 comments on commit f94a46d

Please sign in to comment.