@@ -74,7 +74,8 @@ def test_switch_leader(self):
74
74
self ._send_random_messages (producer , topic , partition , 10 )
75
75
76
76
# count number of messages
77
- count = self ._count_messages ('test_switch_leader group' , topic , partition )
77
+ count = self ._count_messages ('test_switch_leader group' , topic ,
78
+ partitions = (partition ,))
78
79
79
80
# Should be equal to 10 before + 1 recovery + 10 after
80
81
self .assertEquals (count , 21 )
@@ -108,7 +109,8 @@ def test_switch_leader_async(self):
108
109
producer .stop ()
109
110
110
111
# count number of messages
111
- count = self ._count_messages ('test_switch_leader_async group' , topic , partition )
112
+ count = self ._count_messages ('test_switch_leader_async group' , topic ,
113
+ partitions = (partition ,))
112
114
113
115
# Should be equal to 10 before + 1 recovery + 10 after
114
116
self .assertEquals (count , 21 )
@@ -128,18 +130,17 @@ def _kill_leader(self, topic, partition):
128
130
broker .close ()
129
131
return broker
130
132
131
- def _count_messages (self , group , topic , timeout = 1 ):
133
+ def _count_messages (self , group , topic , timeout = 1 , partitions = None ):
132
134
hosts = ',' .join (['%s:%d' % (broker .host , broker .port )
133
135
for broker in self .brokers ])
134
136
135
137
client = KafkaClient (hosts )
136
138
consumer = SimpleConsumer (client , group , topic ,
139
+ partitions = partitions ,
137
140
auto_commit = False ,
138
141
iter_timeout = timeout )
139
142
140
- all_messages = []
141
- for message in consumer :
142
- all_messages .append (message )
143
+ count = consumer .pending (partitions )
143
144
consumer .stop ()
144
145
client .close ()
145
- return len ( all_messages )
146
+ return count
0 commit comments