Commit 0324fac

Added Test*Consumer classes instead of functions
1 parent 4917e15 commit 0324fac

9 files changed: +59 -55 lines changed

tests/common/__init__.py

Lines changed: 15 additions & 11 deletions

@@ -36,20 +36,24 @@ def use_kraft():
     return use_group_protocol_consumer() or _trivup_cluster_type_kraft()


-def _get_consumer_generic(consumer_clazz, conf=None, **kwargs):
-    if use_group_protocol_consumer():
-        if conf is not None and 'group.id' in conf:
-            conf['group.protocol'] = 'consumer'
-    return consumer_clazz(conf, **kwargs)
+def _update_conf_group_protocol(conf=None):
+    if conf is not None and 'group.id' in conf and use_group_protocol_consumer():
+        conf['group.protocol'] = 'consumer'


-def get_consumer(conf=None, **kwargs):
-    return _get_consumer_generic(Consumer, conf, **kwargs)
+class TestConsumer(Consumer):
+    def __init__(self, conf, **kwargs):
+        _update_conf_group_protocol(conf)
+        super(TestConsumer, self).__init__(conf, **kwargs)


-def get_avro_consumer(conf=None, **kwargs):
-    return _get_consumer_generic(AvroConsumer, conf, **kwargs)
+class TestDeserializingConsumer(DeserializingConsumer):
+    def __init__(self, conf, **kwargs):
+        _update_conf_group_protocol(conf)
+        super(TestDeserializingConsumer, self).__init__(conf, **kwargs)


-def get_deserializing_consumer(conf=None, **kwargs):
-    return _get_consumer_generic(DeserializingConsumer, conf, **kwargs)
+class TestAvroConsumer(AvroConsumer):
+    def __init__(self, conf, **kwargs):
+        _update_conf_group_protocol(conf)
+        super(TestAvroConsumer, self).__init__(conf, **kwargs)
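
The three wrappers preserve the old helpers' behavior: 'group.protocol' is set to 'consumer' only when the config carries a 'group.id' and use_group_protocol_consumer() is true, and everything else passes straight through to the parent constructor. A minimal usage sketch under that assumption (the broker address and topic name are placeholders, not part of this commit; it also assumes the tests package is importable):

    from tests.common import TestConsumer

    conf = {'bootstrap.servers': 'localhost:9092',  # placeholder broker
            'group.id': 'example-group'}
    # If use_group_protocol_consumer() is true, __init__ has already added
    # 'group.protocol': 'consumer' to conf before delegating to Consumer.
    consumer = TestConsumer(conf)
    consumer.subscribe(['example-topic'])  # placeholder topic
    msg = consumer.poll(timeout=1.0)       # returns None without a live broker
    consumer.close()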

tests/integration/cluster_fixture.py

Lines changed: 3 additions & 3 deletions

@@ -23,7 +23,7 @@
 from confluent_kafka import Producer, SerializingProducer
 from confluent_kafka.admin import AdminClient, NewTopic
 from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
-from ..common import get_consumer, get_deserializing_consumer
+from ..common import TestDeserializingConsumer, TestConsumer


 class KafkaClusterFixture(object):
@@ -105,7 +105,7 @@ def cimpl_consumer(self, conf=None):
         if conf is not None:
             consumer_conf.update(conf)

-        return get_consumer(consumer_conf)
+        return TestConsumer(consumer_conf)

     def consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
         """
@@ -138,7 +138,7 @@ def consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
         if value_deserializer is not None:
             consumer_conf['value.deserializer'] = value_deserializer

-        return get_deserializing_consumer(consumer_conf)
+        return TestDeserializingConsumer(consumer_conf)

     def admin(self, conf=None):
         if conf:

tests/integration/integration_test.py

Lines changed: 10 additions & 10 deletions

@@ -29,7 +29,7 @@
 import gc
 import struct
 import re
-from ..common import get_consumer, get_avro_consumer
+from ..common import TestConsumer, TestAvroConsumer

 try:
     # Memory tracker
@@ -374,7 +374,7 @@ def verify_consumer():
             'enable.partition.eof': True}

     # Create consumer
-    c = get_consumer(conf)
+    c = TestConsumer(conf)

     def print_wmark(consumer, topic_parts):
         # Verify #294: get_watermark_offsets() should not fail on the first call
@@ -484,7 +484,7 @@ def print_wmark(consumer, topic_parts):
     c.close()

     # Start a new client and get the committed offsets
-    c = get_consumer(conf)
+    c = TestConsumer(conf)
     offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3))))
     for tp in offsets:
         print(tp)
@@ -501,7 +501,7 @@ def verify_consumer_performance():
             'error_cb': error_cb,
             'auto.offset.reset': 'earliest'}

-    c = get_consumer(conf)
+    c = TestConsumer(conf)

     def my_on_assign(consumer, partitions):
         print('on_assign:', len(partitions), 'partitions:')
@@ -609,7 +609,7 @@ def verify_batch_consumer():
             'auto.offset.reset': 'earliest'}

     # Create consumer
-    c = get_consumer(conf)
+    c = TestConsumer(conf)

     # Subscribe to a list of topics
     c.subscribe([topic])
@@ -666,7 +666,7 @@ def verify_batch_consumer():
     c.close()

     # Start a new client and get the committed offsets
-    c = get_consumer(conf)
+    c = TestConsumer(conf)
     offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3))))
     for tp in offsets:
         print(tp)
@@ -683,7 +683,7 @@ def verify_batch_consumer_performance():
             'error_cb': error_cb,
             'auto.offset.reset': 'earliest'}

-    c = get_consumer(conf)
+    c = TestConsumer(conf)

     def my_on_assign(consumer, partitions):
         print('on_assign:', len(partitions), 'partitions:')
@@ -878,7 +878,7 @@ def run_avro_loop(producer_conf, consumer_conf):
         p.produce(**combo)
     p.flush()

-    c = get_avro_consumer(consumer_conf)
+    c = TestAvroConsumer(consumer_conf)
     c.subscribe([(t['topic']) for t in combinations])

     msgcount = 0
@@ -990,7 +990,7 @@ def stats_cb(stats_json_str):
             'statistics.interval.ms': 200,
             'auto.offset.reset': 'earliest'}

-    c = get_consumer(conf)
+    c = TestConsumer(conf)
     c.subscribe([topic])

     max_msgcnt = 1000000
@@ -1117,7 +1117,7 @@ def verify_avro_explicit_read_schema():
         p.produce(topic=avro_topic, **combo)
     p.flush()

-    c = get_avro_consumer(consumer_conf, reader_key_schema=reader_schema, reader_value_schema=reader_schema)
+    c = TestAvroConsumer(consumer_conf, reader_key_schema=reader_schema, reader_value_schema=reader_schema)
     c.subscribe([avro_topic])

     msgcount = 0

tests/integration/producer/test_transactions.py

Lines changed: 3 additions & 3 deletions

@@ -18,7 +18,7 @@
 import inspect
 import sys
 from uuid import uuid1
-from ...common import get_consumer
+from ...common import TestConsumer

 from confluent_kafka import KafkaError

@@ -115,7 +115,7 @@ def test_send_offsets_committed_transaction(kafka_cluster):
         'error_cb': error_cb
     }
     consumer_conf.update(kafka_cluster.client_conf())
-    consumer = get_consumer(consumer_conf)
+    consumer = TestConsumer(consumer_conf)

     kafka_cluster.seed_topic(input_topic)
     consumer.subscribe([input_topic])
@@ -205,7 +205,7 @@ def consume_committed(conf, topic):
         'error_cb': prefixed_error_cb(called_by()), }

     consumer_conf.update(conf)
-    consumer = get_consumer(consumer_conf)
+    consumer = TestConsumer(consumer_conf)
     consumer.subscribe([topic])

     msg_cnt = read_all_msgs(consumer)

tests/soak/soakclient.py

Lines changed: 2 additions & 2 deletions

@@ -30,7 +30,7 @@
 from confluent_kafka.admin import AdminClient, NewTopic
 from collections import defaultdict
 from builtins import int
-from common import get_consumer
+from common import TestConsumer
 import argparse
 import threading
 import time
@@ -445,7 +445,7 @@ def filter_config(conf, filter_out, strip_prefix):
         cconf['error_cb'] = self.consumer_error_cb
         cconf['on_commit'] = self.consumer_commit_cb
         self.logger.info("consumer: using group.id {}".format(cconf['group.id']))
-        self.consumer = get_consumer(cconf)
+        self.consumer = TestConsumer(cconf)

         # Create and start producer thread
         self.producer_thread = threading.Thread(target=self.producer_thread_main)

tests/test_Consumer.py

Lines changed: 10 additions & 10 deletions

@@ -4,21 +4,21 @@
                              KafkaException, TIMESTAMP_NOT_AVAILABLE,
                              OFFSET_INVALID, libversion)
 import pytest
-from .common import get_consumer
+from .common import TestConsumer


 def test_basic_api():
     """ Basic API tests, these wont really do anything since there is no
     broker configured. """

     with pytest.raises(TypeError) as ex:
-        kc = get_consumer()
+        kc = TestConsumer()
     assert ex.match('expected configuration dict')

     def dummy_commit_cb(err, partitions):
         pass

-    kc = get_consumer({'group.id': 'test', 'socket.timeout.ms': '100',
+    kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100',
                        'session.timeout.ms': 1000,  # Avoid close() blocking too long
                        'on_commit': dummy_commit_cb})

@@ -120,7 +120,7 @@ def dummy_assign_revoke(consumer, partitions):
 def test_store_offsets():
     """ Basic store_offsets() tests """

-    c = get_consumer({'group.id': 'test',
+    c = TestConsumer({'group.id': 'test',
                       'enable.auto.commit': True,
                       'enable.auto.offset.store': False,
                       'socket.timeout.ms': 50,
@@ -162,7 +162,7 @@ def commit_cb(cs, err, ps):

     cs = CommitState('test', 2)

-    c = get_consumer({'group.id': 'x',
+    c = TestConsumer({'group.id': 'x',
                       'enable.auto.commit': False, 'socket.timeout.ms': 50,
                       'session.timeout.ms': 100,
                       'on_commit': lambda err, ps: commit_cb(cs, err, ps)})
@@ -197,7 +197,7 @@ def poll(self, somearg):
 @pytest.mark.skipif(libversion()[1] < 0x000b0000,
                     reason="requires librdkafka >=0.11.0")
 def test_offsets_for_times():
-    c = get_consumer({'group.id': 'test',
+    c = TestConsumer({'group.id': 'test',
                       'enable.auto.commit': True,
                       'enable.auto.offset.store': False,
                       'socket.timeout.ms': 50,
@@ -217,7 +217,7 @@ def test_offsets_for_times():
 def test_multiple_close_does_not_throw_exception():
     """ Calling Consumer.close() multiple times should not throw Runtime Exception
     """
-    c = get_consumer({'group.id': 'test',
+    c = TestConsumer({'group.id': 'test',
                       'enable.auto.commit': True,
                       'enable.auto.offset.store': False,
                       'socket.timeout.ms': 50,
@@ -233,7 +233,7 @@ def test_multiple_close_does_not_throw_exception():
 def test_any_method_after_close_throws_exception():
     """ Calling any consumer method after close should throw a RuntimeError
     """
-    c = get_consumer({'group.id': 'test',
+    c = TestConsumer({'group.id': 'test',
                       'enable.auto.commit': True,
                       'enable.auto.offset.store': False,
                       'socket.timeout.ms': 50,
@@ -297,7 +297,7 @@ def test_any_method_after_close_throws_exception():
 def test_calling_store_offsets_after_close_throws_erro():
     """ calling store_offset after close should throw RuntimeError """

-    c = get_consumer({'group.id': 'test',
+    c = TestConsumer({'group.id': 'test',
                       'enable.auto.commit': True,
                       'enable.auto.offset.store': False,
                       'socket.timeout.ms': 50,
@@ -320,5 +320,5 @@ def test_consumer_without_groupid():
     """ Consumer should raise exception if group.id is not set """

     with pytest.raises(ValueError) as ex:
-        get_consumer({'bootstrap.servers': "mybroker:9092"})
+        TestConsumer({'bootstrap.servers': "mybroker:9092"})
     assert ex.match('group.id must be set')

tests/test_Producer.py

Lines changed: 2 additions & 2 deletions

@@ -5,7 +5,7 @@
 from confluent_kafka import Producer, KafkaError, KafkaException, \
     TopicPartition, libversion
 from struct import pack
-from .common import get_consumer
+from .common import TestConsumer


 def error_cb(err):
@@ -212,7 +212,7 @@ def test_transaction_api():
     assert ex.value.args[0].fatal() is False
     assert ex.value.args[0].txn_requires_abort() is False

-    consumer = get_consumer({"group.id": "testgroup"})
+    consumer = TestConsumer({"group.id": "testgroup"})
     group_metadata = consumer.consumer_group_metadata()
     consumer.close()

tests/test_log.py

Lines changed: 7 additions & 7 deletions

@@ -4,7 +4,7 @@
 import confluent_kafka
 import confluent_kafka.avro
 import logging
-from .common import get_consumer, get_avro_consumer
+from .common import TestConsumer, TestAvroConsumer


 class CountingFilter(logging.Filter):
@@ -25,7 +25,7 @@ def test_logging_consumer():
     logger.setLevel(logging.DEBUG)
     f = CountingFilter('consumer')
     logger.addFilter(f)
-    kc = get_consumer({'group.id': 'test',
+    kc = TestConsumer({'group.id': 'test',
                        'debug': 'all'},
                       logger=logger)
     while f.cnt == 0:
@@ -44,10 +44,10 @@ def test_logging_avro_consumer():
     f = CountingFilter('avroconsumer')
     logger.addFilter(f)

-    kc = get_avro_consumer({'schema.registry.url': 'http://example.com',
-                            'group.id': 'test',
-                            'debug': 'all'},
-                           logger=logger)
+    kc = TestAvroConsumer({'schema.registry.url': 'http://example.com',
+                           'group.id': 'test',
+                           'debug': 'all'},
+                          logger=logger)
     while f.cnt == 0:
         kc.poll(timeout=0.5)

@@ -149,7 +149,7 @@ def test_consumer_logger_logging_in_given_format():
     handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
     logger.addHandler(handler)

-    c = get_consumer(
+    c = TestConsumer(
         {"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"})
     c.poll(0)
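
As the test_log.py hunks show, keyword arguments such as logger= still flow through **kwargs to the underlying confluent_kafka constructor, which is what lets these classes replace the old factory functions without changing any call signature. A short sketch of that pass-through, assuming the tests package is importable (librdkafka emits debug logs even without a reachable broker):

    import logging
    from tests.common import TestConsumer

    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    # logger= is not a config-dict key; TestConsumer.__init__ forwards it via
    # **kwargs to confluent_kafka.Consumer, exactly as get_consumer() did.
    kc = TestConsumer({'group.id': 'test', 'debug': 'all'}, logger=logger)
    kc.poll(timeout=0.5)  # debug log lines are delivered during poll()
    kc.close()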
