From 677ad66ce3ba41a70157ada94f59273a7de80386 Mon Sep 17 00:00:00 2001 From: Oleg Babin Date: Mon, 29 Apr 2024 15:57:16 +0300 Subject: [PATCH] fix memory leak in case of error on consumer/producer creation Before this patch we didn't free some resources in case of error in Consumer.create or Producer.create functions. This patch fixes it. Closes #108 --- kafka/callbacks.c | 2 ++ kafka/consumer.c | 46 +++++++++++++++++++++++++------- kafka/consumer.h | 27 ------------------- kafka/producer.c | 59 ++++++++++++++++++++++++++++++++---------- kafka/producer.h | 38 --------------------------- tests/consumer.lua | 35 +++++++++++++++++++++++++ tests/producer.lua | 36 ++++++++++++++++++++++++++ tests/test_consumer.py | 5 ++++ tests/test_producer.py | 4 +++ 9 files changed, 165 insertions(+), 87 deletions(-) diff --git a/kafka/callbacks.c b/kafka/callbacks.c index 38799a1..84985b9 100644 --- a/kafka/callbacks.c +++ b/kafka/callbacks.c @@ -317,6 +317,8 @@ new_event_queues() { void destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) { + if (event_queues == NULL) + return; if (event_queues->consume_queue != NULL) { msg_t *msg = NULL; while (true) { diff --git a/kafka/consumer.c b/kafka/consumer.c index dab6032..b23de6b 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -1,3 +1,5 @@ +#include "consumer.h" + #include #include #include @@ -10,14 +12,27 @@ #include #include -#include "consumer.h" - //////////////////////////////////////////////////////////////////////////////////////////////////// /** * Consumer poll thread */ +typedef struct consumer_poller_t { + rd_kafka_t *rd_consumer; + pthread_t thread; + pthread_attr_t attr; + int should_stop; + pthread_mutex_t lock; +} consumer_poller_t; + +typedef struct { + rd_kafka_t *rd_consumer; + rd_kafka_topic_partition_list_t *topics; + event_queues_t *event_queues; + consumer_poller_t *poller; +} consumer_t; + static void * consumer_poll_loop(void *arg) { set_thread_name("kafka_consumer"); @@ -605,7 +620,6 @@ lua_create_consumer(struct lua_State *L) { } char errstr[512]; - rd_kafka_conf_t *rd_config = rd_kafka_conf_new(); rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); lua_pushstring(L, "default_topic_options"); @@ -618,7 +632,7 @@ lua_create_consumer(struct lua_State *L) { if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); lua_pushliteral(L, "consumer config default topic options must contains only string keys and string values"); - return 2; + goto topic_error; } const char *value = lua_tostring(L, -1); @@ -626,7 +640,7 @@ lua_create_consumer(struct lua_State *L) { if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); lua_pushstring(L, errstr); - return 2; + goto topic_error; } // pop value, leaving original key @@ -636,6 +650,8 @@ lua_create_consumer(struct lua_State *L) { // stack now contains: -1 => table } lua_pop(L, 1); + + rd_kafka_conf_t *rd_config = rd_kafka_conf_new(); rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf); event_queues_t *event_queues = new_event_queues(); @@ -678,7 +694,7 @@ lua_create_consumer(struct lua_State *L) { if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); lua_pushliteral(L, "consumer config options must contains only string keys and string values"); - return 2; + goto config_error; } const char *value = lua_tostring(L, -1); @@ -686,7 +702,7 @@ lua_create_consumer(struct lua_State *L) { if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); lua_pushstring(L, errstr); - return 2; + goto config_error; } // pop value, leaving original key @@ -701,13 +717,14 @@ lua_create_consumer(struct lua_State *L) { if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) { lua_pushnil(L); lua_pushstring(L, errstr); - return 2; + goto config_error; } + rd_config = NULL; // was freed by rd_kafka_new if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) { lua_pushnil(L); lua_pushliteral(L, "No valid brokers specified"); - return 2; + goto broker_error; } rd_kafka_poll_set_consumer(rd_consumer); @@ -728,6 +745,17 @@ lua_create_consumer(struct lua_State *L) { luaL_getmetatable(L, consumer_label); lua_setmetatable(L, -2); return 1; + +broker_error: + rd_kafka_destroy(rd_consumer); +config_error: + if (rd_config != NULL) + rd_kafka_conf_destroy(rd_config); + destroy_event_queues(L, event_queues); + return 2; +topic_error: + rd_kafka_topic_conf_destroy(topic_conf); + return 2; } int diff --git a/kafka/consumer.h b/kafka/consumer.h index 558158c..1715bd4 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -5,33 +5,6 @@ #include #include -#include - -#include -#include -#include -#include - -//////////////////////////////////////////////////////////////////////////////////////////////////// -/** - * Consumer - */ - -typedef struct { - rd_kafka_t *rd_consumer; - pthread_t thread; - pthread_attr_t attr; - int should_stop; - pthread_mutex_t lock; -} consumer_poller_t; - -typedef struct { - rd_kafka_t *rd_consumer; - rd_kafka_topic_partition_list_t *topics; - event_queues_t *event_queues; - consumer_poller_t *poller; -} consumer_t; - int lua_consumer_subscribe(struct lua_State *L); diff --git a/kafka/producer.c b/kafka/producer.c index 101d31f..030e3ae 100644 --- a/kafka/producer.c +++ b/kafka/producer.c @@ -1,3 +1,5 @@ +#include "producer.h" + #include #include #include @@ -10,14 +12,33 @@ #include #include -#include "producer.h" - //////////////////////////////////////////////////////////////////////////////////////////////////// /** * Producer poll thread */ +typedef struct producer_poller_t { + rd_kafka_t *rd_producer; + pthread_t thread; + pthread_attr_t attr; + int should_stop; + pthread_mutex_t lock; +} producer_poller_t; + +typedef struct producer_topics_t { + rd_kafka_topic_t **elements; + int32_t count; + int32_t capacity; +} producer_topics_t; + +typedef struct { + rd_kafka_t *rd_producer; + producer_topics_t *topics; + event_queues_t *event_queues; + producer_poller_t *poller; +} producer_t; + static void * producer_poll_loop(void *arg) { set_thread_name("kafka_producer"); @@ -98,7 +119,7 @@ destroy_producer_poller(producer_poller_t *poller) { * Producer */ -producer_topics_t * +static producer_topics_t * new_producer_topics(int32_t capacity) { rd_kafka_topic_t **elements; elements = xmalloc(sizeof(rd_kafka_topic_t *) * capacity); @@ -111,7 +132,7 @@ new_producer_topics(int32_t capacity) { return topics; } -void +static void add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element) { if (topics->count >= topics->capacity) { rd_kafka_topic_t **new_elements = xrealloc(topics->elements, sizeof(rd_kafka_topic_t *) * topics->capacity * 2); @@ -133,7 +154,7 @@ find_producer_topic_by_name(producer_topics_t *topics, const char *name) { return NULL; } -void +static void destroy_producer_topics(producer_topics_t *topics) { rd_kafka_topic_t **topic_p; rd_kafka_topic_t **end = topics->elements + topics->count; @@ -448,8 +469,6 @@ lua_create_producer(struct lua_State *L) { char errstr[512]; - rd_kafka_conf_t *rd_config = rd_kafka_conf_new(); - rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); lua_pushstring(L, "default_topic_options"); lua_gettable(L, -2); @@ -461,7 +480,7 @@ lua_create_producer(struct lua_State *L) { if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); lua_pushliteral(L, "producer config default topic options must contains only string keys and string values"); - return 2; + goto topic_error; } const char *value = lua_tostring(L, -1); @@ -469,7 +488,7 @@ lua_create_producer(struct lua_State *L) { if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); lua_pushstring(L, errstr); - return 2; + goto topic_error; } // pop value, leaving original key @@ -479,6 +498,8 @@ lua_create_producer(struct lua_State *L) { // stack now contains: -1 => table } lua_pop(L, 1); + + rd_kafka_conf_t *rd_config = rd_kafka_conf_new(); rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf); event_queues_t *event_queues = new_event_queues(); @@ -522,7 +543,7 @@ lua_create_producer(struct lua_State *L) { if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); lua_pushliteral(L, "producer config options must contains only string keys and string values"); - return 2; + goto config_error; } const char *value = lua_tostring(L, -1); @@ -530,7 +551,7 @@ lua_create_producer(struct lua_State *L) { if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); lua_pushstring(L, errstr); - return 2; + goto config_error; } // pop value, leaving original key @@ -545,13 +566,14 @@ lua_create_producer(struct lua_State *L) { if (!(rd_producer = rd_kafka_new(RD_KAFKA_PRODUCER, rd_config, errstr, sizeof(errstr)))) { lua_pushnil(L); lua_pushstring(L, errstr); - return 2; + goto config_error; } + rd_config = NULL; // was freed by rd_kafka_new if (rd_kafka_brokers_add(rd_producer, brokers) == 0) { lua_pushnil(L); lua_pushliteral(L, "No valid brokers specified"); - return 2; + goto broker_error; } // creating background thread for polling consumer @@ -570,6 +592,17 @@ lua_create_producer(struct lua_State *L) { luaL_getmetatable(L, producer_label); lua_setmetatable(L, -2); return 1; + +broker_error: + rd_kafka_destroy(rd_producer); +config_error: + if (rd_config != NULL) + rd_kafka_conf_destroy(rd_config); + destroy_event_queues(L, event_queues); + return 2; +topic_error: + rd_kafka_topic_conf_destroy(topic_conf); + return 2; } int diff --git a/kafka/producer.h b/kafka/producer.h index a56089f..d06a93e 100644 --- a/kafka/producer.h +++ b/kafka/producer.h @@ -5,44 +5,6 @@ #include #include -#include - -#include - -//////////////////////////////////////////////////////////////////////////////////////////////////// -/** - * Producer - */ - -typedef struct { - rd_kafka_t *rd_producer; - pthread_t thread; - pthread_attr_t attr; - int should_stop; - pthread_mutex_t lock; -} producer_poller_t; - -typedef struct { - rd_kafka_topic_t **elements; - int32_t count; - int32_t capacity; -} producer_topics_t; - -producer_topics_t *new_producer_topics(int32_t capacity); - -void -add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element); - -void -destroy_producer_topics(producer_topics_t *topics); - -typedef struct { - rd_kafka_t *rd_producer; - producer_topics_t *topics; - event_queues_t *event_queues; - producer_poller_t *poller; -} producer_t; - int lua_producer_tostring(struct lua_State *L); diff --git a/tests/consumer.lua b/tests/consumer.lua index 621faa3..1e91a0e 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -214,6 +214,40 @@ local function test_seek_partitions() return messages end +local function test_create_errors() + log.info('Create without config') + local _, err = tnt_kafka.Consumer.create() + assert(err == 'config must not be nil') + + log.info('Create with empty config') + local _, err = tnt_kafka.Consumer.create({}) + assert(err == 'consumer config table must have non nil key \'brokers\' which contains string') + + log.info('Create with empty brokers') + local _, err = tnt_kafka.Consumer.create({brokers = ''}) + assert(err == 'No valid brokers specified') + + log.info('Create with invalid default_topic_options keys') + local _, err = tnt_kafka.Consumer.create({brokers = '', default_topic_options = {[{}] = 2}}) + assert(err == 'consumer config default topic options must contains only string keys and string values') + + log.info('Create with invalid default_topic_options property') + local _, err = tnt_kafka.Consumer.create({brokers = '', default_topic_options = {[2] = 2}}) + assert(err == 'No such configuration property: "2"') + + log.info('Create with invalid options keys') + local _, err = tnt_kafka.Consumer.create({brokers = '', options = {[{}] = 2}}) + assert(err == 'consumer config options must contains only string keys and string values') + + log.info('Create with invalid options property') + local _, err = tnt_kafka.Consumer.create({brokers = '', options = {[2] = 2}}) + assert(err == 'No such configuration property: "2"') + + log.info('Create with incompatible properties') + local _, err = tnt_kafka.Consumer.create({brokers = '', options = {['reconnect.backoff.max.ms'] = '2', ['reconnect.backoff.ms'] = '1000'}}) + assert(err == '`reconnect.backoff.max.ms` must be >= `reconnect.max.ms`') +end + return { create = create, subscribe = subscribe, @@ -231,4 +265,5 @@ return { resume = resume, test_seek_partitions = test_seek_partitions, + test_create_errors = test_create_errors, } diff --git a/tests/producer.lua b/tests/producer.lua index 266ddd7..7777dba 100644 --- a/tests/producer.lua +++ b/tests/producer.lua @@ -111,6 +111,40 @@ local function close() end end +local function test_create_errors() + log.info('Create without config') + local _, err = tnt_kafka.Producer.create() + assert(err == 'config must not be nil') + + log.info('Create with empty config') + local _, err = tnt_kafka.Producer.create({}) + assert(err == 'producer config table must have non nil key \'brokers\' which contains string') + + log.info('Create with empty brokers') + local _, err = tnt_kafka.Producer.create({brokers = ''}) + assert(err == 'No valid brokers specified') + + log.info('Create with invalid default_topic_options keys') + local _, err = tnt_kafka.Producer.create({brokers = '', default_topic_options = {[{}] = 2}}) + assert(err == 'producer config default topic options must contains only string keys and string values') + + log.info('Create with invalid default_topic_options property') + local _, err = tnt_kafka.Producer.create({brokers = '', default_topic_options = {[2] = 2}}) + assert(err == 'No such configuration property: "2"') + + log.info('Create with invalid options keys') + local _, err = tnt_kafka.Producer.create({brokers = '', options = {[{}] = 2}}) + assert(err == 'producer config options must contains only string keys and string values') + + log.info('Create with invalid options property') + local _, err = tnt_kafka.Producer.create({brokers = '', options = {[2] = 2}}) + assert(err == 'No such configuration property: "2"') + + log.info('Create with incompatible properties') + local _, err = tnt_kafka.Producer.create({brokers = '', options = {['reconnect.backoff.max.ms'] = '2', ['reconnect.backoff.ms'] = '1000'}}) + assert(err == '`reconnect.backoff.max.ms` must be >= `reconnect.max.ms`') +end + return { create = create, produce = produce, @@ -121,4 +155,6 @@ return { dump_conf = dump_conf, metadata = metadata, list_groups = list_groups, + + test_create_errors = test_create_errors, } diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 217af85..977db49 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -157,6 +157,11 @@ def test_consumer_seek_partitions(): assert item['value'] == value +def test_consumer_create_errors(): + server = get_server() + server.call("consumer.test_create_errors") + + def test_consumer_should_consume_msgs_from_multiple_topics(): message1 = { "key": "test1", diff --git a/tests/test_producer.py b/tests/test_producer.py index 23de4aa..a70b121 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -176,3 +176,7 @@ def test_producer_should_log_debug(): assert len(response[0]) > 0 server.call("producer.close", []) + +def test_producer_create_errors(): + server = get_server() + server.call("producer.test_create_errors")