
Commit 5e13ed7 ("PR comments")
Parent: b89e51a
14 files changed: +90, -40 lines

tests/integration/admin/test_basic_operations.py (+8, -3)

@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import confluent_kafka
 import struct
 import time
-from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState
+
+from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState, KafkaError
 from confluent_kafka.admin import (NewPartitions, ConfigResource,
                                    AclBinding, AclBindingFilter, ResourceType,
                                    ResourcePatternType, AclOperation, AclPermissionType)
@@ -55,6 +55,7 @@ def verify_admin_acls(admin_client,
                                "User:test-user-2", "*", AclOperation.ALL, AclPermissionType.ALLOW)
 
     fs = admin_client.create_acls([acl_binding_1, acl_binding_2, acl_binding_3])
+    time.sleep(1)
     for acl_binding, f in fs.items():
         f.result()  # trigger exception if there was an error
 
@@ -78,6 +79,7 @@ def verify_admin_acls(admin_client,
     #
     expected_acl_bindings = [acl_binding_2, acl_binding_3]
     fs = admin_client.delete_acls([acl_binding_filter2])
+    time.sleep(1)
     deleted_acl_bindings = sorted(fs[acl_binding_filter2].result())
     assert deleted_acl_bindings == expected_acl_bindings, \
         "Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
@@ -89,6 +91,7 @@ def verify_admin_acls(admin_client,
     expected_acl_bindings = [[acl_binding_1], []]
     delete_acl_binding_filters = [acl_binding_filter3, acl_binding_filter4]
     fs = admin_client.delete_acls(delete_acl_binding_filters)
+    time.sleep(1)
     for acl_binding, expected in zip(delete_acl_binding_filters, expected_acl_bindings):
         deleted_acl_bindings = sorted(fs[acl_binding].result())
         assert deleted_acl_bindings == expected, \
@@ -209,6 +212,7 @@ def test_basic_operations(kafka_cluster):
                                            },
                                            validate_only=validate
                                            )
+    time.sleep(1)
 
     admin_client = kafka_cluster.admin()
 
@@ -270,7 +274,7 @@ def consume_messages(group_id, num_messages=None):
                     print('Read all the required messages: exiting')
                     break
             except ConsumeError as e:
-                if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF:
+                if msg is not None and e.code == KafkaError._PARTITION_EOF:
                     print('Reached end of %s [%d] at offset %d' % (
                         msg.topic(), msg.partition(), msg.offset()))
                     eof_reached[(msg.topic(), msg.partition())] = True
@@ -343,6 +347,7 @@ def verify_config(expconfig, configs):
         resource.set_config(key, value)
 
     fs = admin_client.alter_configs([resource])
+    time.sleep(1)
     fs[resource].result()  # will raise exception on failure
 
     #

tests/integration/admin/test_describe_operations.py (+14, -1)

@@ -13,12 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
 import pytest
+
 from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
                                    ResourcePatternType, AclOperation, AclPermissionType)
 from confluent_kafka.error import ConsumeError
 from confluent_kafka import ConsumerGroupState, TopicCollection
 
+from tests.common import TestUtils
+
 topic_prefix = "test-topic"
 
 
@@ -82,10 +86,12 @@ def perform_admin_operation_sync(operation, *arg, **kwargs):
 
 
 def create_acls(admin_client, acl_bindings):
     perform_admin_operation_sync(admin_client.create_acls, acl_bindings)
+    time.sleep(1)
 
 
 def delete_acls(admin_client, acl_binding_filters):
     perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters)
+    time.sleep(1)
 
 
 def verify_provided_describe_for_authorized_operations(
@@ -115,6 +121,7 @@ def verify_provided_describe_for_authorized_operations(
     acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL,
                              "User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW)
     create_acls(admin_client, [acl_binding])
+    time.sleep(1)
 
     # Check with updated authorized operations
     desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs)
@@ -126,6 +133,7 @@ def verify_provided_describe_for_authorized_operations(
     acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY,
                                           None, None, AclOperation.ANY, AclPermissionType.ANY)
     delete_acls(admin_client, [acl_binding_filter])
+    time.sleep(1)
     return desc
 
 
@@ -204,12 +212,17 @@ def test_describe_operations(sasl_cluster):
                                            },
                                            validate_only=False
                                            )
+    time.sleep(1)
 
     # Verify Authorized Operations in Describe Topics
     verify_describe_topics(admin_client, our_topic)
 
     # Verify Authorized Operations in Describe Groups
-    verify_describe_groups(sasl_cluster, admin_client, our_topic)
+    # Skip this test if using group protocol `consumer`
+    # as there is new RPC for describe_groups() in
+    # group protocol `consumer` case.
+    if not TestUtils.use_group_protocol_consumer():
+        verify_describe_groups(sasl_cluster, admin_client, our_topic)
 
     # Delete Topic
     perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)
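Note: TestUtils.use_group_protocol_consumer() is imported from tests/common, which this commit does not touch. A minimal sketch of what such a gate could look like, assuming it is driven by an environment variable (the variable name and default value are assumptions, not taken from this commit):

    import os

    class TestUtils:
        @staticmethod
        def use_group_protocol_consumer():
            # Hypothetical switch: opt in to the new `consumer` group protocol
            # only when the environment variable is set to "consumer".
            return os.environ.get("TEST_CONSUMER_GROUP_PROTOCOL", "classic") == "consumer"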

tests/integration/admin/test_incremental_alter_configs.py (+6)

@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
+
 from confluent_kafka.admin import ConfigResource, \
     ConfigEntry, ResourceType, \
     AlterConfigOpType
@@ -58,12 +60,14 @@ def test_incremental_alter_configs(kafka_cluster):
                                            "config": topic_config,
                                            "replication_factor": 1,
                                            })
+    time.sleep(1)
     our_topic2 = kafka_cluster.create_topic(topic_prefix2,
                                             {
                                                 "num_partitions": num_partitions,
                                                 "config": topic_config,
                                                 "replication_factor": 1,
                                             })
+    time.sleep(1)
 
     admin_client = kafka_cluster.admin()
 
@@ -100,6 +104,7 @@ def test_incremental_alter_configs(kafka_cluster):
     # Incrementally alter some configuration values
     #
     fs = admin_client.incremental_alter_configs([res1, res2])
+    time.sleep(1)
 
     assert_operation_succeeded(fs, 2)
 
@@ -131,6 +136,7 @@ def test_incremental_alter_configs(kafka_cluster):
     # Incrementally alter some configuration values
     #
     fs = admin_client.incremental_alter_configs([res2])
+    time.sleep(1)
 
     assert_operation_succeeded(fs, 1)

tests/integration/admin/test_list_offsets.py (+4, -1)

@@ -13,8 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
+import time
+
 from confluent_kafka import TopicPartition, IsolationLevel
+from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
 
 
 def test_list_offsets(kafka_cluster):
@@ -32,6 +34,7 @@ def test_list_offsets(kafka_cluster):
                                            "num_partitions": 1,
                                            "replication_factor": 1,
                                            })
+    time.sleep(1)
 
     # Create Producer instance
     p = kafka_cluster.producer()

tests/integration/admin/test_user_scram_credentials.py (+23, -12)

@@ -13,13 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pytest
+import time
 import concurrent
+import pytest
+
 from confluent_kafka.admin import UserScramCredentialsDescription, UserScramCredentialUpsertion, \
     UserScramCredentialDeletion, ScramCredentialInfo, \
     ScramMechanism
 from confluent_kafka.error import KafkaException, KafkaError
 
+from tests.common import TestUtils
 
 def test_user_scram_credentials(kafka_cluster):
     """
@@ -47,22 +50,28 @@ def test_user_scram_credentials(kafka_cluster):
     futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(newuser,
                                                         ScramCredentialInfo(mechanism, iterations),
                                                         password, salt)])
+    time.sleep(1)
     fut = futmap[newuser]
     result = fut.result()
     assert result is None
 
     # Try upsertion for newuser,SCRAM_SHA_256 and add newuser,SCRAM_SHA_512
-    futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(
-                                                            newuser,
-                                                            ScramCredentialInfo(
-                                                                mechanism, iterations),
-                                                            password, salt),
-                                                        UserScramCredentialUpsertion(
-                                                            newuser,
-                                                            ScramCredentialInfo(
-                                                                ScramMechanism.SCRAM_SHA_512, 10000),
-                                                            password)
-                                                        ])
+    request = [UserScramCredentialUpsertion(newuser,
+                                            ScramCredentialInfo(
+                                                mechanism, iterations),
+                                            password, salt),
+               UserScramCredentialUpsertion(newuser,
+                                            ScramCredentialInfo(
+                                                ScramMechanism.SCRAM_SHA_512, 10000),
+                                            password)]
+
+    if TestUtils.use_kraft():
+        futmap = admin_client.alter_user_scram_credentials([request[0]])
+        time.sleep(1)
+        futmap = admin_client.alter_user_scram_credentials([request[1]])
+    else:
+        futmap = admin_client.alter_user_scram_credentials(request)
+    time.sleep(1)
     fut = futmap[newuser]
     result = fut.result()
     assert result is None
@@ -72,6 +81,7 @@ def test_user_scram_credentials(kafka_cluster):
         newuser,
         ScramMechanism.SCRAM_SHA_512)
     ])
+    time.sleep(1)
     fut = futmap[newuser]
     result = fut.result()
     assert result is None
@@ -101,6 +111,7 @@ def test_user_scram_credentials(kafka_cluster):
 
     # Delete newuser
     futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion(newuser, mechanism)])
+    time.sleep(1)
     assert isinstance(futmap, dict)
     assert len(futmap) == 1
     assert newuser in futmap

tests/integration/cluster_fixture.py (+2, -1)

@@ -23,7 +23,8 @@
 from confluent_kafka import Producer, SerializingProducer
 from confluent_kafka.admin import AdminClient, NewTopic
 from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
-from ..common import TestDeserializingConsumer, TestConsumer
+
+from tests.common import TestDeserializingConsumer, TestConsumer
 
 
 class KafkaClusterFixture(object):

tests/integration/conftest.py (+7, -3)

@@ -17,9 +17,9 @@
 #
 
 import os
-from ..common import TestUtils
 import pytest
 
+from tests.common import TestUtils
 from tests.integration.cluster_fixture import TrivupFixture
 from tests.integration.cluster_fixture import ByoFixture
 
@@ -34,20 +34,24 @@ def _broker_conf():
     return broker_conf
 
 
+def _broker_version():
+    return 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07' if TestUtils.use_group_protocol_consumer() else '3.7.0'
+
+
 def create_trivup_cluster(conf={}):
     trivup_fixture_conf = {'with_sr': True,
                            'debug': True,
                            'cp_version': '7.6.0',
                            'kraft': TestUtils.use_kraft(),
-                           'version': 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07',
+                           'version': _broker_version(),
                            'broker_conf': _broker_conf()}
     trivup_fixture_conf.update(conf)
     return TrivupFixture(trivup_fixture_conf)
 
 
 def create_sasl_cluster(conf={}):
     trivup_fixture_conf = {'with_sr': False,
-                           'version': 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07',
+                           'version': _broker_version(),
                            'sasl_mechanism': "PLAIN",
                            'kraft': TestUtils.use_kraft(),
                            'sasl_users': 'sasl_user=sasl_user',
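The new _broker_version() helper centralizes the broker build selection that was previously hard-coded in both cluster factories. The same selection logic as a standalone sketch (version strings copied from the diff; the function name and its boolean parameter are illustrative only, not part of the commit):

    def _select_broker_version(use_new_consumer_group_protocol):
        # A trunk broker build is needed for the new `consumer` group protocol;
        # all other runs stay on Apache Kafka 3.7.0.
        if use_new_consumer_group_protocol:
            return 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07'
        return '3.7.0'

    assert _select_broker_version(False) == '3.7.0'
    assert _select_broker_version(True).startswith('trunk@')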

tests/integration/integration_test.py (+4, -2)

@@ -20,7 +20,6 @@
 
 """ Test script for confluent_kafka module """
 
-import confluent_kafka
 import os
 import time
 import uuid
@@ -29,7 +28,10 @@
 import gc
 import struct
 import re
-from ..common import TestConsumer, TestAvroConsumer
+
+import confluent_kafka
+
+from tests.common import TestConsumer, TestAvroConsumer
 
 try:
     # Memory tracker

tests/integration/producer/test_transactions.py (+1, -1)

@@ -18,10 +18,10 @@
 import inspect
 import sys
 from uuid import uuid1
-from ...common import TestConsumer
 
 from confluent_kafka import KafkaError
 
+from tests.common import TestConsumer
 
 def called_by():
     if sys.version_info < (3, 5):

tests/test_Consumer.py (+4, -2)

@@ -1,10 +1,12 @@
 #!/usr/bin/env python
 
+import pytest
+
 from confluent_kafka import (Consumer, TopicPartition, KafkaError,
                              KafkaException, TIMESTAMP_NOT_AVAILABLE,
                              OFFSET_INVALID, libversion)
-import pytest
-from .common import TestConsumer
+
+from tests.common import TestConsumer
 
 
 def test_basic_api():

tests/test_Producer.py (+3, -2)

@@ -1,11 +1,12 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import pytest
+from struct import pack
 
 from confluent_kafka import Producer, KafkaError, KafkaException, \
     TopicPartition, libversion
-from struct import pack
-from .common import TestConsumer
+
+from tests.common import TestConsumer
 
 
 def error_cb(err):

tests/test_log.py (+4, -2)

@@ -1,10 +1,12 @@
 #!/usr/bin/env python
 
 from io import StringIO
+import logging
+
 import confluent_kafka
 import confluent_kafka.avro
-import logging
-from .common import TestConsumer, TestAvroConsumer
+
+from tests.common import TestConsumer, TestAvroConsumer
 
 
 class CountingFilter(logging.Filter):
