Skip to content

Commit ff7bda4

Browse files
authored
Add message ordering samples (GoogleCloudPlatform#220)
* Initial commit. * Add unit test. * Add message ordering sample. * Rename a sample. * Address comments.
1 parent 1217387 commit ff7bda4

File tree

13 files changed

+282
-38
lines changed

13 files changed

+282
-38
lines changed

bigquery/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START bigquery_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google BigQuery
1918
const bigquery = require('@google-cloud/bigquery')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

datastore/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START datastore_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google Cloud Datastore
1918
const datastore = require('@google-cloud/datastore')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

logging/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START logging_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Stackdriver Logging
1918
const logging = require('@google-cloud/logging')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

pubsub/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"yargs": "^5.0.0"
1414
},
1515
"devDependencies": {
16+
"async": "^2.0.1",
1617
"mocha": "^3.0.2",
1718
"node-uuid": "^1.4.7"
1819
},

pubsub/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START pubsub_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google Cloud Pub/Sub
1918
const pubsub = require('@google-cloud/pubsub')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

pubsub/subscriptions.js

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323

2424
'use strict';
2525

26-
const pubsubClient = require(`@google-cloud/pubsub`)();
26+
const PubSub = require(`@google-cloud/pubsub`);
2727

2828
// [START pubsub_list_subscriptions]
2929
function listSubscriptions (callback) {
30+
// Instantiates the client library
31+
const pubsubClient = PubSub();
32+
3033
// Lists all subscriptions in the current project
3134
pubsubClient.getSubscriptions((err, subscriptions) => {
3235
if (err) {
@@ -43,6 +46,9 @@ function listSubscriptions (callback) {
4346

4447
// [START pubsub_list_topic_subscriptions]
4548
function listTopicSubscriptions (topicName, callback) {
49+
// Instantiates the client library
50+
const pubsubClient = PubSub();
51+
4652
// References an existing topic, e.g. "my-topic"
4753
const topic = pubsubClient.topic(topicName);
4854

@@ -62,6 +68,9 @@ function listTopicSubscriptions (topicName, callback) {
6268

6369
// [START pubsub_create_subscription]
6470
function createSubscription (topicName, subscriptionName, callback) {
71+
// Instantiates the client library
72+
const pubsubClient = PubSub();
73+
6574
// References an existing topic, e.g. "my-topic"
6675
const topic = pubsubClient.topic(topicName);
6776

@@ -80,16 +89,18 @@ function createSubscription (topicName, subscriptionName, callback) {
8089

8190
// [START pubsub_create_push_subscription]
8291
function createPushSubscription (topicName, subscriptionName, callback) {
92+
// Instantiates the client library
93+
const pubsubClient = PubSub();
94+
8395
// References an existing topic, e.g. "my-topic"
8496
const topic = pubsubClient.topic(topicName);
85-
const projectId = process.env.GCLOUD_PROJECT || 'YOU_PROJECT_ID';
8697

8798
// Creates a new push subscription, e.g. "my-new-subscription"
8899
topic.subscribe(subscriptionName, {
89100
pushConfig: {
90101
// Set to an HTTPS endpoint of your choice. If necessary, register
91102
// (authorize) the domain on which the server is hosted.
92-
pushEndpoint: `https://${projectId}.appspot.com/push`
103+
pushEndpoint: `https://${pubsubClient.projectId}.appspot.com/push`
93104
}
94105
}, (err, subscription) => {
95106
if (err) {
@@ -105,6 +116,9 @@ function createPushSubscription (topicName, subscriptionName, callback) {
105116

106117
// [START pubsub_delete_subscription]
107118
function deleteSubscription (subscriptionName, callback) {
119+
// Instantiates the client library
120+
const pubsubClient = PubSub();
121+
108122
// References an existing subscription, e.g. "my-subscription"
109123
const subscription = pubsubClient.subscription(subscriptionName);
110124

@@ -121,8 +135,11 @@ function deleteSubscription (subscriptionName, callback) {
121135
}
122136
// [END pubsub_delete_subscription]
123137

124-
// [START pubsub_get_subscription_metadata]
125-
function getSubscriptionMetadata (subscriptionName, callback) {
138+
// [START pubsub_get_subscription]
139+
function getSubscription (subscriptionName, callback) {
140+
// Instantiates the client library
141+
const pubsubClient = PubSub();
142+
126143
// References an existing subscription, e.g. "my-subscription"
127144
const subscription = pubsubClient.subscription(subscriptionName);
128145

@@ -140,10 +157,13 @@ function getSubscriptionMetadata (subscriptionName, callback) {
140157
callback();
141158
});
142159
}
143-
// [END pubsub_get_subscription_metadata]
160+
// [END pubsub_get_subscription]
144161

145162
// [START pubsub_pull_messages]
146163
function pullMessages (subscriptionName, callback) {
164+
// Instantiates the client library
165+
const pubsubClient = PubSub();
166+
147167
// References an existing subscription, e.g. "my-subscription"
148168
const subscription = pubsubClient.subscription(subscriptionName);
149169

@@ -168,8 +188,73 @@ function pullMessages (subscriptionName, callback) {
168188
}
169189
// [END pubsub_pull_messages]
170190

191+
let subscribeCounterValue = 1;
192+
193+
function getSubscribeCounterValue () {
194+
return subscribeCounterValue;
195+
}
196+
197+
function setSubscribeCounterValue (value) {
198+
subscribeCounterValue = value;
199+
}
200+
201+
// [START pubsub_pull_ordered_messages]
202+
const outstandingMessages = {};
203+
204+
function pullOrderedMessages (subscriptionName, callback) {
205+
// Instantiates the client library
206+
const pubsubClient = PubSub();
207+
208+
// References an existing subscription, e.g. "my-subscription"
209+
const subscription = pubsubClient.subscription(subscriptionName);
210+
211+
// Pulls messages. Set returnImmediately to false to block until messages are
212+
// received.
213+
subscription.pull({ returnImmediately: true }, (err, messages) => {
214+
if (err) {
215+
callback(err);
216+
return;
217+
}
218+
219+
// Pub/Sub messages are unordered, so here we manually order messages by
220+
// their "counterId" attribute which was set when they were published.
221+
messages.forEach((message) => {
222+
outstandingMessages[message.attributes.counterId] = message;
223+
});
224+
225+
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => +counterId);
226+
outstandingIds.sort();
227+
228+
outstandingIds.forEach((counterId) => {
229+
const counter = getSubscribeCounterValue();
230+
const message = outstandingMessages[counterId];
231+
232+
if (counterId < counter) {
233+
// The message has already been processed
234+
subscription.ack(message.ackId);
235+
delete outstandingMessages[counterId];
236+
} else if (counterId === counter) {
237+
// Process the message
238+
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
239+
240+
setSubscribeCounterValue(counterId + 1);
241+
subscription.ack(message.ackId);
242+
delete outstandingMessages[counterId];
243+
} else {
244+
// Have not yet processed the message on which this message is dependent
245+
return false;
246+
}
247+
});
248+
callback();
249+
});
250+
}
251+
// [END pubsub_pull_ordered_messages]
252+
171253
// [START pubsub_get_subscription_policy]
172254
function getSubscriptionPolicy (subscriptionName, callback) {
255+
// Instantiates the client library
256+
const pubsubClient = PubSub();
257+
173258
// References an existing subscription, e.g. "my-subscription"
174259
const subscription = pubsubClient.subscription(subscriptionName);
175260

@@ -188,6 +273,9 @@ function getSubscriptionPolicy (subscriptionName, callback) {
188273

189274
// [START pubsub_set_subscription_policy]
190275
function setSubscriptionPolicy (subscriptionName, callback) {
276+
// Instantiates the client library
277+
const pubsubClient = PubSub();
278+
191279
// References an existing subscription, e.g. "my-subscription"
192280
const subscription = pubsubClient.subscription(subscriptionName);
193281

@@ -222,6 +310,9 @@ function setSubscriptionPolicy (subscriptionName, callback) {
222310

223311
// [START pubsub_test_subscription_permissions]
224312
function testSubscriptionPermissions (subscriptionName, callback) {
313+
// Instantiates the client library
314+
const pubsubClient = PubSub();
315+
225316
// References an existing subscription, e.g. "my-subscription"
226317
const subscription = pubsubClient.subscription(subscriptionName);
227318

@@ -253,8 +344,9 @@ const program = module.exports = {
253344
createSubscription: createSubscription,
254345
createPushSubscription: createPushSubscription,
255346
deleteSubscription: deleteSubscription,
256-
getSubscriptionMetadata: getSubscriptionMetadata,
347+
getSubscription: getSubscription,
257348
pullMessages: pullMessages,
349+
pullOrderedMessages: pullOrderedMessages,
258350
getSubscriptionPolicy: getSubscriptionPolicy,
259351
setSubscriptionPolicy: setSubscriptionPolicy,
260352
testSubscriptionPermissions: testSubscriptionPermissions,
@@ -283,7 +375,7 @@ cli
283375
program.deleteSubscription(options.subscriptionName, makeHandler(false));
284376
})
285377
.command(`get <subscriptionName>`, `Gets the metadata for a subscription.`, {}, (options) => {
286-
program.getSubscriptionMetadata(options.subscriptionName, makeHandler(false));
378+
program.getSubscription(options.subscriptionName, makeHandler(false));
287379
})
288380
.command(`pull <subscriptionName>`, `Pulls messages for a subscription.`, {}, (options) => {
289381
program.pullMessages(options.subscriptionName, makeHandler(false));

pubsub/system-test/subscriptions.test.js

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
'use strict';
1515

16+
const async = require(`async`);
1617
const pubsub = require(`@google-cloud/pubsub`)();
1718
const uuid = require(`node-uuid`);
1819
const path = require(`path`);
@@ -108,10 +109,52 @@ describe(`pubsub:subscriptions`, () => {
108109
`* ${messageIds[0]} "${expected}" {}`;
109110
assert.equal(output, expectedOutput);
110111
done();
111-
}, 5000);
112+
}, 2000);
112113
});
113114
});
114115

116+
it(`should pull ordered messages`, (done) => {
117+
const subscriptions = require('../subscriptions');
118+
const expected = `Hello, world!`;
119+
const publishedMessageIds = [];
120+
121+
async.waterfall([
122+
(cb) => {
123+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '3' } }, cb);
124+
},
125+
(messageIds, apiResponse, cb) => {
126+
publishedMessageIds.push(messageIds[0]);
127+
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
128+
},
129+
(cb) => {
130+
assert.equal(console.log.callCount, 0);
131+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
132+
},
133+
(messageIds, apiResponse, cb) => {
134+
publishedMessageIds.push(messageIds[0]);
135+
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
136+
},
137+
(cb) => {
138+
assert.equal(console.log.callCount, 1);
139+
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
140+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
141+
},
142+
(messageIds, apiResponse, cb) => {
143+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '2' } }, cb);
144+
},
145+
(messageIds, apiResponse, cb) => {
146+
publishedMessageIds.push(messageIds[0]);
147+
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
148+
},
149+
(cb) => {
150+
assert.equal(console.log.callCount, 3);
151+
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
152+
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
153+
cb();
154+
}
155+
], done);
156+
});
157+
115158
it(`should set the IAM policy for a subscription`, (done) => {
116159
run(`${cmd} set-policy ${subscriptionNameOne}`, cwd);
117160
pubsub.subscription(subscriptionNameOne).iam.getPolicy((err, policy) => {

0 commit comments

Comments
 (0)