Skip to content

Commit bdb9fe1

Browse files
authored
Merge pull request karafka#196 from gvisokinskas/feature/handle_in_callback
Pass the delivery handle to the callback
2 parents 8f4e595 + 811d3b6 commit bdb9fe1

File tree

4 files changed

+73
-6
lines changed

4 files changed

+73
-6
lines changed

Diff for: lib/rdkafka/callbacks.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def self.call(_, message_ptr, opaque_ptr)
9797
delivery_handle[:pending] = false
9898
# Call delivery callback on opaque
9999
if opaque = Rdkafka::Config.opaques[opaque_ptr.to_i]
100-
opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], message[:err]))
100+
opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], message[:err]), delivery_handle)
101101
end
102102
end
103103
end

Diff for: lib/rdkafka/config.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ class Opaque
278278
attr_accessor :producer
279279
attr_accessor :consumer_rebalance_listener
280280

281-
def call_delivery_callback(delivery_handle)
282-
producer.call_delivery_callback(delivery_handle) if producer
281+
def call_delivery_callback(delivery_report, delivery_handle)
282+
producer.call_delivery_callback(delivery_report, delivery_handle) if producer
283283
end
284284

285285
def call_on_partitions_assigned(consumer, list)

Diff for: lib/rdkafka/producer.rb

+19-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ class Producer
99
# @return [Proc, nil]
1010
attr_reader :delivery_callback
1111

12+
# @private
13+
# Returns the number of arguments accepted by the callback, by default this is nil.
14+
#
15+
# @return [Integer, nil]
16+
attr_reader :delivery_callback_arity
17+
1218
# @private
1319
def initialize(client, partitioner_name)
1420
@client = client
@@ -19,14 +25,15 @@ def initialize(client, partitioner_name)
1925
end
2026

2127
# Set a callback that will be called every time a message is successfully produced.
22-
# The callback is called with a {DeliveryReport}
28+
# The callback is called with a {DeliveryReport} and {DeliveryHandle}
2329
#
2430
# @param callback [Proc, #call] The callback
2531
#
2632
# @return [nil]
2733
def delivery_callback=(callback)
2834
raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
2935
@delivery_callback = callback
36+
@delivery_callback_arity = arity(callback)
3037
end
3138

3239
# Close this producer and wait for the internal poll queue to empty.
@@ -151,8 +158,17 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
151158
end
152159

153160
# @private
154-
def call_delivery_callback(delivery_handle)
155-
@delivery_callback.call(delivery_handle) if @delivery_callback
161+
def call_delivery_callback(delivery_report, delivery_handle)
162+
return unless @delivery_callback
163+
164+
args = [delivery_report, delivery_handle].take(@delivery_callback_arity)
165+
@delivery_callback.call(*args)
166+
end
167+
168+
def arity(callback)
169+
return callback.arity if callback.respond_to?(:arity)
170+
171+
callback.method(:call).arity
156172
end
157173

158174
def closed_producer_check(method)

Diff for: spec/rdkafka/producer_spec.rb

+51
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,27 @@
4949
# Callback should have been called
5050
expect(@callback_called).to be true
5151
end
52+
53+
it "should provide handle" do
54+
@callback_handle = nil
55+
56+
producer.delivery_callback = lambda { |_, handle| @callback_handle = handle }
57+
58+
# Produce a message
59+
handle = producer.produce(
60+
topic: "produce_test_topic",
61+
payload: "payload",
62+
key: "key"
63+
)
64+
65+
# Wait for it to be delivered
66+
handle.wait(max_wait_timeout: 15)
67+
68+
# Join the producer thread.
69+
producer.close
70+
71+
expect(handle).to be @callback_handle
72+
end
5273
end
5374

5475
context "with a callable object" do
@@ -93,6 +114,36 @@ def call(report)
93114
expect(called_report.first.partition).to eq 1
94115
expect(called_report.first.offset).to be >= 0
95116
end
117+
118+
it "should provide handle" do
119+
callback_handles = []
120+
callback = Class.new do
121+
def initialize(callback_handles)
122+
@callback_handles = callback_handles
123+
end
124+
125+
def call(_, handle)
126+
@callback_handles << handle
127+
end
128+
end
129+
producer.delivery_callback = callback.new(callback_handles)
130+
131+
# Produce a message
132+
handle = producer.produce(
133+
topic: "produce_test_topic",
134+
payload: "payload",
135+
key: "key"
136+
)
137+
138+
# Wait for it to be delivered
139+
handle.wait(max_wait_timeout: 15)
140+
141+
# Join the producer thread.
142+
producer.close
143+
144+
# Callback should have been called
145+
expect(handle).to be callback_handles.first
146+
end
96147
end
97148

98149
it "should not accept a callback that's not callable" do

0 commit comments

Comments
 (0)