Skip to content

Commit 677ad66

Browse files
committed
fix memory leak in case of error on consumer/producer creation
Before this patch we didn't free some resources in case of error in Consumer.create or Producer.create functions. This patch fixes it. Closes #108
1 parent 825579a commit 677ad66

File tree

9 files changed

+165
-87
lines changed

9 files changed

+165
-87
lines changed

kafka/callbacks.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,8 @@ new_event_queues() {
317317

318318
void
319319
destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) {
320+
if (event_queues == NULL)
321+
return;
320322
if (event_queues->consume_queue != NULL) {
321323
msg_t *msg = NULL;
322324
while (true) {

kafka/consumer.c

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "consumer.h"
2+
13
#include <unistd.h>
24
#include <stdlib.h>
35
#include <errno.h>
@@ -10,14 +12,27 @@
1012
#include <queue.h>
1113
#include <consumer_msg.h>
1214

13-
#include "consumer.h"
14-
1515
////////////////////////////////////////////////////////////////////////////////////////////////////
1616

1717
/**
1818
* Consumer poll thread
1919
*/
2020

21+
typedef struct consumer_poller_t {
22+
rd_kafka_t *rd_consumer;
23+
pthread_t thread;
24+
pthread_attr_t attr;
25+
int should_stop;
26+
pthread_mutex_t lock;
27+
} consumer_poller_t;
28+
29+
typedef struct {
30+
rd_kafka_t *rd_consumer;
31+
rd_kafka_topic_partition_list_t *topics;
32+
event_queues_t *event_queues;
33+
consumer_poller_t *poller;
34+
} consumer_t;
35+
2136
static void *
2237
consumer_poll_loop(void *arg) {
2338
set_thread_name("kafka_consumer");
@@ -605,7 +620,6 @@ lua_create_consumer(struct lua_State *L) {
605620
}
606621

607622
char errstr[512];
608-
rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
609623

610624
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
611625
lua_pushstring(L, "default_topic_options");
@@ -618,15 +632,15 @@ lua_create_consumer(struct lua_State *L) {
618632
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
619633
lua_pushnil(L);
620634
lua_pushliteral(L, "consumer config default topic options must contains only string keys and string values");
621-
return 2;
635+
goto topic_error;
622636
}
623637

624638
const char *value = lua_tostring(L, -1);
625639
const char *key = lua_tostring(L, -2);
626640
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
627641
lua_pushnil(L);
628642
lua_pushstring(L, errstr);
629-
return 2;
643+
goto topic_error;
630644
}
631645

632646
// pop value, leaving original key
@@ -636,6 +650,8 @@ lua_create_consumer(struct lua_State *L) {
636650
// stack now contains: -1 => table
637651
}
638652
lua_pop(L, 1);
653+
654+
rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
639655
rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);
640656

641657
event_queues_t *event_queues = new_event_queues();
@@ -678,15 +694,15 @@ lua_create_consumer(struct lua_State *L) {
678694
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
679695
lua_pushnil(L);
680696
lua_pushliteral(L, "consumer config options must contains only string keys and string values");
681-
return 2;
697+
goto config_error;
682698
}
683699

684700
const char *value = lua_tostring(L, -1);
685701
const char *key = lua_tostring(L, -2);
686702
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
687703
lua_pushnil(L);
688704
lua_pushstring(L, errstr);
689-
return 2;
705+
goto config_error;
690706
}
691707

692708
// pop value, leaving original key
@@ -701,13 +717,14 @@ lua_create_consumer(struct lua_State *L) {
701717
if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) {
702718
lua_pushnil(L);
703719
lua_pushstring(L, errstr);
704-
return 2;
720+
goto config_error;
705721
}
706722

723+
rd_config = NULL; // was freed by rd_kafka_new
707724
if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) {
708725
lua_pushnil(L);
709726
lua_pushliteral(L, "No valid brokers specified");
710-
return 2;
727+
goto broker_error;
711728
}
712729

713730
rd_kafka_poll_set_consumer(rd_consumer);
@@ -728,6 +745,17 @@ lua_create_consumer(struct lua_State *L) {
728745
luaL_getmetatable(L, consumer_label);
729746
lua_setmetatable(L, -2);
730747
return 1;
748+
749+
broker_error:
750+
rd_kafka_destroy(rd_consumer);
751+
config_error:
752+
if (rd_config != NULL)
753+
rd_kafka_conf_destroy(rd_config);
754+
destroy_event_queues(L, event_queues);
755+
return 2;
756+
topic_error:
757+
rd_kafka_topic_conf_destroy(topic_conf);
758+
return 2;
731759
}
732760

733761
int

kafka/consumer.h

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,6 @@
55
#include <lualib.h>
66
#include <lauxlib.h>
77

8-
#include <librdkafka/rdkafka.h>
9-
10-
#include <common.h>
11-
#include <queue.h>
12-
#include <callbacks.h>
13-
#include <consumer_msg.h>
14-
15-
////////////////////////////////////////////////////////////////////////////////////////////////////
16-
/**
17-
* Consumer
18-
*/
19-
20-
typedef struct {
21-
rd_kafka_t *rd_consumer;
22-
pthread_t thread;
23-
pthread_attr_t attr;
24-
int should_stop;
25-
pthread_mutex_t lock;
26-
} consumer_poller_t;
27-
28-
typedef struct {
29-
rd_kafka_t *rd_consumer;
30-
rd_kafka_topic_partition_list_t *topics;
31-
event_queues_t *event_queues;
32-
consumer_poller_t *poller;
33-
} consumer_t;
34-
358
int
369
lua_consumer_subscribe(struct lua_State *L);
3710

kafka/producer.c

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "producer.h"
2+
13
#include <unistd.h>
24
#include <stdlib.h>
35
#include <errno.h>
@@ -10,14 +12,33 @@
1012
#include <callbacks.h>
1113
#include <queue.h>
1214

13-
#include "producer.h"
14-
1515
////////////////////////////////////////////////////////////////////////////////////////////////////
1616

1717
/**
1818
* Producer poll thread
1919
*/
2020

21+
typedef struct producer_poller_t {
22+
rd_kafka_t *rd_producer;
23+
pthread_t thread;
24+
pthread_attr_t attr;
25+
int should_stop;
26+
pthread_mutex_t lock;
27+
} producer_poller_t;
28+
29+
typedef struct producer_topics_t {
30+
rd_kafka_topic_t **elements;
31+
int32_t count;
32+
int32_t capacity;
33+
} producer_topics_t;
34+
35+
typedef struct {
36+
rd_kafka_t *rd_producer;
37+
producer_topics_t *topics;
38+
event_queues_t *event_queues;
39+
producer_poller_t *poller;
40+
} producer_t;
41+
2142
static void *
2243
producer_poll_loop(void *arg) {
2344
set_thread_name("kafka_producer");
@@ -98,7 +119,7 @@ destroy_producer_poller(producer_poller_t *poller) {
98119
* Producer
99120
*/
100121

101-
producer_topics_t *
122+
static producer_topics_t *
102123
new_producer_topics(int32_t capacity) {
103124
rd_kafka_topic_t **elements;
104125
elements = xmalloc(sizeof(rd_kafka_topic_t *) * capacity);
@@ -111,7 +132,7 @@ new_producer_topics(int32_t capacity) {
111132
return topics;
112133
}
113134

114-
void
135+
static void
115136
add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element) {
116137
if (topics->count >= topics->capacity) {
117138
rd_kafka_topic_t **new_elements = xrealloc(topics->elements, sizeof(rd_kafka_topic_t *) * topics->capacity * 2);
@@ -133,7 +154,7 @@ find_producer_topic_by_name(producer_topics_t *topics, const char *name) {
133154
return NULL;
134155
}
135156

136-
void
157+
static void
137158
destroy_producer_topics(producer_topics_t *topics) {
138159
rd_kafka_topic_t **topic_p;
139160
rd_kafka_topic_t **end = topics->elements + topics->count;
@@ -448,8 +469,6 @@ lua_create_producer(struct lua_State *L) {
448469

449470
char errstr[512];
450471

451-
rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
452-
453472
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
454473
lua_pushstring(L, "default_topic_options");
455474
lua_gettable(L, -2);
@@ -461,15 +480,15 @@ lua_create_producer(struct lua_State *L) {
461480
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
462481
lua_pushnil(L);
463482
lua_pushliteral(L, "producer config default topic options must contains only string keys and string values");
464-
return 2;
483+
goto topic_error;
465484
}
466485

467486
const char *value = lua_tostring(L, -1);
468487
const char *key = lua_tostring(L, -2);
469488
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
470489
lua_pushnil(L);
471490
lua_pushstring(L, errstr);
472-
return 2;
491+
goto topic_error;
473492
}
474493

475494
// pop value, leaving original key
@@ -479,6 +498,8 @@ lua_create_producer(struct lua_State *L) {
479498
// stack now contains: -1 => table
480499
}
481500
lua_pop(L, 1);
501+
502+
rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
482503
rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);
483504

484505
event_queues_t *event_queues = new_event_queues();
@@ -522,15 +543,15 @@ lua_create_producer(struct lua_State *L) {
522543
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
523544
lua_pushnil(L);
524545
lua_pushliteral(L, "producer config options must contains only string keys and string values");
525-
return 2;
546+
goto config_error;
526547
}
527548

528549
const char *value = lua_tostring(L, -1);
529550
const char *key = lua_tostring(L, -2);
530551
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
531552
lua_pushnil(L);
532553
lua_pushstring(L, errstr);
533-
return 2;
554+
goto config_error;
534555
}
535556

536557
// pop value, leaving original key
@@ -545,13 +566,14 @@ lua_create_producer(struct lua_State *L) {
545566
if (!(rd_producer = rd_kafka_new(RD_KAFKA_PRODUCER, rd_config, errstr, sizeof(errstr)))) {
546567
lua_pushnil(L);
547568
lua_pushstring(L, errstr);
548-
return 2;
569+
goto config_error;
549570
}
550571

572+
rd_config = NULL; // was freed by rd_kafka_new
551573
if (rd_kafka_brokers_add(rd_producer, brokers) == 0) {
552574
lua_pushnil(L);
553575
lua_pushliteral(L, "No valid brokers specified");
554-
return 2;
576+
goto broker_error;
555577
}
556578

557579
// creating background thread for polling consumer
@@ -570,6 +592,17 @@ lua_create_producer(struct lua_State *L) {
570592
luaL_getmetatable(L, producer_label);
571593
lua_setmetatable(L, -2);
572594
return 1;
595+
596+
broker_error:
597+
rd_kafka_destroy(rd_producer);
598+
config_error:
599+
if (rd_config != NULL)
600+
rd_kafka_conf_destroy(rd_config);
601+
destroy_event_queues(L, event_queues);
602+
return 2;
603+
topic_error:
604+
rd_kafka_topic_conf_destroy(topic_conf);
605+
return 2;
573606
}
574607

575608
int

kafka/producer.h

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,6 @@
55
#include <lualib.h>
66
#include <lauxlib.h>
77

8-
#include <librdkafka/rdkafka.h>
9-
10-
#include <queue.h>
11-
12-
////////////////////////////////////////////////////////////////////////////////////////////////////
13-
/**
14-
* Producer
15-
*/
16-
17-
typedef struct {
18-
rd_kafka_t *rd_producer;
19-
pthread_t thread;
20-
pthread_attr_t attr;
21-
int should_stop;
22-
pthread_mutex_t lock;
23-
} producer_poller_t;
24-
25-
typedef struct {
26-
rd_kafka_topic_t **elements;
27-
int32_t count;
28-
int32_t capacity;
29-
} producer_topics_t;
30-
31-
producer_topics_t *new_producer_topics(int32_t capacity);
32-
33-
void
34-
add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element);
35-
36-
void
37-
destroy_producer_topics(producer_topics_t *topics);
38-
39-
typedef struct {
40-
rd_kafka_t *rd_producer;
41-
producer_topics_t *topics;
42-
event_queues_t *event_queues;
43-
producer_poller_t *poller;
44-
} producer_t;
45-
468
int
479
lua_producer_tostring(struct lua_State *L);
4810

0 commit comments

Comments
 (0)