Skip to content

Commit 0782582

Browse files
committed
use amqp-encoder, add support for annotations
1 parent 1a67d00 commit 0782582

File tree

8 files changed

+249
-75
lines changed

8 files changed

+249
-75
lines changed

examples/recv.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ m.on('message', function(message) {
2222
console.log(message.body);
2323
});
2424

25-
m.subscribe(optimist.argv.address).receive();
25+
m.subscribe(optimist.argv.address, {}).receive();

examples/send.js

+55-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
var Messenger = require('..').proton.Messenger;
2+
var crypto = require('crypto');
3+
var async = require('async');
4+
25
var optimist = require('optimist')
36
.options('a', {
47
alias : 'address',
@@ -20,8 +23,56 @@ m.on('error', function(err) {
2023
console.log("Error: " + err.message);
2124
});
2225

23-
m.send({address: optimist.argv.address, body: message}, function(err) {
24-
if (err) {
25-
console.log("Error sending message: " + err.message);
26+
var messages = {};
27+
var size = 128;
28+
var count = 10000;
29+
for(var i = 0; i < count; i++) {
30+
messages[i] = {
31+
id: i,
32+
content: new Buffer(crypto.randomBytes(size)).toString('base64')
33+
};
34+
}
35+
console.log("Message size -- " + messages[0].content.length);
36+
37+
var processed = 0;
38+
function watcher() {
39+
if(processed == count) {
40+
console.log("Total time for %d messages: %d, errors: %d", count, end - start, errors);
41+
} else {
42+
console.log("%d of %d processed, errors: %d", processed, count, errors);
43+
setTimeout(watcher, 5000);
2644
}
27-
});
45+
}
46+
47+
setTimeout(watcher, 5000);
48+
49+
var sent = count;
50+
var end;
51+
var start = new Date().getTime();
52+
var errors = 0;
53+
for(var i = 0; i < count; i++) {
54+
function send_n(n) {
55+
m.send({ address: optimist.argv.address, body: messages[n].content }, function(err, val) {
56+
if(!err) {
57+
end = new Date().getTime();
58+
} else {
59+
if(err == "aborted") {
60+
m.send(val, function(err, val) {
61+
if(err) {
62+
errors++;
63+
console.log("ERR -- " + n + " : " + err);
64+
} else {
65+
end = new Date().getTime();
66+
}
67+
});
68+
} else {
69+
errors++;
70+
console.log("ERR -- " + err);
71+
}
72+
}
73+
processed++;
74+
});
75+
}
76+
send_n(i);
77+
}
78+
console.log("yup");

package.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
},
1111
"devDependencies": {
1212
"optimist": "*",
13-
"nconf": "*"
13+
"nconf": "*",
14+
"async": "*"
1415
},
1516
"engine": ">= 0.6.13 && < 0.11.0",
16-
"version": "0.0.2",
17+
"version": "0.0.3",
1718
"files": [
1819
"index.js",
1920
"binding.gyp",

src/messenger.cc

+62-46
Original file line numberDiff line numberDiff line change
@@ -145,27 +145,11 @@ Handle<Value> Messenger::Subscribe(const Arguments& args) {
145145
REQUIRE_ARGUMENT_OBJECT(1, bag);
146146
OPTIONAL_ARGUMENT_FUNCTION(2, callback);
147147

148-
149-
pn_data_t *filter_key = NULL, *filter_value = NULL;
148+
pn_data_t *filter_value = NULL;
150149
if(bag->Has(String::New("sourceFilter"))) {
151150
Handle<Value> obj = bag->Get(String::New("sourceFilter"));
152151
if(obj->IsArray()) {
153-
Handle<Array> filter = Handle<v8::Array>::Cast(obj);
154-
if(filter->Length() == 2) {
155-
filter_key = ProtonData::ParseJSData(filter->Get(0));
156-
filter_value = ProtonData::ParseJSData(filter->Get(1));
157-
158-
if(!filter_key || !filter_value) {
159-
// TODO -- error;
160-
if(filter_key) {
161-
pn_data_free(filter_key);
162-
}
163-
if(filter_value) {
164-
pn_data_free(filter_value);
165-
}
166-
filter_key = filter_value = NULL;
167-
}
168-
}
152+
filter_value = ProtonData::ParseJSData(obj);
169153
} else {
170154
// TODO -- unsupported
171155
}
@@ -175,7 +159,7 @@ Handle<Value> Messenger::Subscribe(const Arguments& args) {
175159
int subIdx = msgr->AddSubscription(new Subscription(*addr, callback));
176160
NODE_CPROTON_MUTEX_UNLOCK(&msgr->mutex);
177161
Local<Function> var;
178-
SubscribeBaton *baton = new SubscribeBaton(msgr, subIdx, filter_key, filter_value, var);
162+
SubscribeBaton *baton = new SubscribeBaton(msgr, subIdx, filter_value, var);
179163
Work_BeginSubscribe(baton);
180164

181165
return args.This();
@@ -198,8 +182,8 @@ void Messenger::Work_Subscribe(uv_work_t* req) {
198182
pn_subscription_t * sub = pn_messenger_subscribe(baton->msgr->receiver, subscription->address.c_str());
199183
if(sub) {
200184
baton->msgr->SetSubscriptionHandle(baton->subscriptionIndex, sub);
201-
if(baton->filter_key && baton->filter_value) {
202-
baton->msgr->SetSourceFilter(subscription->address, baton->filter_key, baton->filter_value);
185+
if(baton->filter_value) {
186+
baton->msgr->SetSourceFilter(subscription->address, baton->filter_value);
203187
}
204188
} else {
205189
// TODO -- error getting subscription?
@@ -255,14 +239,13 @@ Handle<Value> Messenger::AddSourceFilter(const Arguments& args) {
255239
Messenger* msgr = ObjectWrap::Unwrap<Messenger>(args.This());
256240

257241
REQUIRE_ARGUMENT_STRING(0, addr);
258-
REQUIRE_ARGUMENT_ARRAY(1, filter);
242+
REQUIRE_ARGUMENT_OBJECT(1, filter);
259243
OPTIONAL_ARGUMENT_FUNCTION(2, callback);
260244

261-
pn_data_t *key = ProtonData::ParseJSData(filter->Get(0));
262-
pn_data_t *value = ProtonData::ParseJSData(filter->Get(1));
245+
pn_data_t *value = ProtonData::ParseJSData(filter);
263246

264-
if(key && value) {
265-
AddSourceFilterBaton *baton = new AddSourceFilterBaton(msgr, callback, *addr, key, value);
247+
if(value) {
248+
AddSourceFilterBaton *baton = new AddSourceFilterBaton(msgr, callback, *addr, value);
266249
Work_BeginAddSourceFilter(baton);
267250
}
268251

@@ -277,26 +260,16 @@ void Messenger::Work_BeginAddSourceFilter(Baton* baton) {
277260

278261
}
279262

280-
void Messenger::SetSourceFilter(std::string & address, pn_data_t *key, pn_data_t *value) {
281-
pn_link_t *link = pn_messenger_get_link(this->receiver, address.c_str(), 0);
282-
if(link) {
283-
pn_terminus_t *sr = pn_link_source(link);
284-
if(sr) {
285-
pn_data_t *filter = pn_terminus_filter(sr);
286-
if(filter) {
287-
if(pn_data_size(filter) == 0) {
288-
pn_data_put_map(filter);
263+
void Messenger::SetSourceFilter(std::string & address, pn_data_t *filter_value) {
264+
if(filter_value) {
265+
pn_link_t *link = pn_messenger_get_link(this->receiver, address.c_str(), 0);
266+
if(link) {
267+
pn_terminus_t *sr = pn_link_source(link);
268+
if(sr) {
269+
pn_data_t *filter = pn_terminus_filter(sr);
270+
if(filter) {
271+
pn_data_copy(filter, filter_value);
289272
}
290-
pn_data_next(filter);
291-
pn_data_enter(filter);
292-
while(pn_data_next(filter)) {
293-
// go to the end of the map
294-
;
295-
}
296-
pn_data_append(filter, key);
297-
pn_data_append(filter, value);
298-
pn_data_exit(filter);
299-
pn_data_rewind(filter);
300273
}
301274
}
302275
}
@@ -308,7 +281,7 @@ void Messenger::Work_AddSourceFilter(uv_work_t* req) {
308281

309282
// TODO - add check for subscribed to address
310283
NODE_CPROTON_MUTEX_LOCK(&baton->msgr->mutex)
311-
baton->msgr->SetSourceFilter(baton->address, baton->filter_key, baton->filter_value);
284+
baton->msgr->SetSourceFilter(baton->address, baton->filter_value);
312285
NODE_CPROTON_MUTEX_UNLOCK(&baton->msgr->mutex)
313286

314287
}
@@ -583,6 +556,15 @@ pn_message_t* Messenger::JSToMessage(Local<Object> obj) {
583556
pn_data_put_string(msg_body, pn_bytes(body.length(), str_body));
584557

585558
}
559+
560+
if (obj->Has(String::New("annotations"))) {
561+
562+
Handle<Value> annotations(obj->Get(String::New("annotations")));
563+
564+
pn_data_t *anndata = ProtonData::ParseJSData(annotations);
565+
pn_data_t *msgann = pn_message_annotations(message);
566+
pn_data_copy(msgann, anndata);
567+
}
586568

587569
return(message);
588570
}
@@ -657,6 +639,40 @@ int Messenger::MessengerSettleOutgoing(pn_tracker_t tracker){
657639
return result;
658640
}
659641

642+
int Messenger::MessengerSend(){
643+
int result;
644+
NODE_CPROTON_MUTEX_LOCK(&mutex)
645+
result = pn_messenger_send(messenger, 0);
646+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
647+
return result;
648+
}
649+
650+
int Messenger::MessengerWork(){
651+
int result;
652+
NODE_CPROTON_MUTEX_LOCK(&mutex)
653+
result = pn_messenger_work(messenger, 0);
654+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
655+
return result;
656+
}
657+
658+
pn_tracker_t Messenger::MessengerGetOutgoingTracker(void){
659+
pn_tracker_t result;
660+
NODE_CPROTON_MUTEX_LOCK(&mutex)
661+
result = pn_messenger_outgoing_tracker(messenger);
662+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
663+
return result;
664+
}
665+
666+
int Messenger::MessengerPut(pn_message_t *msg){
667+
int result;
668+
NODE_CPROTON_MUTEX_LOCK(&mutex)
669+
result = pn_messenger_put(messenger, msg);
670+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
671+
return result;
672+
}
673+
674+
675+
660676
std::string Messenger::MapErrorToString(int err) {
661677
switch(err) {
662678
case MSG_ERROR_NONE:

src/messenger.h

+18-11
Original file line numberDiff line numberDiff line change
@@ -63,34 +63,35 @@ class Messenger : public node::ObjectWrap {
6363

6464
struct SubscribeBaton : Baton {
6565
int subscriptionIndex;
66-
pn_data_t *filter_key;
6766
pn_data_t *filter_value;
6867

69-
SubscribeBaton(Messenger* msgr_, int subIndex, pn_data_t *key, pn_data_t *value, Handle<Function> cb_) :
68+
SubscribeBaton(Messenger* msgr_, int subIndex, pn_data_t *value, Handle<Function> cb_) :
7069
Baton(msgr_, cb_),
7170
subscriptionIndex(subIndex),
72-
filter_key(key),
7371
filter_value(value) {};
7472

7573
~SubscribeBaton() {
76-
if(filter_key) {
77-
pn_data_free(filter_key);
78-
}
7974
if(filter_value) {
8075
pn_data_free(filter_value);
8176
}
82-
filter_value = filter_key = NULL;
77+
filter_value = NULL;
8378
}
8479
};
8580

8681
struct AddSourceFilterBaton : Baton {
8782

8883
std::string address;
89-
pn_data_t *filter_key;
9084
pn_data_t *filter_value;
9185

92-
AddSourceFilterBaton(Messenger* msgr_, Handle<Function> cb_, const char* address_, pn_data_t *filter_key_, pn_data_t *filter_value_) :
93-
Baton(msgr_, cb_), address(address_), filter_key(filter_key_), filter_value(filter_value_) {}
86+
AddSourceFilterBaton(Messenger* msgr_, Handle<Function> cb_, const char* address_, pn_data_t *filter_value_) :
87+
Baton(msgr_, cb_), address(address_), filter_value(filter_value_) {}
88+
89+
~AddSourceFilterBaton() {
90+
if(filter_value) {
91+
pn_data_free(filter_value);
92+
}
93+
filter_value = NULL;
94+
}
9495

9596
};
9697

@@ -161,14 +162,20 @@ class Messenger : public node::ObjectWrap {
161162
unsigned long AddSubscription(Subscription *sub);
162163
bool SetSubscriptionHandle(unsigned long idx, pn_subscription_t *sub);
163164

164-
void SetSourceFilter(std::string & address, pn_data_t *key, pn_data_t *value);
165+
void SetSourceFilter(std::string & address, pn_data_t *value);
165166

166167
// protected access methods
167168
int MessengerGetOutgoingWindow(void);
168169
int MessengerGetOutgoing(void);
169170
bool MessengerGetBuffered(pn_tracker_t tracker);
170171
pn_status_t MessengerGetStatus(pn_tracker_t tracker);
171172
int MessengerSettleOutgoing(pn_tracker_t tracker);
173+
int MessengerSend();
174+
int MessengerWork();
175+
pn_tracker_t MessengerGetOutgoingTracker(void);
176+
int MessengerPut(pn_message_t *msg);
177+
178+
172179

173180
// error stuff
174181
static int MapPNStatusToError(pn_status_t status);

src/protondata.cc

+21-7
Original file line numberDiff line numberDiff line change
@@ -448,12 +448,13 @@ pn_data_t *ProtonData::GetArrayOrListJSValue(pn_type_t type, Handle<Array> array
448448
{
449449
pn_data_t *rval = pn_data(0);
450450
pn_type_t arraytype = PN_NULL;
451+
int length = array->Length() - 1;
451452

452453
if(type == PN_LIST) {
453454
pn_data_put_list(rval);
454455
} else {
455-
if(array->Length() > 0) {
456-
pn_data_t *data = ParseJSData(array->Get(0));
456+
if(length > 0) {
457+
pn_data_t *data = ParseJSData(array->Get(1));
457458
if(data) {
458459
arraytype = pn_data_type(data);
459460
pn_data_free(data);
@@ -494,16 +495,17 @@ pn_data_t *ProtonData::GetListJSValue(Handle<Array> array)
494495
pn_data_t *ProtonData::GetMapJSValue(Handle<Array> array)
495496
{
496497
pn_data_t *rval = NULL;
498+
int length = array->Length() - 1;
497499

498-
if(array->Length() % 2 == 0) {
500+
if((length % 2) == 0) {
499501
rval = pn_data(0);
500502
pn_data_put_map(rval);
501503
pn_data_enter(rval);
502-
unsigned int nodecnt = array->Length() / 2;
504+
unsigned int nodecnt = length / 2;
503505
for(unsigned int i = 0; i < nodecnt; i++) {
504-
pn_data_t *key = ParseJSData(array->Get(i * 2));
506+
pn_data_t *key = ParseJSData(array->Get(i * 2 + 1));
505507
if(key) {
506-
pn_data_t *value = ParseJSData(array->Get(i * 2 + 1));
508+
pn_data_t *value = ParseJSData(array->Get((i + 1) * 2));
507509
if(value) {
508510
pn_data_append(rval, key);
509511
pn_data_append(rval, value);
@@ -538,7 +540,7 @@ pn_data_t *ProtonData::ParseJSData(Handle<Value> jsval)
538540
} else if(type == PN_DESCRIBED) {
539541
rval = GetDescribedJSValue(array);
540542
} else {
541-
Handle<Array> value = Handle<Array>::Cast(array->Get(1));
543+
Handle<Array> value = Handle<Array>::Cast(jsval);
542544
if(type == PN_ARRAY) {
543545
rval = GetArrayJSValue(value);
544546
} else if(type == PN_LIST) {
@@ -550,6 +552,18 @@ pn_data_t *ProtonData::ParseJSData(Handle<Value> jsval)
550552
return NULL;
551553
}
552554
}
555+
} else {
556+
Local<Value> value = Local<Value>::New(jsval);
557+
if(jsval->IsString()) {
558+
rval = GetSimpleJSValue(PN_STRING, value);
559+
} else if(jsval->IsBoolean()) {
560+
rval = GetSimpleJSValue(PN_BOOL, value);
561+
} else if(jsval->IsNull()) {
562+
rval = GetSimpleJSValue(PN_NULL, value);
563+
} else {
564+
// TODO -- BINARY, others?
565+
}
553566
}
567+
554568
return rval;
555569
}

0 commit comments

Comments
 (0)