Skip to content

Commit 1a67d00

Browse files
committed
add async sending
1 parent d6b8655 commit 1a67d00

File tree

5 files changed

+455
-44
lines changed

5 files changed

+455
-44
lines changed

binding.gyp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
{
44
"target_name": "cproton",
55
"type": "loadable_module",
6-
"sources": [ "src/cproton.cc", "src/messenger.cc", "src/protondata.cc" ],
6+
"sources": [ "src/cproton.cc", "src/sending.cc", "src/messenger.cc", "src/protondata.cc" ],
77

88
'conditions': [
99
['OS=="linux"', {

src/messenger.cc

Lines changed: 96 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,21 @@
1414
#include "messenger.h"
1515
#include "async.h"
1616

17-
1817
using namespace v8;
1918
using namespace node;
2019
using namespace std;
2120

2221
Messenger::Messenger() {
2322
NODE_CPROTON_MUTEX_INIT(mutex);
23+
messageSender = new MessageSender(this);
24+
messageSettler = new MessageSettler(this);
2425
};
2526

2627
Messenger::~Messenger() {
2728
pn_messenger_stop(messenger);
2829
pn_messenger_stop(receiver);
30+
delete(messageSender);
31+
delete(messageSettler);
2932
NODE_CPROTON_MUTEX_DESTROY(mutex);
3033
}
3134

@@ -57,11 +60,12 @@ Handle<Value> Messenger::New(const Arguments& args) {
5760
Messenger* msgr = new Messenger();
5861

5962
pn_messenger_t* messenger = pn_messenger(NULL);
63+
pn_messenger_set_blocking(messenger, false);
6064
pn_messenger_start(messenger);
6165
msgr->messenger = messenger;
6266

63-
// Temporary fix
64-
pn_messenger_set_outgoing_window(msgr->messenger, 1);
67+
// Temporary fix -- should tune this
68+
pn_messenger_set_outgoing_window(msgr->messenger, 50);
6569

6670
pn_messenger_t* receiver = pn_messenger(NULL);
6771
pn_messenger_set_incoming_window(receiver, 1);
@@ -340,58 +344,36 @@ Handle<Value> Messenger::Send(const Arguments& args) {
340344

341345
REQUIRE_ARGUMENT_OBJECT(0, obj);
342346
OPTIONAL_ARGUMENT_FUNCTION(1, callback);
347+
pn_message_t *msg = JSToMessage(obj);
343348

344-
pn_message_t* msg = JSToMessage(obj);
345-
346-
SendBaton* baton = new SendBaton(msgr, callback, msg);
347-
348-
Work_BeginSend(baton);
349+
InFlightMessage *ifmsg = new InFlightMessage(obj, msg, callback);
350+
msgr->messageSender->AppendMessage(ifmsg);
349351

350352
return Undefined();
351-
352353
}
353354

355+
354356
void Messenger::Work_BeginSend(Baton* baton) {
357+
SendBaton * send_baton = static_cast<SendBaton *>(baton);
358+
355359
int status = uv_queue_work(uv_default_loop(),
356360
&baton->request, Work_Send, (uv_after_work_cb)Work_AfterSend);
357361

358362
assert(status == 0);
359-
360363
}
361364

362365
void Messenger::Work_Send(uv_work_t* req) {
363366

364367
SendBaton* baton = static_cast<SendBaton*>(req->data);
365-
pn_messenger_t* messenger = baton->msgr->messenger;
366-
pn_message_t* message = baton->msg;
367-
368-
NODE_CPROTON_MUTEX_LOCK(&baton->msgr->mutex)
369-
assert(!pn_messenger_put(messenger, message));
370-
baton->tracker = pn_messenger_outgoing_tracker(messenger);
371-
372-
assert(!pn_messenger_send(messenger, -1));
373-
NODE_CPROTON_MUTEX_UNLOCK(&baton->msgr->mutex)
374-
375-
376-
pn_message_free(message);
377-
368+
378369
}
379370

380371
void Messenger::Work_AfterSend(uv_work_t* req) {
381372
HandleScope scope;
373+
382374
SendBaton* baton = static_cast<SendBaton*>(req->data);
383-
384-
if (baton->error_code > 0) {
385-
Local<Value> err = Exception::Error(String::New(baton->error_message.c_str()));
386-
Local<Value> argv[] = { err };
387-
baton->callback->Call(Context::GetCurrent()->Global(), 1, argv);
388-
} else {
389-
Local<Value> argv[1];
390-
baton->callback->Call(Context::GetCurrent()->Global(), 0, argv);
391-
}
392-
375+
393376
delete baton;
394-
395377
}
396378

397379
Handle<Value> Messenger::Receive(const Arguments& args) {
@@ -457,7 +439,7 @@ void Messenger::Work_Receive(uv_work_t* req) {
457439

458440
while (baton->msgr->receiving) {
459441

460-
pn_messenger_recv(receiver, 1024);
442+
pn_messenger_recv(receiver, 150);
461443

462444
while(pn_messenger_incoming(receiver)) {
463445

@@ -634,3 +616,82 @@ Local<Object> Messenger::MessageToJS(pn_message_t* message) {
634616
return result;
635617
}
636618

619+
int Messenger::MessengerGetOutgoingWindow(void)
620+
{
621+
int result;
622+
NODE_CPROTON_MUTEX_LOCK(&mutex)
623+
result = pn_messenger_get_outgoing_window(messenger);
624+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
625+
return result;
626+
}
627+
628+
int Messenger::MessengerGetOutgoing(void){
629+
int result;
630+
NODE_CPROTON_MUTEX_LOCK(&mutex)
631+
result = pn_messenger_outgoing(messenger);
632+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
633+
return result;
634+
}
635+
636+
bool Messenger::MessengerGetBuffered(pn_tracker_t tracker){
637+
bool result;
638+
NODE_CPROTON_MUTEX_LOCK(&mutex)
639+
result = pn_messenger_buffered(messenger, tracker);
640+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
641+
return result;
642+
}
643+
644+
pn_status_t Messenger::MessengerGetStatus(pn_tracker_t tracker){
645+
pn_status_t result;
646+
NODE_CPROTON_MUTEX_LOCK(&mutex)
647+
result = pn_messenger_status(messenger, tracker);
648+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
649+
return result;
650+
}
651+
652+
int Messenger::MessengerSettleOutgoing(pn_tracker_t tracker){
653+
int result;
654+
NODE_CPROTON_MUTEX_LOCK(&mutex)
655+
result = pn_messenger_settle(messenger, tracker, 0);
656+
NODE_CPROTON_MUTEX_UNLOCK(&mutex)
657+
return result;
658+
}
659+
660+
std::string Messenger::MapErrorToString(int err) {
661+
switch(err) {
662+
case MSG_ERROR_NONE:
663+
return std::string("none");
664+
case MSG_ERROR_INTERNAL:
665+
return std::string("internal error");
666+
case MSG_ERROR_REJECTED:
667+
return std::string("rejected");
668+
case MSG_ERROR_STATUS_UNKNOWN:
669+
return std::string("status unknown");
670+
case MSG_ERROR_ABORTED:
671+
return std::string("aborted");
672+
}
673+
return std::string("unknown error");
674+
}
675+
676+
int Messenger::MapPNStatusToError(pn_status_t status)
677+
{
678+
switch(status) {
679+
case PN_STATUS_UNKNOWN:
680+
return MSG_ERROR_STATUS_UNKNOWN;
681+
case PN_STATUS_PENDING:
682+
return MSG_ERROR_NONE;
683+
case PN_STATUS_ACCEPTED:
684+
return MSG_ERROR_NONE;
685+
case PN_STATUS_REJECTED:
686+
return MSG_ERROR_REJECTED;
687+
case PN_STATUS_RELEASED:
688+
return MSG_ERROR_NONE;
689+
case PN_STATUS_MODIFIED:
690+
return MSG_ERROR_NONE;
691+
case PN_STATUS_ABORTED:
692+
return MSG_ERROR_ABORTED;
693+
case PN_STATUS_SETTLED:
694+
return MSG_ERROR_NONE;
695+
}
696+
return MSG_ERROR_INTERNAL;
697+
}

src/messenger.h

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <string>
55
#include <vector>
66
#include <map>
7+
#include <list>
78

89
#include <node.h>
910

@@ -13,10 +14,17 @@
1314
#include "macros.h"
1415
#include "threading.h"
1516
#include "protondata.h"
17+
//#include "sending.h"
1618

1719
using namespace v8;
1820
using namespace node;
1921

22+
#define MSG_ERROR_NONE 0
23+
#define MSG_ERROR_INTERNAL -1
24+
#define MSG_ERROR_REJECTED -2
25+
#define MSG_ERROR_STATUS_UNKNOWN -3
26+
#define MSG_ERROR_ABORTED -4
27+
2028
struct MessageInfo {
2129
pn_message_t *message;
2230
std::string subscription_address;
@@ -87,12 +95,16 @@ class Messenger : public node::ObjectWrap {
8795
};
8896

8997
struct SendBaton : Baton {
98+
99+
SendBaton(Messenger* msgr_, Handle<Function> cb_) :
100+
Baton(msgr_, cb_) {}
90101

91-
pn_message_t * msg;
92-
pn_tracker_t tracker;
102+
};
93103

94-
SendBaton(Messenger* msgr_, Handle<Function> cb_, pn_message_t * msg_) :
95-
Baton(msgr_, cb_), msg(msg_) {}
104+
struct SendSettlingBaton : Baton {
105+
106+
SendSettlingBaton(Messenger* msgr_, Handle<Function> cb_) :
107+
Baton(msgr_, cb_) {}
96108

97109
};
98110

@@ -122,7 +134,7 @@ class Messenger : public node::ObjectWrap {
122134
Async(Messenger* m, uv_async_cb async_cb) :
123135
msgr(m), completed(false), retrieved(0) {
124136
watcher.data = this;
125-
NODE_CPROTON_MUTEX_INIT(mutex)
137+
NODE_CPROTON_MUTEX_INIT(mutex);
126138
msgr->Ref();
127139
uv_async_init(uv_default_loop(), &watcher, async_cb);
128140
}
@@ -151,6 +163,90 @@ class Messenger : public node::ObjectWrap {
151163

152164
void SetSourceFilter(std::string & address, pn_data_t *key, pn_data_t *value);
153165

166+
// protected access methods
167+
int MessengerGetOutgoingWindow(void);
168+
int MessengerGetOutgoing(void);
169+
bool MessengerGetBuffered(pn_tracker_t tracker);
170+
pn_status_t MessengerGetStatus(pn_tracker_t tracker);
171+
int MessengerSettleOutgoing(pn_tracker_t tracker);
172+
173+
// error stuff
174+
static int MapPNStatusToError(pn_status_t status);
175+
static std::string MapErrorToString(int error);
176+
177+
struct InFlightMessage {
178+
InFlightMessage(Handle<Object> msg_, pn_message_t *pnmsg_, Handle<Function> cb_) :
179+
pnmsg(pnmsg_),
180+
retried(false),
181+
error(0) {
182+
callback = Persistent<Function>::New(cb_);
183+
message = Persistent<Object>::New(msg_);
184+
};
185+
186+
~InFlightMessage() {
187+
if(pnmsg) {
188+
pn_message_free(pnmsg);
189+
}
190+
message.Dispose();
191+
callback.Dispose();
192+
}
193+
194+
Persistent<Object> message;
195+
Persistent<Function> callback;
196+
pn_message_t *pnmsg;
197+
pn_tracker_t tracker;
198+
bool retried;
199+
int error;
200+
};
201+
202+
struct QueuedWorker {
203+
QueuedWorker(Messenger *m);
204+
~QueuedWorker();
205+
206+
void AppendMessage(InFlightMessage *msg);
207+
void AppendMessages(std::list<InFlightMessage *> *srcList,
208+
std::list<InFlightMessage *>::iterator begin,
209+
std::list<InFlightMessage *>::iterator end,
210+
NODE_CPROTON_MUTEX_t *srcMutex);
211+
212+
protected:
213+
virtual void HandleQueue(void) = 0;
214+
215+
Messenger *msgr;
216+
std::list<InFlightMessage *> messageList;
217+
NODE_CPROTON_MUTEX_t(mutex);
218+
bool active;
219+
uv_timer_t activityTimer;
220+
};
221+
222+
struct MessageSender : QueuedWorker {
223+
MessageSender(Messenger *m) :
224+
QueuedWorker(m) {};
225+
226+
virtual ~MessageSender() {};
227+
228+
protected:
229+
virtual void HandleQueue(void);
230+
static void ProcessSending(uv_timer_t* handle, int status);
231+
};
232+
233+
struct MessageSettler : QueuedWorker {
234+
MessageSettler(Messenger *m) :
235+
QueuedWorker(m) {};
236+
237+
virtual ~MessageSettler() {};
238+
239+
protected:
240+
virtual void HandleQueue(void);
241+
static void ProcessSettling(uv_timer_t* handle, int status);
242+
};
243+
244+
245+
246+
247+
248+
249+
154250
private:
155251
Messenger();
156252
~Messenger();
@@ -161,6 +257,7 @@ class Messenger : public node::ObjectWrap {
161257
WORK_DEFINITION(Stop)
162258
WORK_DEFINITION(Put)
163259
WORK_DEFINITION(Receive)
260+
WORK_DEFINITION(SendSettling)
164261

165262
static void AsyncReceive(uv_async_t* handle, int status);
166263
static void CloseEmitter(uv_handle_t* handle);
@@ -173,9 +270,12 @@ class Messenger : public node::ObjectWrap {
173270
pn_messenger_t * receiver;
174271
bool receiving;
175272
bool receiveWait;
176-
NODE_CPROTON_MUTEX_t(mutex);
273+
NODE_CPROTON_MUTEX_t mutex;
177274
ReceiveBaton * receiveWaitBaton;
178275

276+
MessageSender *messageSender;
277+
MessageSettler *messageSettler;
278+
179279
std::vector<Subscription *> _subscriptions;
180280
std::map<std::string, unsigned long> _addressToSubscriptionMap;
181281
std::map<pn_subscription_t *, unsigned long> _handleToSubscriptionMap;

0 commit comments

Comments
 (0)