diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index e2d32b255..0c06a3e0b 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -138,12 +138,20 @@ blocks: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' jobs: - - name: Build + - name: Build and Tests with 'classic' group protocol + commands: + - sem-version python 3.8 + # use a virtualenv + - python3 -m venv _venv && source _venv/bin/activate + - chmod u+r+x tools/source-package-verification.sh + - tools/source-package-verification.sh + - name: Build and Tests with 'consumer' group protocol commands: - sem-version python 3.8 # use a virtualenv - python3 -m venv _venv && source _venv/bin/activate - chmod u+r+x tools/source-package-verification.sh + - export TEST_CONSUMER_GROUP_PROTOCOL=consumer - tools/source-package-verification.sh - name: "Source package verification with Python 3 (Linux arm64)" dependencies: [] diff --git a/CHANGELOG.md b/CHANGELOG.md index c9857786d..d65b3de2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Confluent's Python client for Apache Kafka + +## v2.6.0 + +v2.6.0 is a feature release with the following features, fixes and enhancements: + +- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#). + +confluent-kafka-python is based on librdkafka v2.6.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. + + ## v2.5.3 v2.5.3 is a maintenance release with the following fixes and enhancements: @@ -18,7 +30,7 @@ for a complete list of changes, enhancements, fixes and upgrade considerations. ## v2.5.0 > [!WARNING] -This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. +This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. > > You won't face any problem if: > * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability). @@ -26,7 +38,7 @@ This version has introduced a regression in which an assert is triggered during > * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`. > * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there. > * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client. -> +> > Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all. 
v2.5.0 is a feature release with the following features, fixes and enhancements: diff --git a/examples/adminapi.py b/examples/adminapi.py index 8442d6453..e2b075c5b 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -18,8 +18,9 @@ # Example use of AdminClient operations. from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions, - TopicPartition, ConsumerGroupState, TopicCollection, - IsolationLevel) + TopicPartition, ConsumerGroupState, + TopicCollection, IsolationLevel, + ConsumerGroupType) from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigEntry, ConfigSource, AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, @@ -30,6 +31,7 @@ import sys import threading import logging +import argparse logging.basicConfig() @@ -471,18 +473,47 @@ def example_list(a, args): print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host)) +def parse_list_consumer_groups_args(args, states, types): + parser = argparse.ArgumentParser(prog='list_consumer_groups') + parser.add_argument('-states') + parser.add_argument('-types') + parsed_args = parser.parse_args(args) + + def usage(message): + print(message) + parser.print_usage() + sys.exit(1) + + if parsed_args.states: + for arg in parsed_args.states.split(","): + try: + states.add(ConsumerGroupState[arg]) + except KeyError: + usage(f"Invalid state: {arg}") + if parsed_args.types: + for arg in parsed_args.types.split(","): + try: + types.add(ConsumerGroupType[arg]) + except KeyError: + usage(f"Invalid type: {arg}") + + def example_list_consumer_groups(a, args): """ List Consumer Groups """ - states = {ConsumerGroupState[state] for state in args} - future = a.list_consumer_groups(request_timeout=10, states=states) + + states = set() + types = set() + parse_list_consumer_groups_args(args, states, types) + + future = a.list_consumer_groups(request_timeout=10, states=states, types=types) try: list_consumer_groups_result = future.result() print("{} consumer groups".format(len(list_consumer_groups_result.valid))) for valid in list_consumer_groups_result.valid: - print(" id: {} is_simple: {} state: {}".format( - valid.group_id, valid.is_simple_consumer_group, valid.state)) + print(" id: {} is_simple: {} state: {} type: {}".format( + valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type)) print("{} errors".format(len(list_consumer_groups_result.errors))) for error in list_consumer_groups_result.errors: print(" error: {}".format(error)) @@ -900,7 +931,8 @@ def example_delete_records(a, args): sys.stderr.write(' delete_acls ' + ' ..\n') sys.stderr.write(' list []\n') - sys.stderr.write(' list_consumer_groups [ ..]\n') + sys.stderr.write(' list_consumer_groups [-states ,,..] 
' + '[-types ,,..]\n') sys.stderr.write(' describe_consumer_groups ..\n') sys.stderr.write(' describe_topics ..\n') sys.stderr.write(' describe_cluster \n') diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 6a24d6e9e..f3a8d1778 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -22,6 +22,7 @@ from ._model import (Node, # noqa: F401 ConsumerGroupTopicPartitions, ConsumerGroupState, + ConsumerGroupType, TopicCollection, TopicPartitionInfo, IsolationLevel) @@ -48,7 +49,8 @@ 'Producer', 'DeserializingConsumer', 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node', - 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid', + 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', + 'ConsumerGroupType', 'Uuid', 'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo'] __version__ = version()[0] diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 1c2ec89f0..93c810757 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -95,6 +95,23 @@ def __lt__(self, other): return self.value < other.value +class ConsumerGroupType(Enum): + """ + Enumerates the different types of consumer groups. + """ + #: Type is not known or not set + UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN + #: Consumer Type + CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER + #: Classic Type + CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC + + def __lt__(self, other): + if self.__class__ != other.__class__: + return NotImplemented + return self.value < other.value + + class TopicCollection: """ Represents collection of topics in the form of different identifiers diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 9101b651f..5271a6101 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -56,7 +56,7 @@ from ._records import DeletedRecords # noqa: F401 -from .._model import TopicCollection as _TopicCollection +from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType from ..cimpl import (KafkaException, # noqa: F401 KafkaError, @@ -881,6 +881,8 @@ def list_consumer_groups(self, **kwargs): on broker, and response. Default: `socket.timeout.ms/1000.0` :param set(ConsumerGroupState) states: only list consumer groups which are currently in these states. + :param set(ConsumerGroupType) types: only list consumer groups of + these types. :returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.
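As an illustrative sketch (not part of the patch), the new `types` filter documented above can be combined with the existing `states` filter along these lines; the bootstrap address is a placeholder:

```python
from confluent_kafka import ConsumerGroupState, ConsumerGroupType
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # placeholder address

# Only list 'classic' (pre-KIP-848) groups that are currently STABLE.
future = admin.list_consumer_groups(request_timeout=10,
                                    states={ConsumerGroupState.STABLE},
                                    types={ConsumerGroupType.CLASSIC})
result = future.result()
for group in result.valid:
    print(group.group_id, group.is_simple_consumer_group, group.state, group.type)
for error in result.errors:
    print("error:", error)
```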
@@ -900,6 +902,16 @@ def list_consumer_groups(self, **kwargs): raise TypeError("All elements of states must be of type ConsumerGroupState") kwargs["states_int"] = [state.value for state in states] kwargs.pop("states") + if "types" in kwargs: + types = kwargs["types"] + if types is not None: + if not isinstance(types, set): + raise TypeError("'types' must be a set") + for type in types: + if not isinstance(type, _ConsumerGroupType): + raise TypeError("All elements of types must be of type ConsumerGroupType") + kwargs["types_int"] = [type.value for type in types] + kwargs.pop("types") f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result) diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 82ab98f1d..964d62b2f 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -14,7 +14,7 @@ from .._util import ConversionUtil -from .._model import ConsumerGroupState +from .._model import ConsumerGroupState, ConsumerGroupType from ._acl import AclOperation @@ -31,13 +31,17 @@ class ConsumerGroupListing: Whether a consumer group is simple or not. state : ConsumerGroupState Current state of the consumer group. + type : ConsumerGroupType + Type of the consumer group. """ - def __init__(self, group_id, is_simple_consumer_group, state=None): + def __init__(self, group_id, is_simple_consumer_group, state=None, type=None): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group if state is not None: self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState) + if type is not None: + self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType) class ListConsumerGroupsResult: diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index ae6c55116..021e06427 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -222,8 +222,7 @@ def remove_by_subject(self, subject_name): Args: subject_name (str): Subject name the schema is registered under. """ - - + with self.lock: if subject_name in self.subject_schemas: for schema in self.subject_schemas[subject_name]: @@ -234,7 +233,6 @@ def remove_by_subject(self, subject_name): del self.subject_schemas[subject_name] - def get_schema(self, schema_id): """ Get the schema instance associated with schema_id from the cache. 
@@ -566,9 +564,9 @@ def delete_subject(self, subject_name, permanent=False): if permanent: self._rest_client.delete('subjects/{}?permanent=true' .format(_urlencode(subject_name))) - + self._cache.remove_by_subject(subject_name) - + return list def get_latest_version(self, subject_name): diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index c58166d6e..5fbeb780f 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -82,6 +82,8 @@ struct Admin_options { rd_kafka_IsolationLevel_t isolation_level; rd_kafka_consumer_group_state_t* states; int states_cnt; + rd_kafka_consumer_group_type_t* types; + int types_cnt; }; /**@brief "unset" value initializers for Admin_options @@ -96,6 +98,8 @@ struct Admin_options { Admin_options_def_int, \ Admin_options_def_ptr, \ Admin_options_def_cnt, \ + Admin_options_def_ptr, \ + Admin_options_def_cnt, \ } #define Admin_options_is_set_int(v) ((v) != Admin_options_def_int) @@ -185,6 +189,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api, goto err; } + if (Admin_options_is_set_ptr(options->types) && + (err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types( + c_options, options->types, options->types_cnt))) { + snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj)); + goto err; + } + return c_options; err: @@ -1698,24 +1709,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR( * @brief List consumer groups */ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) { - PyObject *future, *states_int = NULL; + PyObject *future, *states_int = NULL, *types_int = NULL; struct Admin_options options = Admin_options_INITIALIZER; rd_kafka_AdminOptions_t *c_options = NULL; CallState cs; rd_kafka_queue_t *rkqu; rd_kafka_consumer_group_state_t *c_states = NULL; + rd_kafka_consumer_group_type_t *c_types = NULL; int states_cnt = 0; + int types_cnt = 0; int i = 0; static char *kws[] = {"future", /* options */ "states_int", + "types_int", "request_timeout", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws, &future, &states_int, + &types_int, &options.request_timeout)) { goto err; } @@ -1736,7 +1751,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw PyObject *state = PyList_GET_ITEM(states_int, i); if(!cfl_PyInt_Check(state)) { PyErr_SetString(PyExc_ValueError, - "Element of states must be a valid state"); + "Elements of states must be valid states"); goto err; } c_states[i] = (rd_kafka_consumer_group_state_t) cfl_PyInt_AsInt(state); @@ -1746,6 +1761,33 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw } } + if(types_int != NULL && types_int != Py_None) { + if(!PyList_Check(types_int)) { + PyErr_SetString(PyExc_ValueError, + "types must be of type list"); + goto err; + } + + types_cnt = (int)PyList_Size(types_int); + + if(types_cnt > 0) { + c_types = (rd_kafka_consumer_group_type_t *) + malloc(types_cnt * + sizeof(rd_kafka_consumer_group_type_t)); + for(i = 0 ; i < types_cnt ; i++) { + PyObject *type = PyList_GET_ITEM(types_int, i); + if(!cfl_PyInt_Check(type)) { + PyErr_SetString(PyExc_ValueError, + "Elements of types must be valid group types"); + goto err; + } + c_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(type); + } + options.types = c_types; + options.types_cnt = types_cnt; + } + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, &options, future); if
(!c_options) { @@ -1774,14 +1816,19 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw if(c_states) { free(c_states); } + if(c_types) { + free(c_types); + } rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */ rd_kafka_AdminOptions_destroy(c_options); - Py_RETURN_NONE; err: if(c_states) { free(c_states); } + if(c_types) { + free(c_types); + } if (c_options) { rd_kafka_AdminOptions_destroy(c_options); Py_DECREF(future); @@ -1789,7 +1836,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw return NULL; } const char Admin_list_consumer_groups_doc[] = PyDoc_STR( - ".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n" + ".. py:function:: list_consumer_groups(future, [states_int], [types_int], [request_timeout])\n" "\n" " List all the consumer groups.\n" "\n" @@ -3609,6 +3656,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py( cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i])); + cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i])); + args = PyTuple_New(0); valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs); diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c index 43ca665ba..ef9affb30 100644 --- a/src/confluent_kafka/src/AdminTypes.c +++ b/src/confluent_kafka/src/AdminTypes.c @@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) { PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY); } +static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) { + /* rd_kafka_consumer_group_type_t */ + PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); + PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); + PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); +} + static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) { - /* rd_kafka_consumer_group_state_t */ PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET); PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE); PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND); @@ -612,6 +618,7 @@ void AdminTypes_AddObjects (PyObject *m) { AdminTypes_AddObjectsAclOperation(m); AdminTypes_AddObjectsAclPermissionType(m); AdminTypes_AddObjectsConsumerGroupStates(m); + AdminTypes_AddObjectsConsumerGroupTypes(m); AdminTypes_AddObjectsAlterConfigOpType(m); AdminTypes_AddObjectsScramMechanismType(m); AdminTypes_AddObjectsIsolationLevel(m); diff --git a/tests/common/__init__.py b/tests/common/__init__.py new file mode 100644 index 000000000..a0d54934f --- /dev/null +++ b/tests/common/__init__.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +from confluent_kafka import Consumer, DeserializingConsumer +from confluent_kafka.avro import AvroConsumer + +_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL' +_TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE' + + +def _update_conf_group_protocol(conf=None): + if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): + conf['group.protocol'] = 'consumer' + + +def _trivup_cluster_type_kraft(): + return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft' + + +class TestUtils: + @staticmethod + def use_kraft(): + return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft() + + @staticmethod + def use_group_protocol_consumer(): + return _GROUP_PROTOCOL_ENV in os.environ and os.environ[_GROUP_PROTOCOL_ENV] == 'consumer' + + +class TestConsumer(Consumer): + def __init__(self, conf=None, **kwargs): + _update_conf_group_protocol(conf) + super(TestConsumer, self).__init__(conf, **kwargs) + + +class TestDeserializingConsumer(DeserializingConsumer): + def __init__(self, conf=None, **kwargs): + _update_conf_group_protocol(conf) + super(TestDeserializingConsumer, self).__init__(conf, **kwargs) + + +class TestAvroConsumer(AvroConsumer): + def __init__(self, conf=None, **kwargs): + _update_conf_group_protocol(conf) + super(TestAvroConsumer, self).__init__(conf, **kwargs) diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py index 820fd5228..eb7f41a39 100644 --- a/tests/integration/admin/test_basic_operations.py +++ b/tests/integration/admin/test_basic_operations.py @@ -13,15 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import confluent_kafka import struct import time -from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState + +from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState, KafkaError +from confluent_kafka import ConsumerGroupType from confluent_kafka.admin import (NewPartitions, ConfigResource, AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError - +from tests.common import TestUtils topic_prefix = "test-topic" @@ -58,6 +59,8 @@ def verify_admin_acls(admin_client, for acl_binding, f in fs.items(): f.result() # trigger exception if there was an error + time.sleep(1) + acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY, None, None, AclOperation.ANY, AclPermissionType.ANY) acl_binding_filter2 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.PREFIXED, @@ -83,6 +86,8 @@ def verify_admin_acls(admin_client, "Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings, expected_acl_bindings) + time.sleep(1) + # # Delete the ACLs with TOPIC and GROUP # @@ -94,6 +99,9 @@ def verify_admin_acls(admin_client, assert deleted_acl_bindings == expected, \ "Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings, expected) + + time.sleep(1) + # # All the ACLs should have been deleted # @@ -201,14 +209,14 @@ def test_basic_operations(kafka_cluster): # Second iteration: create topic. 
# for validate in (True, False): - our_topic = kafka_cluster.create_topic(topic_prefix, - { - "num_partitions": num_partitions, - "config": topic_config, - "replication_factor": 1, - }, - validate_only=validate - ) + our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + { + "num_partitions": num_partitions, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=validate + ) admin_client = kafka_cluster.admin() @@ -270,7 +278,7 @@ def consume_messages(group_id, num_messages=None): print('Read all the required messages: exiting') break except ConsumeError as e: - if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF: + if msg is not None and e.code == KafkaError._PARTITION_EOF: print('Reached end of %s [%d] at offset %d' % ( msg.topic(), msg.partition(), msg.offset())) eof_reached[(msg.topic(), msg.partition())] = True @@ -300,17 +308,34 @@ def consume_messages(group_id, num_messages=None): assert group2 in groups, "Consumer group {} not found".format(group2) # List Consumer Groups new API test + current_type = ConsumerGroupType.CONSUMER if TestUtils.use_group_protocol_consumer() \ + else ConsumerGroupType.CLASSIC + opposite_type = ConsumerGroupType.CLASSIC if current_type == ConsumerGroupType.CONSUMER \ + else ConsumerGroupType.CONSUMER future = admin_client.list_consumer_groups(request_timeout=10) result = future.result() group_ids = [group.group_id for group in result.valid] assert group1 in group_ids, "Consumer group {} not found".format(group1) assert group2 in group_ids, "Consumer group {} not found".format(group2) - future = admin_client.list_consumer_groups(request_timeout=10, states={ConsumerGroupState.STABLE}) + future = admin_client.list_consumer_groups(request_timeout=10, + states={ConsumerGroupState.STABLE}, + types={current_type}) result = future.result() assert isinstance(result.valid, list) assert not result.valid + # List Consumer Groups with group type option, should not return any group + # of those being tested. + if TestUtils.use_group_protocol_consumer(): + future = admin_client.list_consumer_groups(request_timeout=10, types={opposite_type}) + result = future.result() + group_ids = [group.group_id for group in result.valid] + assert group1 not in group_ids, f"Consumer group {group1} was found despite passing {opposite_type}" + assert group2 not in group_ids, f"Consumer group {group2} was found despite passing {opposite_type}" + for group in result.valid: + assert group.type == ConsumerGroupType.CLASSIC + def verify_config(expconfig, configs): """ Verify that the config key,values in expconfig are found @@ -345,6 +370,8 @@ def verify_config(expconfig, configs): fs = admin_client.alter_configs([resource]) fs[resource].result() # will raise exception on failure + time.sleep(1) + # # Read the config back again and verify.
# diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index fdbaca749..2c58e36c4 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -25,11 +25,11 @@ def test_delete_records(kafka_cluster): admin_client = kafka_cluster.admin() # Create a topic with a single partition - topic = kafka_cluster.create_topic("test-del-records", - { - "num_partitions": 1, - "replication_factor": 1, - }) + topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records", + { + "num_partitions": 1, + "replication_factor": 1, + }) # Create Producer instance p = kafka_cluster.producer() @@ -73,16 +73,17 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster): admin_client = kafka_cluster.admin() num_partitions = 3 # Create two topics with a single partition - topic = kafka_cluster.create_topic("test-del-records", - { - "num_partitions": num_partitions, - "replication_factor": 1, - }) - topic2 = kafka_cluster.create_topic("test-del-records2", - { - "num_partitions": num_partitions, - "replication_factor": 1, - }) + topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records", + { + "num_partitions": num_partitions, + "replication_factor": 1, + }) + topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2", + { + "num_partitions": num_partitions, + "replication_factor": 1, + }) + topics = [topic, topic2] partitions = list(range(num_partitions)) # Create Producer instance diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index ef5d94987..3bfbfd88a 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import time import pytest + from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError from confluent_kafka import ConsumerGroupState, TopicCollection +from tests.common import TestUtils + topic_prefix = "test-topic" @@ -82,10 +86,12 @@ def perform_admin_operation_sync(operation, *arg, **kwargs): def create_acls(admin_client, acl_bindings): perform_admin_operation_sync(admin_client.create_acls, acl_bindings) + time.sleep(1) def delete_acls(admin_client, acl_binding_filters): perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters) + time.sleep(1) def verify_provided_describe_for_authorized_operations( @@ -115,6 +121,7 @@ def verify_provided_describe_for_authorized_operations( acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL, "User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW) create_acls(admin_client, [acl_binding]) + time.sleep(1) # Check with updated authorized operations desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs) @@ -126,6 +133,7 @@ def verify_provided_describe_for_authorized_operations( acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY, None, None, AclOperation.ANY, AclPermissionType.ANY) delete_acls(admin_client, [acl_binding_filter]) + time.sleep(1) return desc @@ -196,20 +204,24 @@ def test_describe_operations(sasl_cluster): # Create Topic topic_config = {"compression.type": "gzip"} - our_topic = sasl_cluster.create_topic(topic_prefix, - { - "num_partitions": 1, - "config": topic_config, - "replication_factor": 1, - }, - validate_only=False - ) + our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix, + { + "num_partitions": 1, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=False + ) # Verify Authorized Operations in Describe Topics verify_describe_topics(admin_client, our_topic) # Verify Authorized Operations in Describe Groups - verify_describe_groups(sasl_cluster, admin_client, our_topic) + # Skip this test if using group protocol `consumer` + # as there is new RPC for describe_groups() in + # group protocol `consumer` case. + if not TestUtils.use_group_protocol_consumer(): + verify_describe_groups(sasl_cluster, admin_client, our_topic) # Delete Topic perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10) diff --git a/tests/integration/admin/test_incremental_alter_configs.py b/tests/integration/admin/test_incremental_alter_configs.py index ad19a1164..24428e6f1 100644 --- a/tests/integration/admin/test_incremental_alter_configs.py +++ b/tests/integration/admin/test_incremental_alter_configs.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import time + from confluent_kafka.admin import ConfigResource, \ ConfigEntry, ResourceType, \ AlterConfigOpType @@ -52,18 +54,18 @@ def test_incremental_alter_configs(kafka_cluster): num_partitions = 2 topic_config = {"compression.type": "gzip"} - our_topic = kafka_cluster.create_topic(topic_prefix, - { - "num_partitions": num_partitions, - "config": topic_config, - "replication_factor": 1, - }) - our_topic2 = kafka_cluster.create_topic(topic_prefix2, - { - "num_partitions": num_partitions, - "config": topic_config, - "replication_factor": 1, - }) + our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + { + "num_partitions": num_partitions, + "config": topic_config, + "replication_factor": 1, + }) + our_topic2 = kafka_cluster.create_topic_and_wait_propogation(topic_prefix2, + { + "num_partitions": num_partitions, + "config": topic_config, + "replication_factor": 1, + }) admin_client = kafka_cluster.admin() @@ -103,6 +105,8 @@ def test_incremental_alter_configs(kafka_cluster): assert_operation_succeeded(fs, 2) + time.sleep(1) + # # Get current topic config # @@ -134,6 +138,8 @@ def test_incremental_alter_configs(kafka_cluster): assert_operation_succeeded(fs, 1) + time.sleep(1) + # # Get current topic config # diff --git a/tests/integration/admin/test_list_offsets.py b/tests/integration/admin/test_list_offsets.py index 6a2e0a46a..f1448b267 100644 --- a/tests/integration/admin/test_list_offsets.py +++ b/tests/integration/admin/test_list_offsets.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec from confluent_kafka import TopicPartition, IsolationLevel +from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec def test_list_offsets(kafka_cluster): @@ -27,11 +27,11 @@ def test_list_offsets(kafka_cluster): admin_client = kafka_cluster.admin() # Create a topic with a single partition - topic = kafka_cluster.create_topic("test-topic-verify-list-offsets", - { - "num_partitions": 1, - "replication_factor": 1, - }) + topic = kafka_cluster.create_topic_and_wait_propogation("test-topic-verify-list-offsets", + { + "num_partitions": 1, + "replication_factor": 1, + }) # Create Producer instance p = kafka_cluster.producer() diff --git a/tests/integration/admin/test_user_scram_credentials.py b/tests/integration/admin/test_user_scram_credentials.py index 21c15bc07..6324ea9d3 100644 --- a/tests/integration/admin/test_user_scram_credentials.py +++ b/tests/integration/admin/test_user_scram_credentials.py @@ -13,13 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import pytest +import time import concurrent +import pytest + from confluent_kafka.admin import UserScramCredentialsDescription, UserScramCredentialUpsertion, \ - UserScramCredentialDeletion, ScramCredentialInfo, \ - ScramMechanism + UserScramCredentialDeletion, ScramCredentialInfo, \ + ScramMechanism from confluent_kafka.error import KafkaException, KafkaError +from tests.common import TestUtils + def test_user_scram_credentials(kafka_cluster): """ @@ -51,31 +55,42 @@ def test_user_scram_credentials(kafka_cluster): result = fut.result() assert result is None + time.sleep(1) + # Try upsertion for newuser,SCRAM_SHA_256 and add newuser,SCRAM_SHA_512 - futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - mechanism, iterations), - password, salt), - UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - ScramMechanism.SCRAM_SHA_512, 10000), - password) - ]) + request = [UserScramCredentialUpsertion(newuser, + ScramCredentialInfo( + mechanism, iterations), + password, salt), + UserScramCredentialUpsertion(newuser, + ScramCredentialInfo( + ScramMechanism.SCRAM_SHA_512, 10000), + password)] + + if TestUtils.use_kraft(): + futmap = admin_client.alter_user_scram_credentials([request[0]]) + result = futmap[newuser].result() + assert result is None + futmap = admin_client.alter_user_scram_credentials([request[1]]) + else: + futmap = admin_client.alter_user_scram_credentials(request) fut = futmap[newuser] result = fut.result() assert result is None + time.sleep(1) + # Delete newuser,SCRAM_SHA_512 futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion( - newuser, - ScramMechanism.SCRAM_SHA_512) - ]) + newuser, + ScramMechanism.SCRAM_SHA_512) + ]) fut = futmap[newuser] result = fut.result() assert result is None + time.sleep(1) + # Describe all users for args in [[], [None]]: f = admin_client.describe_user_scram_credentials(*args) @@ -108,6 +123,8 @@ def test_user_scram_credentials(kafka_cluster): result = fut.result() assert result is None + time.sleep(1) + # newuser isn't found futmap = admin_client.describe_user_scram_credentials([newuser]) assert isinstance(futmap, dict) diff --git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index e23a0547e..b76b9892e 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -16,15 +16,17 @@ # limitations under the License. 
# +import time from uuid import uuid1 from trivup.clusters.KafkaCluster import KafkaCluster -from confluent_kafka import Consumer, Producer, DeserializingConsumer, \ - SerializingProducer +from confluent_kafka import Producer, SerializingProducer from confluent_kafka.admin import AdminClient, NewTopic from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient +from tests.common import TestDeserializingConsumer, TestConsumer + class KafkaClusterFixture(object): __slots__ = ['_cluster', '_admin', '_producer'] @@ -105,7 +107,7 @@ def cimpl_consumer(self, conf=None): if conf is not None: consumer_conf.update(conf) - return Consumer(consumer_conf) + return TestConsumer(consumer_conf) def consumer(self, conf=None, key_deserializer=None, value_deserializer=None): """ @@ -138,7 +140,7 @@ def consumer(self, conf=None, key_deserializer=None, value_deserializer=None): if value_deserializer is not None: consumer_conf['value.deserializer'] = value_deserializer - return DeserializingConsumer(consumer_conf) + return TestDeserializingConsumer(consumer_conf) def admin(self, conf=None): if conf: @@ -169,6 +171,25 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs): future_topic.get(name).result() return name + def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs): + """ + Creates a new topic with this cluster. Wait for the topic to be propogated to all brokers. + + :param str prefix: topic name + :param dict conf: additions/overrides to topic configuration. + :returns: The topic's name + :rtype: str + """ + name = self.create_topic(prefix, conf, **create_topic_kwargs) + + # wait for topic propogation across all the brokers. + # FIXME: find a better way to wait for topic creation + # for all languages, given option to send request to + # a specific broker isn't present everywhere. + time.sleep(1) + + return name + def seed_topic(self, topic, value_source=None, key_source=None, header_source=None): """ Populates a topic with data asynchronously. 
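To illustrate how the test helpers added in `tests/common/__init__.py` pick up the new group protocol, here is a small sketch (not part of the patch, assuming the avro test extras are installed and using a placeholder broker address):

```python
import os

# Opt the test suite into the KIP-848 'consumer' group protocol, as the new
# Semaphore job does via TEST_CONSUMER_GROUP_PROTOCOL=consumer.
os.environ['TEST_CONSUMER_GROUP_PROTOCOL'] = 'consumer'

from tests.common import TestConsumer, TestUtils

assert TestUtils.use_group_protocol_consumer()
assert TestUtils.use_kraft()  # the 'consumer' protocol implies a KRaft cluster

# Because the config contains 'group.id', the wrapper injects
# 'group.protocol': 'consumer' before delegating to confluent_kafka.Consumer.
consumer = TestConsumer({'bootstrap.servers': 'localhost:9092',  # placeholder
                         'group.id': 'example-group'})
```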
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 3945a2954..e224c59bb 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -17,35 +17,47 @@ # import os - import pytest +from tests.common import TestUtils from tests.integration.cluster_fixture import TrivupFixture from tests.integration.cluster_fixture import ByoFixture work_dir = os.path.dirname(os.path.realpath(__file__)) +def _broker_conf(): + broker_conf = ['transaction.state.log.replication.factor=1', + 'transaction.state.log.min.isr=1'] + if TestUtils.use_group_protocol_consumer(): + broker_conf.append('group.coordinator.rebalance.protocols=classic,consumer') + return broker_conf + + +def _broker_version(): + return 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07' if TestUtils.use_group_protocol_consumer() else '3.8.0' + + def create_trivup_cluster(conf={}): trivup_fixture_conf = {'with_sr': True, 'debug': True, - 'cp_version': '7.4.0', - 'version': '3.4.0', - 'broker_conf': ['transaction.state.log.replication.factor=1', - 'transaction.state.log.min.isr=1']} + 'cp_version': '7.6.0', + 'kraft': TestUtils.use_kraft(), + 'version': _broker_version(), + 'broker_conf': _broker_conf()} trivup_fixture_conf.update(conf) return TrivupFixture(trivup_fixture_conf) def create_sasl_cluster(conf={}): trivup_fixture_conf = {'with_sr': False, - 'version': '3.4.0', + 'version': _broker_version(), 'sasl_mechanism': "PLAIN", + 'kraft': TestUtils.use_kraft(), 'sasl_users': 'sasl_user=sasl_user', 'debug': True, 'cp_version': 'latest', - 'broker_conf': ['transaction.state.log.replication.factor=1', - 'transaction.state.log.min.isr=1']} + 'broker_conf': _broker_conf()} trivup_fixture_conf.update(conf) return TrivupFixture(trivup_fixture_conf) @@ -106,13 +118,13 @@ def sasl_cluster_fixture( cluster.stop() -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def kafka_cluster(): for fixture in kafka_cluster_fixture(): yield fixture -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def sasl_cluster(request): for fixture in sasl_cluster_fixture(request.param): yield fixture diff --git a/tests/integration/consumer/test_consumer_error.py b/tests/integration/consumer/test_consumer_error.py index 4b75ad7b8..0ca1902f4 100644 --- a/tests/integration/consumer/test_consumer_error.py +++ b/tests/integration/consumer/test_consumer_error.py @@ -28,7 +28,7 @@ def test_consume_error(kafka_cluster): Tests to ensure librdkafka errors are propagated as an instance of ConsumeError. """ - topic = kafka_cluster.create_topic("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'enable.partition.eof': True} producer = kafka_cluster.producer() @@ -50,7 +50,7 @@ def test_consume_error_commit(kafka_cluster): """ Tests to ensure that we handle messages with errors when commiting. """ - topic = kafka_cluster.create_topic("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'session.timeout.ms': 100} @@ -74,7 +74,7 @@ def test_consume_error_store_offsets(kafka_cluster): """ Tests to ensure that we handle messages with errors when storing offsets. 
""" - topic = kafka_cluster.create_topic("test_commit_transaction") + topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction") consumer_conf = {'group.id': 'pytest', 'session.timeout.ms': 100, 'enable.auto.offset.store': True, diff --git a/tests/integration/consumer/test_consumer_memberid.py b/tests/integration/consumer/test_consumer_memberid.py index cf002707d..50a686962 100644 --- a/tests/integration/consumer/test_consumer_memberid.py +++ b/tests/integration/consumer/test_consumer_memberid.py @@ -27,7 +27,7 @@ def test_consumer_memberid(kafka_cluster): topic = "testmemberid" - kafka_cluster.create_topic(topic) + kafka_cluster.create_topic_and_wait_propogation(topic) consumer = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_consumer_topicpartition_metadata.py b/tests/integration/consumer/test_consumer_topicpartition_metadata.py index 4c01c1df7..0ac2b4466 100644 --- a/tests/integration/consumer/test_consumer_topicpartition_metadata.py +++ b/tests/integration/consumer/test_consumer_topicpartition_metadata.py @@ -30,7 +30,7 @@ def commit_and_check(consumer, topic, metadata): def test_consumer_topicpartition_metadata(kafka_cluster): - topic = kafka_cluster.create_topic("test_topicpartition") + topic = kafka_cluster.create_topic_and_wait_propogation("test_topicpartition") consumer_conf = {'group.id': 'pytest'} c = kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_cooperative_rebalance_1.py b/tests/integration/consumer/test_cooperative_rebalance_1.py index d9a94a85f..2645c7028 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_1.py +++ b/tests/integration/consumer/test_cooperative_rebalance_1.py @@ -44,7 +44,13 @@ def __init__(self): def on_assign(self, consumer, partitions): self.assign_count += 1 - assert 1 == len(partitions) + if self.assign_count == 3: + # Assigning both partitions again after assignment lost + # due to max poll interval timeout exceeded + assert 2 == len(partitions) + else: + # Incremental assign cases + assert 1 == len(partitions) def on_revoke(self, consumer, partitions): self.revoke_count += 1 @@ -54,8 +60,8 @@ def on_lost(self, consumer, partitions): reb = RebalanceState() - topic1 = kafka_cluster.create_topic("topic1") - topic2 = kafka_cluster.create_topic("topic2") + topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") + topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2") consumer = kafka_cluster.consumer(consumer_conf) @@ -97,10 +103,11 @@ def on_lost(self, consumer, partitions): assert e.value.args[0].code() == KafkaError._MAX_POLL_EXCEEDED # And poll again to trigger rebalance callbacks - msg4 = consumer.poll(1) - assert msg4 is None + # It will trigger on_lost and then on_assign during rejoin + msg4 = consumer.poll(10) + assert msg4 is not None # Reading messages again after rejoin - assert 2 == reb.assign_count + assert 3 == reb.assign_count assert 1 == reb.lost_count assert 0 == reb.revoke_count diff --git a/tests/integration/consumer/test_cooperative_rebalance_2.py b/tests/integration/consumer/test_cooperative_rebalance_2.py index 8437e7c1f..f3a6df91b 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_2.py +++ b/tests/integration/consumer/test_cooperative_rebalance_2.py @@ -52,7 +52,7 @@ def on_revoke(self, consumer, partitions): reb = RebalanceState() - topic1 = kafka_cluster.create_topic("topic1") + topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") consumer = 
kafka_cluster.consumer(consumer_conf) diff --git a/tests/integration/consumer/test_incremental_assign.py b/tests/integration/consumer/test_incremental_assign.py index 9703ea82f..e2d9e8941 100644 --- a/tests/integration/consumer/test_incremental_assign.py +++ b/tests/integration/consumer/test_incremental_assign.py @@ -30,8 +30,8 @@ def test_incremental_assign(kafka_cluster): 'enable.auto.commit': 'false', 'auto.offset.reset': 'error'} - topic1 = kafka_cluster.create_topic("topic1") - topic2 = kafka_cluster.create_topic("topic2") + topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1") + topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2") kafka_cluster.seed_topic(topic1, value_source=[b'a']) kafka_cluster.seed_topic(topic2, value_source=[b'b']) diff --git a/tests/integration/integration_test.py b/tests/integration/integration_test.py index e4ae6deca..bc0efa1a7 100755 --- a/tests/integration/integration_test.py +++ b/tests/integration/integration_test.py @@ -20,7 +20,6 @@ """ Test script for confluent_kafka module """ -import confluent_kafka import os import time import uuid @@ -30,6 +29,10 @@ import struct import re +import confluent_kafka + +from tests.common import TestConsumer, TestAvroConsumer + try: # Memory tracker from pympler import tracker @@ -373,7 +376,7 @@ def verify_consumer(): 'enable.partition.eof': True} # Create consumer - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) def print_wmark(consumer, topic_parts): # Verify #294: get_watermark_offsets() should not fail on the first call @@ -483,7 +486,7 @@ def print_wmark(consumer, topic_parts): c.close() # Start a new client and get the committed offsets - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3)))) for tp in offsets: print(tp) @@ -500,7 +503,7 @@ def verify_consumer_performance(): 'error_cb': error_cb, 'auto.offset.reset': 'earliest'} - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) def my_on_assign(consumer, partitions): print('on_assign:', len(partitions), 'partitions:') @@ -608,7 +611,7 @@ def verify_batch_consumer(): 'auto.offset.reset': 'earliest'} # Create consumer - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) # Subscribe to a list of topics c.subscribe([topic]) @@ -665,7 +668,7 @@ def verify_batch_consumer(): c.close() # Start a new client and get the committed offsets - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3)))) for tp in offsets: print(tp) @@ -682,7 +685,7 @@ def verify_batch_consumer_performance(): 'error_cb': error_cb, 'auto.offset.reset': 'earliest'} - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) def my_on_assign(consumer, partitions): print('on_assign:', len(partitions), 'partitions:') @@ -877,7 +880,7 @@ def run_avro_loop(producer_conf, consumer_conf): p.produce(**combo) p.flush() - c = avro.AvroConsumer(consumer_conf) + c = TestAvroConsumer(consumer_conf) c.subscribe([(t['topic']) for t in combinations]) msgcount = 0 @@ -989,7 +992,7 @@ def stats_cb(stats_json_str): 'statistics.interval.ms': 200, 'auto.offset.reset': 'earliest'} - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) c.subscribe([topic]) max_msgcnt = 1000000 @@ -1116,7 +1119,7 @@ def verify_avro_explicit_read_schema(): p.produce(topic=avro_topic, **combo) p.flush() - c = avro.AvroConsumer(consumer_conf, reader_key_schema=reader_schema, 
reader_value_schema=reader_schema) + c = TestAvroConsumer(consumer_conf, reader_key_schema=reader_schema, reader_value_schema=reader_schema) c.subscribe([avro_topic]) msgcount = 0 diff --git a/tests/integration/producer/__init__.py b/tests/integration/producer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/producer/test_transactions.py b/tests/integration/producer/test_transactions.py index b43f81437..ff83fea70 100644 --- a/tests/integration/producer/test_transactions.py +++ b/tests/integration/producer/test_transactions.py @@ -19,7 +19,9 @@ import sys from uuid import uuid1 -from confluent_kafka import Consumer, KafkaError +from confluent_kafka import KafkaError + +from tests.common import TestConsumer def called_by(): @@ -49,7 +51,7 @@ def delivery_err(err, msg): def test_commit_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -64,7 +66,7 @@ def test_commit_transaction(kafka_cluster): def test_abort_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -79,7 +81,7 @@ def test_abort_transaction(kafka_cluster): def test_abort_retry_commit_transaction(kafka_cluster): - output_topic = kafka_cluster.create_topic("output_topic") + output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") producer = kafka_cluster.producer({ 'transactional.id': 'example_transactional_id', @@ -97,8 +99,8 @@ def test_abort_retry_commit_transaction(kafka_cluster): def test_send_offsets_committed_transaction(kafka_cluster): - input_topic = kafka_cluster.create_topic("input_topic") - output_topic = kafka_cluster.create_topic("output_topic") + input_topic = kafka_cluster.create_topic_and_wait_propogation("input_topic") + output_topic = kafka_cluster.create_topic_and_wait_propogation("output_topic") error_cb = prefixed_error_cb('test_send_offsets_committed_transaction') producer = kafka_cluster.producer({ 'client.id': 'producer1', @@ -114,7 +116,7 @@ def test_send_offsets_committed_transaction(kafka_cluster): 'error_cb': error_cb } consumer_conf.update(kafka_cluster.client_conf()) - consumer = Consumer(consumer_conf) + consumer = TestConsumer(consumer_conf) kafka_cluster.seed_topic(input_topic) consumer.subscribe([input_topic]) @@ -204,7 +206,7 @@ def consume_committed(conf, topic): 'error_cb': prefixed_error_cb(called_by()), } consumer_conf.update(conf) - consumer = Consumer(consumer_conf) + consumer = TestConsumer(consumer_conf) consumer.subscribe([topic]) msg_cnt = read_all_msgs(consumer) diff --git a/tests/integration/schema_registry/test_avro_serializers.py b/tests/integration/schema_registry/test_avro_serializers.py index 882af40db..4140ad600 100644 --- a/tests/integration/schema_registry/test_avro_serializers.py +++ b/tests/integration/schema_registry/test_avro_serializers.py @@ -151,7 +151,7 @@ def _references_test_common(kafka_cluster, awarded_user, serializer_schema, dese Args: kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic("reference-avro") + topic = kafka_cluster.create_topic_and_wait_propogation("reference-avro") sr = kafka_cluster.schema_registry() value_serializer = AvroSerializer(sr, 
serializer_schema, @@ -207,7 +207,7 @@ def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, record_ data (object): data to be serialized """ - topic = kafka_cluster.create_topic("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") sr = kafka_cluster.schema_registry() schema_str = load_file(avsc) @@ -251,7 +251,7 @@ def test_delivery_report_serialization(kafka_cluster, load_file, avsc, data, rec data (object): data to be serialized """ - topic = kafka_cluster.create_topic("serialization-avro-dr") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro-dr") sr = kafka_cluster.schema_registry() schema_str = load_file(avsc) @@ -298,7 +298,7 @@ def test_avro_record_serialization_custom(kafka_cluster): kafka_cluster (KafkaClusterFixture): cluster fixture """ - topic = kafka_cluster.create_topic("serialization-avro") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-avro") sr = kafka_cluster.schema_registry() user = User('Bowie', 47, 'purple') diff --git a/tests/integration/schema_registry/test_json_serializers.py b/tests/integration/schema_registry/test_json_serializers.py index 3a60598d4..69b2b0339 100644 --- a/tests/integration/schema_registry/test_json_serializers.py +++ b/tests/integration/schema_registry/test_json_serializers.py @@ -257,7 +257,7 @@ def test_json_record_serialization(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -305,7 +305,7 @@ def test_json_record_serialization_incompatible(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -350,7 +350,7 @@ def test_json_record_serialization_custom(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") @@ -394,7 +394,7 @@ def test_json_record_deserialization_mismatch(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - topic = kafka_cluster.create_topic("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") sr = kafka_cluster.schema_registry() schema_str = load_file("contractor.json") @@ -435,7 +435,7 @@ def _register_referenced_schemas(sr, load_file): def test_json_reference(kafka_cluster, load_file): - topic = kafka_cluster.create_topic("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") sr = kafka_cluster.schema_registry() product = {"productId": 1, @@ -474,7 +474,7 @@ def test_json_reference(kafka_cluster, load_file): def test_json_reference_custom(kafka_cluster, load_file): - topic = kafka_cluster.create_topic("serialization-json") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-json") sr = kafka_cluster.schema_registry() product = _TestProduct(product_id=1, diff --git a/tests/integration/schema_registry/test_proto_serializers.py 
b/tests/integration/schema_registry/test_proto_serializers.py index 621beac43..16de4ea6b 100644 --- a/tests/integration/schema_registry/test_proto_serializers.py +++ b/tests/integration/schema_registry/test_proto_serializers.py @@ -52,7 +52,7 @@ def test_protobuf_message_serialization(kafka_cluster, pb2, data): Validates that we get the same message back that we put in. """ - topic = kafka_cluster.create_topic("serialization-proto") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto") sr = kafka_cluster.schema_registry() value_serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) @@ -85,7 +85,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): """ sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) producer = kafka_cluster.producer(key_serializer=serializer) @@ -106,7 +106,7 @@ def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_2 = NestedTestProto_pb2.NestedMessage sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) producer = kafka_cluster.producer(key_serializer=serializer) @@ -127,7 +127,7 @@ def test_protobuf_deserializer_type_mismatch(kafka_cluster): pb2_2 = metadata_proto_pb2.HDFSOptions sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic("serialization-proto-refs") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) deserializer = ProtobufDeserializer(pb2_2, {'use.deprecated.format': False}) diff --git a/tests/integration/serialization/test_serializers.py b/tests/integration/serialization/test_serializers.py index 7c4f796c2..f8ac10d0e 100644 --- a/tests/integration/serialization/test_serializers.py +++ b/tests/integration/serialization/test_serializers.py @@ -45,7 +45,7 @@ def test_numeric_serialization(kafka_cluster, serializer, deserializer, data): data(object): input data """ - topic = kafka_cluster.create_topic("serialization-numeric") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-numeric") producer = kafka_cluster.producer(value_serializer=serializer) producer.produce(topic, value=data) @@ -77,7 +77,7 @@ def test_string_serialization(kafka_cluster, data, codec): codec (str): encoding type """ - topic = kafka_cluster.create_topic("serialization-string") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-string") producer = kafka_cluster.producer(value_serializer=StringSerializer(codec)) @@ -119,7 +119,7 @@ def test_mixed_serialization(kafka_cluster, key_serializer, value_serializer, value (object): value data """ - topic = kafka_cluster.create_topic("serialization-numeric") + topic = kafka_cluster.create_topic_and_wait_propogation("serialization-numeric") producer = kafka_cluster.producer(key_serializer=key_serializer, value_serializer=value_serializer) diff --git a/tests/requirements.txt b/tests/requirements.txt index b5e20efb2..3b113a623 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -5,7 +5,7 @@ pytest==4.6.9;python_version<="3.0" pytest>=6.0.0;python_version>="3.0" 
 pytest-timeout
 requests-mock
-trivup>=0.8.3
+tests/trivup/trivup-0.12.6.tar.gz
 fastavro<1.8.0;python_version=="3.7"
 fastavro>=1.8.4;python_version>"3.7"
 fastavro
diff --git a/tests/soak/soakclient.py b/tests/soak/soakclient.py
index 453c7e5a5..027cc2e83 100755
--- a/tests/soak/soakclient.py
+++ b/tests/soak/soakclient.py
@@ -26,11 +26,12 @@
 #
 from confluent_kafka import KafkaError, KafkaException, version
-from confluent_kafka import Producer, Consumer
+from confluent_kafka import Producer
 from confluent_kafka.admin import AdminClient, NewTopic
 from collections import defaultdict
 from builtins import int
 from opentelemetry import metrics
+from common import TestConsumer
 import argparse
 import threading
 import time
@@ -446,7 +447,7 @@ def filter_config(conf, filter_out, strip_prefix):
         cconf['error_cb'] = self.consumer_error_cb
         cconf['on_commit'] = self.consumer_commit_cb
         self.logger.info("consumer: using group.id {}".format(cconf['group.id']))
-        self.consumer = Consumer(cconf)
+        self.consumer = TestConsumer(cconf)
 
         # Create and start producer thread
         self.producer_thread = threading.Thread(target=self.producer_thread_main)
diff --git a/tests/test_Admin.py b/tests/test_Admin.py
index 5e8ff2e6c..ca9832f87 100644
--- a/tests/test_Admin.py
+++ b/tests/test_Admin.py
@@ -6,10 +6,11 @@
     ResourcePatternType, AclOperation, AclPermissionType, AlterConfigOpType, \
     ScramCredentialInfo, ScramMechanism, \
     UserScramCredentialAlteration, UserScramCredentialDeletion, \
-    UserScramCredentialUpsertion, OffsetSpec
+    UserScramCredentialUpsertion, OffsetSpec, \
+    ListConsumerGroupsResult
 from confluent_kafka import KafkaException, KafkaError, libversion, \
     TopicPartition, ConsumerGroupTopicPartitions, ConsumerGroupState, \
-    IsolationLevel, TopicCollection
+    IsolationLevel, TopicCollection, ConsumerGroupType
 import concurrent.futures
@@ -619,6 +620,26 @@ def test_list_consumer_groups_api():
     with pytest.raises(TypeError):
         a.list_consumer_groups(states=[ConsumerGroupState.EMPTY, ConsumerGroupState.STABLE])
 
+    with pytest.raises(TypeError):
+        a.list_consumer_groups(types=[ConsumerGroupType.CLASSIC])
+
+    with pytest.raises(TypeError):
+        a.list_consumer_groups(types="UNKNOWN")
+
+    with pytest.raises(ValueError):
+        a.list_consumer_groups(types={ConsumerGroupType.UNKNOWN})
+
+    f = a.list_consumer_groups(types={ConsumerGroupType.CLASSIC,
+                                      ConsumerGroupType.CONSUMER},
+                               request_timeout=0.5)
+    r = f.result(timeout=1)
+    assert isinstance(r, ListConsumerGroupsResult)
+    assert len(r.errors) == 1
+    assert len(r.valid) == 0
+    e = r.errors[0]
+    assert isinstance(e, KafkaError)
+    assert e.code() == KafkaError._TIMED_OUT
+
 
 def test_describe_consumer_groups_api():
     a = AdminClient({"socket.timeout.ms": 10})
diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py
index 48a8d3f8e..73cc999e3 100644
--- a/tests/test_Consumer.py
+++ b/tests/test_Consumer.py
@@ -1,9 +1,12 @@
 #!/usr/bin/env python
 
+import pytest
+
 from confluent_kafka import (Consumer, TopicPartition, KafkaError,
                              KafkaException, TIMESTAMP_NOT_AVAILABLE,
                              OFFSET_INVALID, libversion)
-import pytest
+
+from tests.common import TestConsumer
 
 
 def test_basic_api():
@@ -11,15 +14,15 @@ def test_basic_api():
         broker configured.
""" with pytest.raises(TypeError) as ex: - kc = Consumer() + kc = TestConsumer() assert ex.match('expected configuration dict') def dummy_commit_cb(err, partitions): pass - kc = Consumer({'group.id': 'test', 'socket.timeout.ms': '100', - 'session.timeout.ms': 1000, # Avoid close() blocking too long - 'on_commit': dummy_commit_cb}) + kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100', + 'session.timeout.ms': 1000, # Avoid close() blocking too long + 'on_commit': dummy_commit_cb}) kc.subscribe(["test"]) kc.unsubscribe() @@ -119,11 +122,11 @@ def dummy_assign_revoke(consumer, partitions): def test_store_offsets(): """ Basic store_offsets() tests """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) @@ -161,10 +164,10 @@ def commit_cb(cs, err, ps): cs = CommitState('test', 2) - c = Consumer({'group.id': 'x', - 'enable.auto.commit': False, 'socket.timeout.ms': 50, - 'session.timeout.ms': 100, - 'on_commit': lambda err, ps: commit_cb(cs, err, ps)}) + c = TestConsumer({'group.id': 'x', + 'enable.auto.commit': False, 'socket.timeout.ms': 50, + 'session.timeout.ms': 100, + 'on_commit': lambda err, ps: commit_cb(cs, err, ps)}) c.assign([TopicPartition(cs.topic, cs.partition)]) @@ -196,11 +199,11 @@ def poll(self, somearg): @pytest.mark.skipif(libversion()[1] < 0x000b0000, reason="requires librdkafka >=0.11.0") def test_offsets_for_times(): - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) # Query broker for timestamps for partition try: test_topic_partition = TopicPartition("test", 0, 100) @@ -216,11 +219,11 @@ def test_offsets_for_times(): def test_multiple_close_does_not_throw_exception(): """ Calling Consumer.close() multiple times should not throw Runtime Exception """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) @@ -232,11 +235,11 @@ def test_multiple_close_does_not_throw_exception(): def test_any_method_after_close_throws_exception(): """ Calling any consumer method after close should throw a RuntimeError """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) c.unsubscribe() @@ -296,11 +299,11 @@ def test_any_method_after_close_throws_exception(): def test_calling_store_offsets_after_close_throws_erro(): """ calling store_offset after close should throw RuntimeError """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = 
TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) c.unsubscribe() @@ -319,5 +322,5 @@ def test_consumer_without_groupid(): """ Consumer should raise exception if group.id is not set """ with pytest.raises(ValueError) as ex: - Consumer({'bootstrap.servers': "mybroker:9092"}) + TestConsumer({'bootstrap.servers': "mybroker:9092"}) assert ex.match('group.id must be set') diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 7b81d2125..0f0d69e1d 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -1,10 +1,12 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import pytest +from struct import pack -from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, \ +from confluent_kafka import Producer, KafkaError, KafkaException, \ TopicPartition, libversion -from struct import pack + +from tests.common import TestConsumer def error_cb(err): @@ -211,7 +213,7 @@ def test_transaction_api(): assert ex.value.args[0].fatal() is False assert ex.value.args[0].txn_requires_abort() is False - consumer = Consumer({"group.id": "testgroup"}) + consumer = TestConsumer({"group.id": "testgroup"}) group_metadata = consumer.consumer_group_metadata() consumer.close() diff --git a/tests/test_log.py b/tests/test_log.py index b28fd798c..36368a61b 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -1,10 +1,13 @@ #!/usr/bin/env python from io import StringIO +import logging + import confluent_kafka import confluent_kafka.avro import confluent_kafka.admin -import logging + +from tests.common import TestConsumer, TestAvroConsumer class CountingFilter(logging.Filter): @@ -35,10 +38,9 @@ def test_logging_consumer(): logger.setLevel(logging.DEBUG) f = CountingFilter('consumer') logger.addFilter(f) - - kc = confluent_kafka.Consumer({'group.id': 'test', - 'debug': 'all'}, - logger=logger) + kc = TestConsumer({'group.id': 'test', + 'debug': 'all'}, + logger=logger) while f.cnt == 0: kc.poll(timeout=0.5) @@ -55,10 +57,10 @@ def test_logging_avro_consumer(): f = CountingFilter('avroconsumer') logger.addFilter(f) - kc = confluent_kafka.avro.AvroConsumer({'schema.registry.url': 'http://example.com', - 'group.id': 'test', - 'debug': 'all'}, - logger=logger) + kc = TestAvroConsumer({'schema.registry.url': 'http://example.com', + 'group.id': 'test', + 'debug': 'all'}, + logger=logger) while f.cnt == 0: kc.poll(timeout=0.5) @@ -150,7 +152,7 @@ def test_consumer_logger_logging_in_given_format(): stringBuffer, logger = _setup_string_buffer_logger('Consumer') - c = confluent_kafka.Consumer( + c = TestConsumer( {"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"}) c.poll(0) diff --git a/tests/test_misc.py b/tests/test_misc.py index aca7b5a4f..4db5cff5a 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -1,14 +1,17 @@ #!/usr/bin/env python -import confluent_kafka -from confluent_kafka import Consumer, Producer -from confluent_kafka.admin import AdminClient import json import pytest import os import time import sys +import confluent_kafka +from confluent_kafka import Consumer, Producer +from confluent_kafka.admin import AdminClient + +from tests.common import TestConsumer + def test_version(): print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version()) @@ -40,7 +43,7 @@ def error_cb(error_msg): 'error_cb': error_cb } - kc = confluent_kafka.Consumer(**conf) + kc = TestConsumer(conf) kc.subscribe(["test"]) 
     while not seen_error_cb:
         kc.poll(timeout=0.1)
@@ -64,7 +67,7 @@ def stats_cb(stats_json_str):
         'stats_cb': stats_cb
     }
 
-    kc = confluent_kafka.Consumer(**conf)
+    kc = TestConsumer(conf)
     kc.subscribe(["test"])
 
     while not seen_stats_cb:
@@ -137,7 +140,7 @@ def oauth_cb(oauth_config):
         'oauth_cb': oauth_cb
     }
 
-    kc = confluent_kafka.Consumer(**conf)
+    kc = TestConsumer(conf)
 
     while not seen_oauth_cb:
         kc.poll(timeout=0.1)
@@ -162,7 +165,7 @@ def oauth_cb(oauth_config):
         'oauth_cb': oauth_cb
     }
 
-    kc = confluent_kafka.Consumer(**conf)
+    kc = TestConsumer(conf)
 
     while not seen_oauth_cb:
         kc.poll(timeout=0.1)
@@ -189,7 +192,7 @@ def oauth_cb(oauth_config):
         'oauth_cb': oauth_cb
     }
 
-    kc = confluent_kafka.Consumer(**conf)
+    kc = TestConsumer(conf)
 
     while oauth_cb_count < 2:
         kc.poll(timeout=0.1)
@@ -267,7 +270,7 @@ def on_delivery(err, msg):
 def test_set_sasl_credentials_api():
     clients = [
         AdminClient({}),
-        confluent_kafka.Consumer({"group.id": "dummy"}),
+        TestConsumer({"group.id": "dummy"}),
         confluent_kafka.Producer({})]
     for c in clients:
diff --git a/tests/trivup/trivup-0.12.6.tar.gz b/tests/trivup/trivup-0.12.6.tar.gz
new file mode 100644
index 000000000..5417120a6
Binary files /dev/null and b/tests/trivup/trivup-0.12.6.tar.gz differ
diff --git a/tools/source-package-verification.sh b/tools/source-package-verification.sh
index eb7506061..58ccefddc 100755
--- a/tools/source-package-verification.sh
+++ b/tools/source-package-verification.sh
@@ -18,8 +18,10 @@
 export DYLD_LIBRARY_PATH="$DYLD_LIBRARY_PATH:$PWD/$lib_dir"
 python setup.py build && python setup.py install
 
 if [[ $OS_NAME == linux && $ARCH == x64 ]]; then
-    flake8 --exclude ./_venv,*_pb2.py
-    make docs
+    if [[ -z $TEST_CONSUMER_GROUP_PROTOCOL ]]; then
+        flake8 --exclude ./_venv,*_pb2.py
+        make docs
+    fi
     python -m pytest --timeout 1200 --ignore=dest
 else
    python -m pytest --timeout 1200 --ignore=dest --ignore=tests/integration
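
Reviewer note: the hunks above replace direct `Consumer(...)` construction with a `TestConsumer` imported from `tests/common`, which this patch does not show. A minimal sketch of what such a wrapper might look like, assuming it only injects `group.protocol` from the `TEST_CONSUMER_GROUP_PROTOCOL` environment variable used in `tools/source-package-verification.sh` (everything here other than `confluent_kafka.Consumer` and that variable is hypothetical):

# hypothetical sketch of tests/common/__init__.py -- not part of this patch
import os

from confluent_kafka import Consumer


class TestConsumer(Consumer):
    """Consumer wrapper that opts tests into the KIP-848 'consumer'
    group protocol when TEST_CONSUMER_GROUP_PROTOCOL=consumer is set."""

    def __init__(self, conf=None, **kwargs):
        conf = dict(conf or {})
        protocol = os.environ.get("TEST_CONSUMER_GROUP_PROTOCOL")
        if protocol and "group.id" in conf:
            conf["group.protocol"] = protocol
            # Assumption: classic-only tuning such as session.timeout.ms may be
            # rejected under the new protocol, so drop it for these tests.
            conf.pop("session.timeout.ms", None)
        super().__init__(conf, **kwargs)

A `TestAvroConsumer` used in tests/test_log.py would presumably wrap `confluent_kafka.avro.AvroConsumer` the same way.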
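
Reviewer note: the integration tests now call `create_topic_and_wait_propogation()` on the cluster fixture, whose implementation is also outside this diff. A sketch of the intent, assuming it wraps the existing `create_topic()` helper and blocks until broker metadata shows the topic (the `cluster.create_topic` and `cluster.bootstrap_servers` accessors are assumptions):

# hypothetical sketch of the fixture helper -- the real implementation may differ
import time

from confluent_kafka import Producer


def create_topic_and_wait_propogation(cluster, topic_prefix, timeout=30.0):
    """Create a topic and wait until it appears in metadata, avoiding
    UNKNOWN_TOPIC_OR_PART races right after creation."""
    topic = cluster.create_topic(topic_prefix)  # assumed pre-existing helper
    p = Producer({"bootstrap.servers": cluster.bootstrap_servers()})  # assumed accessor
    deadline = time.time() + timeout
    while time.time() < deadline:
        md = p.list_topics(timeout=5.0)
        t = md.topics.get(topic)
        if t is not None and t.error is None and len(t.partitions) > 0:
            return topic
        time.sleep(0.5)
    raise RuntimeError("topic {} did not propagate within {}s".format(topic, timeout))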
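
Reviewer note: the new assertions in tests/test_Admin.py exercise the `types` filter added to `AdminClient.list_consumer_groups()`: it must be a set (a list or a bare string raises `TypeError`, and `ConsumerGroupType.UNKNOWN` raises `ValueError`). A minimal usage sketch against a placeholder broker:

from confluent_kafka import ConsumerGroupType
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder address

# Only list groups using the classic rebalance protocol; pass a set, not a list.
future = admin.list_consumer_groups(request_timeout=10,
                                    types={ConsumerGroupType.CLASSIC})
result = future.result()
for group in result.valid:
    print(group.group_id, group.state, group.type)
for err in result.errors:
    print("error:", err)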