Skip to content

Commit c0a2714

Browse files
authored
less memory utilisation on received msgs (#79)
* less memory utilisation on received msgs making use of scatter/gather io rather than malloc * better output on examples to show whats occuring * put topic len inside cb, no change in cb size * less memory used on recv & less memcpy * dont print log for every message * retry read on interrupt
1 parent 4873680 commit c0a2714

File tree

3 files changed

+118
-63
lines changed

3 files changed

+118
-63
lines changed

examples/consumer.q

+8-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
\l ../q/mqtt.q
22

3+
// consumer topics
4+
topics:`topic1`topic2
5+
36
// Define the table schema to handle incoming messages
47
.mqtt.tab:([]topic:`symbol$();
58
msg_sent:`timestamp$();
@@ -11,10 +14,12 @@ cbfn:{[topic;msg]
1114
data:";" vs msg;
1215
.mqtt.tab,:(`$topic;"P"$data 0;.z.p;"S"$data 1)}
1316

14-
.mqtt.msgrcvd:{cbfn[x;y];0N!"Message received"}
17+
.mqtt.msgrcvd:{cbfn[x;y];}
1518

1619
// Connect and subscribe
1720
//.mqtt.conn[`$"tcp://host.docker.internal:1883";`rcv;()!()];
1821
.mqtt.conn[`$"tcp://localhost:1883";`rcv;()!()];
19-
.mqtt.sub[`topic1];
20-
.mqtt.sub[`topic2];
22+
23+
-1"Configured topics: ",","sv string topics;
24+
-1"Populating .mqtt.tab with each received message";
25+
.mqtt.sub each topics;

examples/producer.q

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
\l ../q/mqtt.q
22

33
// Initialize a counter to allow us to stop after a set number of published messages
4-
n:0
4+
n:1
5+
topics:`topic1`topic2
56
// Connect to the host
67
//.mqtt.conn[`$"tcp://host.docker.internal:1883";`src;()!()]
78
.mqtt.conn[`$"tcp://localhost:1883";`src;()!()]
89
// Set up a timed message publisher
9-
.z.ts:{if[n>=199;system"t 0"];
10-
.mqtt.pub[`topic1;string[.z.p],";","topic1_",string n];
11-
.mqtt.pub[`topic2;string[.z.p],";","topic2_",string n];
10+
.z.ts:{if[n>=200;0N!"Finished sending ",(string n)," messages to each topic";system"t 0"];
11+
.mqtt.pubx[;string[.z.p],";","topic1_",string n;0;0b] each topics;
1212
n+:1}
1313

14-
-1"Type `\\t 100` to publish a message every 100ms up to 200 messages, to stop type `\\t 0`";
14+
-1"Configured topics: ",","sv string topics;
15+
-1"Type `\\t 100` to publish a message to each topic every 100ms (up to 200 messages), to stop type `\\t 0`";

src/mqtt.c

+104-55
Original file line numberDiff line numberDiff line change
@@ -368,16 +368,14 @@ EXP K unsub(K topic){
368368

369369
// Callback data structure
370370
typedef struct CallbackDataStr{
371-
union{
372-
char reserved[8]; // reserve space for flags, aligned to 64 bit word
373-
enum{
371+
enum{
374372
MSG_TYPE_SEND = 9876, // arbitrary uncommon value
375373
MSG_TYPE_RCVD,
376374
MSG_TYPE_DISCONN
377375
} msg_type;
378-
} header;
376+
unsigned int topic_len;
379377
union{
380-
long size;
378+
long payload_len;
381379
MQTTClient_deliveryToken dt;
382380
} body;
383381
// Start of dynamic data
@@ -389,23 +387,59 @@ static void msgsent(void* context, MQTTClient_deliveryToken dt){
389387
// Body contains: <dt>
390388
CallbackData msg;
391389
msg.body.dt = dt;
392-
msg.header.msg_type = MSG_TYPE_SEND;
390+
msg.msg_type = MSG_TYPE_SEND;
393391
send(spair[1], &msg, sizeof(CallbackData), 0);
394392
}
395393

394+
static char* getSysError(char* buf,int len){
395+
buf[0]=0;
396+
#ifdef _WIN32
397+
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 0,
398+
WSAGetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buf,
399+
len, 0);
400+
#else
401+
strerror_r(errno,buf,len);
402+
#endif
403+
return buf;
404+
}
405+
396406
static int msgrcvd(void* context, char* topic, int unused, MQTTClient_message* mq_msg){
397407
// Body contains: <topic_len><topic><payload>
398408
(void)unused;(void)context;
399-
long topic_len = strlen(topic)+1;
400-
long msg_size = sizeof(CallbackData) + topic_len + mq_msg->payloadlen;
401-
CallbackData* msg = malloc(msg_size);
402-
msg->body.size = topic_len + mq_msg->payloadlen;
403-
msg->header.msg_type = MSG_TYPE_RCVD;
404-
char* p = (char*)&(msg[1]);
405-
memcpy(p, topic, topic_len);
406-
memcpy(p += topic_len, mq_msg->payload, mq_msg->payloadlen);
407-
send(spair[1], (char*)msg, msg_size, 0);
408-
free(msg);
409+
#ifdef _WIN32
410+
WSABUF buffers[3];
411+
DWORD bytesSent=0;
412+
#else
413+
struct iovec iov[3];
414+
#endif
415+
unsigned int topic_len=strlen(topic);
416+
CallbackData msg;
417+
msg.msg_type = MSG_TYPE_RCVD;
418+
msg.topic_len = topic_len;
419+
msg.body.payload_len = topic_len + mq_msg->payloadlen;
420+
#ifdef _WIN32
421+
buffers[0].buf=&msg;
422+
buffers[0].len=sizeof(CallbackData);
423+
buffers[1].buf=topic;
424+
buffers[1].len=topic_len;
425+
buffers[2].buf=mq_msg->payload;
426+
buffers[2].len=mq_msg->payloadlen;
427+
if(SOCKET_ERROR==WSASend(spair[1],buffers,3,&bytesSent,0,NULL,NULL)){
428+
char buf[256];
429+
fprintf(stderr, "WSASend error: %s\n", getSysError(buf,sizeof(buf)));
430+
}
431+
#else
432+
iov[0].iov_base=&msg;
433+
iov[0].iov_len=sizeof(CallbackData);
434+
iov[1].iov_base=topic;
435+
iov[1].iov_len=topic_len;
436+
iov[2].iov_base=mq_msg->payload;
437+
iov[2].iov_len=mq_msg->payloadlen;
438+
if(-1==writev(spair[1],iov,sizeof(iov)/sizeof(struct iovec))){
439+
char buf[256];
440+
fprintf(stderr, "send error: %s\n", getSysError(buf,sizeof(buf)));
441+
}
442+
#endif
409443
MQTTClient_freeMessage(&mq_msg);
410444
MQTTClient_free(topic);
411445
return 1;
@@ -415,8 +449,7 @@ static void disconn(void* context, char* cause){
415449
(void)context;(void)cause;
416450
// Body contains: <>
417451
CallbackData msg;
418-
msg.body.size = 0;
419-
msg.header.msg_type = MSG_TYPE_DISCONN;
452+
msg.msg_type = MSG_TYPE_DISCONN;
420453
send(spair[1], &msg, sizeof(CallbackData), 0);
421454
}
422455

@@ -434,29 +467,10 @@ static void qmsgsent(MQTTClient_deliveryToken p){
434467
pr0(k(0, (char*)".mqtt.msgsent", kj(p), (K)0));
435468
}
436469

437-
static void qmsgrcvd(char* p,long sz){
438-
K topic = kp(p);
439-
p += topic->n+1;
440-
K msg = kpn(p, sz - (topic->n+1));
441-
pr0(k(0, (char*)".mqtt.msgrcvd", topic, msg, (K)0));
442-
}
443-
444470
static void qdisconn(){
445471
pr0(k(0, (char*)".mqtt.disconn", ktn(0,0), (K)0));
446472
}
447473

448-
static char* getSysError(char* buff,int len){
449-
buff[0]=0;
450-
#ifdef _WIN32
451-
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 0,
452-
WSAGetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buff,
453-
len, 0);
454-
#else
455-
strerror_r(errno,buff,len);
456-
#endif
457-
return buff;
458-
}
459-
460474
/* Socketpair initialization, callback definition and clean up functionality
461475
* detach function initialized at exit, socketpair start issues handled
462476
* callback function set to loop on socketpair connection
@@ -465,35 +479,70 @@ K mqttCallback(int fd){
465479
CallbackData cb_data;
466480
long rc = recv(fd, (char*)&cb_data, sizeof(cb_data), 0);
467481
if (rc < (long)sizeof(cb_data)){
468-
char buff[256];
469-
fprintf(stderr, "recv(%li) error: %s\n", rc, getSysError(buff,sizeof(buff)));
482+
char buf[256];
483+
fprintf(stderr, "recv(%li) error: %s\n", rc, getSysError(buf,sizeof(buf)));
470484
return (K)0;
471485
}
472-
switch (cb_data.header.msg_type){
486+
switch (cb_data.msg_type){
473487
case MSG_TYPE_SEND:
474488
qmsgsent(cb_data.body.dt);
475489
break;
476490
case MSG_TYPE_RCVD:{
477-
const long expected = cb_data.body.size;
478-
long actual;
479-
char* body = malloc(expected);
480-
for (rc = 0, actual = 0;
481-
actual < expected && rc >= 0;
482-
actual += rc = recv(fd, body + actual, expected - actual, 0));
483-
if (rc < 0){
484-
char buff[256];
485-
fprintf(stderr, "recv(%li) error: %s, expected: %li, actual: %li\n", rc, getSysError(buff,sizeof(buff)), expected, actual);
491+
K topic = ktn(KC,cb_data.topic_len);
492+
K msg = ktn(KC, cb_data.body.payload_len-cb_data.topic_len);
493+
#ifdef _WIN32
494+
DWORD actual=0;
495+
DWORD flags=MSG_WAITALL;
496+
WSABUF buffers[2];
497+
buffers[0].buf=&(kG(topic));
498+
buffers[0].len=cb_data.topic_len;
499+
buffers[1].buf=&(kG(msg));
500+
buffers[1].len=msg->n;
501+
if (WSARecv(fd, buffers, 2, &actual, &flags, NULL, NULL) == SOCKET_ERROR) {
502+
char buf[256];
503+
fprintf(stderr, "WSARecv error: %s\n", getSysError(buf,sizeof(buf)));
504+
}
505+
#else
506+
ssize_t actual=0,c=0,iov_c=2;
507+
struct iovec iov[2];
508+
iov[0].iov_base=&(kG(topic));
509+
iov[0].iov_len=cb_data.topic_len;
510+
iov[1].iov_base=&(kG(msg));
511+
iov[1].iov_len=msg->n;
512+
while(actual<cb_data.body.payload_len){
513+
c=readv(fd,iov,iov_c);
514+
if(c<=0){
515+
if(EINTR==errno)
516+
continue;
517+
else{
518+
char buf[256];
519+
fprintf(stderr, "readv error: %s\n", getSysError(buf,sizeof(buf)));
520+
break;
521+
}
522+
}
523+
actual+=c;
524+
if(actual<cb_data.topic_len){
525+
iov[0].iov_base=&(kG(topic)[actual]);
526+
iov[0].iov_len=cb_data.topic_len-actual;
527+
}else{
528+
iov_c=1;
529+
iov[0].iov_base=&(kG(msg)[actual-cb_data.topic_len]);
530+
iov[0].iov_len=msg->n-(actual-cb_data.topic_len);
531+
}
532+
}
533+
#endif
534+
if(actual==cb_data.body.payload_len)
535+
pr0(k(0, (char*)".mqtt.msgrcvd", topic, msg, (K)0));
536+
else{
537+
r0(topic);r0(msg);
486538
}
487-
else
488-
qmsgrcvd(body, actual);
489-
free(body);
490539
break;
491540
}
492541
case MSG_TYPE_DISCONN:
493542
qdisconn();
494543
break;
495544
default:
496-
fprintf(stderr, "mqttCallback - invalid callback type: %u\n", cb_data.header.msg_type);
545+
fprintf(stderr, "mqttCallback - invalid callback type: %u\n", cb_data.msg_type);
497546
}
498547
return (K)0;
499548
}
@@ -515,8 +564,8 @@ EXP K init(K UNUSED(X)){
515564
if(!(0==validinit))
516565
return 0;
517566
if(dumb_socketpair(spair,1) == SOCKET_ERROR){
518-
char buff[256];
519-
fprintf(stderr,"Init failed. socketpair: %s\n", getSysError(buff,sizeof(buff)));
567+
char buf[256];
568+
fprintf(stderr,"Init failed. socketpair: %s\n", getSysError(buf,sizeof(buf)));
520569
return 0;
521570
}
522571
pr0(sd1(spair[0], &mqttCallback));

0 commit comments

Comments
 (0)