@@ -48,13 +48,11 @@ def closed?
48
48
@native_kafka . closed?
49
49
end
50
50
51
- # Subscribe to one or more topics letting Kafka handle partition assignments.
51
+ # Subscribes to one or more topics letting Kafka handle partition assignments.
52
52
#
53
53
# @param topics [Array<String>] One or more topic names
54
- #
55
- # @raise [RdkafkaError] When subscribing fails
56
- #
57
54
# @return [nil]
55
+ # @raise [RdkafkaError] When subscribing fails
58
56
def subscribe ( *topics )
59
57
closed_consumer_check ( __method__ )
60
58
@@ -78,9 +76,8 @@ def subscribe(*topics)
78
76
79
77
# Unsubscribe from all subscribed topics.
80
78
#
81
- # @raise [RdkafkaError] When unsubscribing fails
82
- #
83
79
# @return [nil]
80
+ # @raise [RdkafkaError] When unsubscribing fails
84
81
def unsubscribe
85
82
closed_consumer_check ( __method__ )
86
83
@@ -95,10 +92,8 @@ def unsubscribe
95
92
# Pause producing or consumption for the provided list of partitions
96
93
#
97
94
# @param list [TopicPartitionList] The topic with partitions to pause
98
- #
99
- # @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
100
- #
101
95
# @return [nil]
96
+ # @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
102
97
def pause ( list )
103
98
closed_consumer_check ( __method__ )
104
99
@@ -122,13 +117,11 @@ def pause(list)
122
117
end
123
118
end
124
119
125
- # Resume producing consumption for the provided list of partitions
120
+ # Resumes producing consumption for the provided list of partitions
126
121
#
127
122
# @param list [TopicPartitionList] The topic with partitions to pause
128
- #
129
- # @raise [RdkafkaError] When resume subscription fails.
130
- #
131
123
# @return [nil]
124
+ # @raise [RdkafkaError] When resume subscription fails.
132
125
def resume ( list )
133
126
closed_consumer_check ( __method__ )
134
127
@@ -150,11 +143,10 @@ def resume(list)
150
143
end
151
144
end
152
145
153
- # Return the current subscription to topics and partitions
154
- #
155
- # @raise [RdkafkaError] When getting the subscription fails.
146
+ # Returns the current subscription to topics and partitions
156
147
#
157
148
# @return [TopicPartitionList]
149
+ # @raise [RdkafkaError] When getting the subscription fails.
158
150
def subscription
159
151
closed_consumer_check ( __method__ )
160
152
@@ -179,7 +171,6 @@ def subscription
179
171
# Atomic assignment of partitions to consume
180
172
#
181
173
# @param list [TopicPartitionList] The topic with partitions to assign
182
- #
183
174
# @raise [RdkafkaError] When assigning fails
184
175
def assign ( list )
185
176
closed_consumer_check ( __method__ )
@@ -204,9 +195,8 @@ def assign(list)
204
195
205
196
# Returns the current partition assignment.
206
197
#
207
- # @raise [RdkafkaError] When getting the assignment fails.
208
- #
209
198
# @return [TopicPartitionList]
199
+ # @raise [RdkafkaError] When getting the assignment fails.
210
200
def assignment
211
201
closed_consumer_check ( __method__ )
212
202
@@ -232,14 +222,14 @@ def assignment
232
222
end
233
223
234
224
# Return the current committed offset per partition for this consumer group.
235
- # The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition.
225
+ # The offset field of each requested partition will either be set to stored offset or to -1001
226
+ # in case there was no stored offset for that partition.
236
227
#
237
- # @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription.
228
+ # @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil
229
+ # to use the current subscription.
238
230
# @param timeout_ms [Integer] The timeout for fetching this information.
239
- #
240
- # @raise [RdkafkaError] When getting the committed positions fails.
241
- #
242
231
# @return [TopicPartitionList]
232
+ # @raise [RdkafkaError] When getting the committed positions fails.
243
233
def committed ( list = nil , timeout_ms = 1200 )
244
234
closed_consumer_check ( __method__ )
245
235
@@ -269,10 +259,8 @@ def committed(list=nil, timeout_ms=1200)
269
259
# @param topic [String] The topic to query
270
260
# @param partition [Integer] The partition to query
271
261
# @param timeout_ms [Integer] The timeout for querying the broker
272
- #
273
- # @raise [RdkafkaError] When querying the broker fails.
274
- #
275
262
# @return [Integer] The low and high watermark
263
+ # @raise [RdkafkaError] When querying the broker fails.
276
264
def query_watermark_offsets ( topic , partition , timeout_ms = 200 )
277
265
closed_consumer_check ( __method__ )
278
266
@@ -306,10 +294,9 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
306
294
#
307
295
# @param topic_partition_list [TopicPartitionList] The list to calculate lag for.
308
296
# @param watermark_timeout_ms [Integer] The timeout for each query watermark call.
309
- #
297
+ # @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag
298
+ # per partition
310
299
# @raise [RdkafkaError] When querying the broker fails.
311
- #
312
- # @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag per partition
313
300
def lag ( topic_partition_list , watermark_timeout_ms = 100 )
314
301
out = { }
315
302
@@ -358,10 +345,8 @@ def member_id
358
345
# When using this `enable.auto.offset.store` should be set to `false` in the config.
359
346
#
360
347
# @param message [Rdkafka::Consumer::Message] The message which offset will be stored
361
- #
362
- # @raise [RdkafkaError] When storing the offset fails
363
- #
364
348
# @return [nil]
349
+ # @raise [RdkafkaError] When storing the offset fails
365
350
def store_offset ( message )
366
351
closed_consumer_check ( __method__ )
367
352
@@ -392,10 +377,8 @@ def store_offset(message)
392
377
# message at the given offset.
393
378
#
394
379
# @param message [Rdkafka::Consumer::Message] The message to which to seek
395
- #
396
- # @raise [RdkafkaError] When seeking fails
397
- #
398
380
# @return [nil]
381
+ # @raise [RdkafkaError] When seeking fails
399
382
def seek ( message )
400
383
closed_consumer_check ( __method__ )
401
384
@@ -434,10 +417,8 @@ def seek(message)
434
417
#
435
418
# @param list [TopicPartitionList,nil] The topic with partitions to commit
436
419
# @param async [Boolean] Whether to commit async or wait for the commit to finish
437
- #
438
- # @raise [RdkafkaError] When committing fails
439
- #
440
420
# @return [nil]
421
+ # @raise [RdkafkaError] When committing fails
441
422
def commit ( list = nil , async = false )
442
423
closed_consumer_check ( __method__ )
443
424
@@ -462,10 +443,8 @@ def commit(list=nil, async=false)
462
443
# Poll for the next message on one of the subscribed topics
463
444
#
464
445
# @param timeout_ms [Integer] Timeout of this poll
465
- #
466
- # @raise [RdkafkaError] When polling fails
467
- #
468
446
# @return [Message, nil] A message or nil if there was no new message within the timeout
447
+ # @raise [RdkafkaError] When polling fails
469
448
def poll ( timeout_ms )
470
449
closed_consumer_check ( __method__ )
471
450
@@ -494,14 +473,11 @@ def poll(timeout_ms)
494
473
# Poll for new messages and yield for each received one. Iteration
495
474
# will end when the consumer is closed.
496
475
#
497
- # If `enable.partition.eof` is turned on in the config this will raise an
498
- # error when an eof is reached, so you probably want to disable that when
499
- # using this method of iteration.
476
+ # If `enable.partition.eof` is turned on in the config this will raise an error when an eof is
477
+ # reached, so you probably want to disable that when using this method of iteration.
500
478
#
501
479
# @raise [RdkafkaError] When polling fails
502
- #
503
480
# @yieldparam message [Message] Received message
504
- #
505
481
# @return [nil]
506
482
def each
507
483
loop do
@@ -554,9 +530,7 @@ def each
554
530
# that you may or may not see again.
555
531
#
556
532
# @param max_items [Integer] Maximum size of the yielded array of messages
557
- #
558
533
# @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages
559
- #
560
534
# @param timeout_ms [Integer] max time to wait for up to max_items
561
535
#
562
536
# @raise [RdkafkaError] When polling fails
0 commit comments