Skip to content

Commit 0d0665c

Browse files
committed
Add documentation for producer, move classes into Producer
1 parent 1f9196b commit 0d0665c

File tree

5 files changed

+81
-37
lines changed

5 files changed

+81
-37
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ ext/tmp
44
ext/librdkafka.*
55
*.gem
66
.yardoc
7+
doc

.yardopts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
--no-private

lib/rdkafka/ffi.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require "logger"
33

44
module Rdkafka
5+
# @private
56
module FFI
67
extend ::FFI::Library
78

@@ -144,7 +145,7 @@ class TopicPartitionList < ::FFI::Struct
144145
:void, [:pointer, :pointer, :pointer]
145146
) do |client_ptr, message_ptr, opaque_ptr|
146147
message = Message.new(message_ptr)
147-
delivery_handle = Rdkafka::DeliveryHandle.new(message[:_private])
148+
delivery_handle = Rdkafka::Producer::DeliveryHandle.new(message[:_private])
148149
delivery_handle[:pending] = false
149150
delivery_handle[:response] = message[:err]
150151
delivery_handle[:partition] = message[:partition]

lib/rdkafka/producer.rb

+76-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Rdkafka
22
class Producer
3+
# @private
34
def initialize(native_kafka)
45
@closing = false
56
@native_kafka = native_kafka
@@ -16,13 +17,28 @@ def initialize(native_kafka)
1617
@polling_thread.abort_on_exception = true
1718
end
1819

20+
# Close this producer and wait for the internal poll queue to empty.
1921
def close
2022
# Indicate to polling thread that we're closing
2123
@closing = true
2224
# Wait for the polling thread to finish up
2325
@polling_thread.join
2426
end
2527

28+
# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.
29+
#
30+
# When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used.
31+
# When a timestamp is provided this is used instead of the autogenerated timestamp.
32+
#
33+
# @param topic [String] The topic to produce to
34+
# @param payload [String] The message's payload
35+
# @param key [String] The message's key
36+
# @param partition [Integer] Optional partition to produce to
37+
# @param timestamp [Integer] Optional timestamp of this message
38+
#
39+
# @raise [RdkafkaError] When adding the message to rdkafka's queue failed
40+
#
41+
# @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message
2642
def produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil)
2743
# Start by checking and converting the input
2844

@@ -73,49 +89,74 @@ def produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil)
7389

7490
delivery_handle
7591
end
76-
end
77-
78-
class WaitTimeoutError < RuntimeError; end
79-
80-
class DeliveryHandle < ::FFI::Struct
81-
layout :pending, :bool,
82-
:response, :int,
83-
:partition, :int,
84-
:offset, :int64
8592

86-
def pending?
87-
self[:pending]
88-
end
93+
# Error that is raised when waiting for a delivery handle to complete
94+
# takes longer than the specified timeout.
95+
class WaitTimeoutError < RuntimeError; end
96+
97+
# Handle to wait for a delivery report which is returned when
98+
# producing a message.
99+
class DeliveryHandle < ::FFI::Struct
100+
layout :pending, :bool,
101+
:response, :int,
102+
:partition, :int,
103+
:offset, :int64
104+
105+
# Whether the delivery handle is still pending.
106+
#
107+
# @return [Boolean]
108+
def pending?
109+
self[:pending]
110+
end
89111

90-
# Wait for the delivery report
91-
def wait(timeout_in_seconds=60)
92-
timeout = if timeout_in_seconds
93-
Time.now.to_i + timeout_in_seconds
94-
else
95-
nil
96-
end
97-
loop do
98-
if pending?
99-
if timeout && timeout <= Time.now.to_i
100-
raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
112+
# Wait for the delivery report or raise an error if this takes longer than the timeout.
113+
# If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message.
114+
# In this case it is possible to call wait again.
115+
#
116+
# @param timeout_in_seconds [Integer] Number of seconds to wait before timing out. If this is nil it does not time out.
117+
#
118+
# @raise [RdkafkaError] When delivering the message failed
119+
# @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
120+
#
121+
# @return [DeliveryReport]
122+
def wait(timeout_in_seconds=60)
123+
timeout = if timeout_in_seconds
124+
Time.now.to_i + timeout_in_seconds
125+
else
126+
nil
127+
end
128+
loop do
129+
if pending?
130+
if timeout && timeout <= Time.now.to_i
131+
raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
132+
end
133+
sleep 0.1
134+
next
135+
elsif self[:response] != 0
136+
raise RdkafkaError.new(self[:response])
137+
else
138+
return DeliveryReport.new(self[:partition], self[:offset])
101139
end
102-
sleep 0.1
103-
next
104-
elsif self[:response] != 0
105-
raise RdkafkaError.new(self[:response])
106-
else
107-
return DeliveryReport.new(self[:partition], self[:offset])
108140
end
109141
end
110142
end
111-
end
112143

113-
class DeliveryReport
114-
attr_reader :partition, :offset
144+
# Delivery report for a succesfully produced message.
145+
class DeliveryReport
146+
# The partition this message was produced to.
147+
# @return [Integer]
148+
attr_reader :partition
149+
150+
# The offset of the produced message.
151+
# @return [Integer]
152+
attr_reader :offset
115153

116-
def initialize(partition, offset)
117-
@partition = partition
118-
@offset = offset
154+
private
155+
156+
def initialize(partition, offset)
157+
@partition = partition
158+
@offset = offset
159+
end
119160
end
120161
end
121162
end

spec/rdkafka/producer_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,6 @@
102102
)
103103
expect {
104104
handle.wait(0)
105-
}.to raise_error Rdkafka::WaitTimeoutError
105+
}.to raise_error Rdkafka::Producer::WaitTimeoutError
106106
end
107107
end

0 commit comments

Comments
 (0)