Skip to content

Commit 5b8322f

Browse files
authored
Backport purge from karafka-rdkafka (karafka#335)
* backport purge from karafka-rdkafka * remove not needed changes * remove extra spec
1 parent 9709183 commit 5b8322f

File tree

6 files changed

+97
-0
lines changed

6 files changed

+97
-0
lines changed

.ruby-gemset

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
rdkafka-ruby

.ruby-version

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.2.2

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Rdkafka Changelog
22

33
## 0.14.0 (Unreleased)
4+
* [Enhancement] Provide `#purge` to remove any outstanding requests from the producer (mensfeld)
45
* [Enhancement] Update `librdkafka` to `2.2.0` (mensfeld)
56
* [Enhancement] Introduce producer partitions count metadata cache (mensfeld)
67
* [Enhancement] Increase metadata timeout request from `250 ms` to `2000 ms` default to allow for remote cluster operations via `rdkafka-ruby` (mensfeld)

lib/rdkafka/bindings.rb

+3
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,13 @@ class TopicPartitionList < FFI::Struct
249249
RD_KAFKA_VTYPE_TIMESTAMP = 8
250250
RD_KAFKA_VTYPE_HEADER = 9
251251
RD_KAFKA_VTYPE_HEADERS = 10
252+
RD_KAFKA_PURGE_F_QUEUE = 1
253+
RD_KAFKA_PURGE_F_INFLIGHT = 2
252254

253255
RD_KAFKA_MSG_F_COPY = 0x2
254256

255257
attach_function :rd_kafka_producev, [:pointer, :varargs], :int, blocking: true
258+
attach_function :rd_kafka_purge, [:pointer, :int], :int, blocking: true
256259
callback :delivery_cb, [:pointer, :pointer, :pointer], :void
257260
attach_function :rd_kafka_conf_set_dr_msg_cb, [:pointer, :delivery_cb], :void
258261

lib/rdkafka/producer.rb

+26
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,32 @@ def flush(timeout_ms=5_000)
105105
raise(error)
106106
end
107107

108+
# Purges the outgoing queue and releases all resources.
109+
#
110+
# Useful when closing the producer with outgoing messages to unstable clusters or when for
111+
# any other reasons waiting cannot go on anymore. This purges both the queue and all the
112+
# inflight requests + updates the delivery handles statuses so they can be materialized into
113+
# `purge_queue` errors.
114+
def purge
115+
closed_producer_check(__method__)
116+
117+
code = nil
118+
119+
@native_kafka.with_inner do |inner|
120+
code = Bindings.rd_kafka_purge(
121+
inner,
122+
Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT
123+
)
124+
end
125+
126+
code.zero? || raise(Rdkafka::RdkafkaError.new(code))
127+
128+
# Wait for the purge to affect everything
129+
sleep(0.001) until flush(100)
130+
131+
true
132+
end
133+
108134
# Partition count for a given topic.
109135
#
110136
# @param topic [String] The topic name.

spec/rdkafka/producer_spec.rb

+65
Original file line numberDiff line numberDiff line change
@@ -645,4 +645,69 @@ def call(_, handle)
645645
end
646646
end
647647
end
648+
649+
describe '#purge' do
650+
context 'when no outgoing messages' do
651+
it { expect(producer.purge).to eq(true) }
652+
end
653+
654+
context 'when librdkafka purge returns an error' do
655+
before { expect(Rdkafka::Bindings).to receive(:rd_kafka_purge).and_return(-153) }
656+
657+
it 'expect to raise an error' do
658+
expect { producer.purge }.to raise_error(Rdkafka::RdkafkaError, /retry/)
659+
end
660+
end
661+
662+
context 'when there are outgoing things in the queue' do
663+
let(:producer) do
664+
rdkafka_producer_config(
665+
"bootstrap.servers": "localhost:9093",
666+
"message.timeout.ms": 2_000
667+
).producer
668+
end
669+
670+
it "should should purge and move forward" do
671+
producer.produce(
672+
topic: "produce_test_topic",
673+
payload: "payload headers"
674+
)
675+
676+
expect(producer.purge).to eq(true)
677+
expect(producer.flush(1_000)).to eq(true)
678+
end
679+
680+
it "should materialize the delivery handles" do
681+
handle = producer.produce(
682+
topic: "produce_test_topic",
683+
payload: "payload headers"
684+
)
685+
686+
expect(producer.purge).to eq(true)
687+
688+
expect { handle.wait }.to raise_error(Rdkafka::RdkafkaError, /purge_queue/)
689+
end
690+
691+
context "when using delivery_callback" do
692+
let(:delivery_reports) { [] }
693+
694+
let(:delivery_callback) do
695+
->(delivery_report) { delivery_reports << delivery_report }
696+
end
697+
698+
before { producer.delivery_callback = delivery_callback }
699+
700+
it "should run the callback" do
701+
handle = producer.produce(
702+
topic: "produce_test_topic",
703+
payload: "payload headers"
704+
)
705+
706+
expect(producer.purge).to eq(true)
707+
# queue purge
708+
expect(delivery_reports[0].error).to eq(-152)
709+
end
710+
end
711+
end
712+
end
648713
end

0 commit comments

Comments
 (0)