Skip to content

Commit 5202af1

Browse files
authored
Introduce partition count cache key for partition_key usage (karafka#309)
1 parent 5ea1244 commit 5202af1

File tree

4 files changed

+103
-4
lines changed

4 files changed

+103
-4
lines changed

CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# 0.14.0 (Unreleased)
2-
* [Change] Remove support for Ruby 2.6 due to it being EOL and WeakMap incompatibilities.
2+
* [Enhancement] Introduce producer partitions count metadata cache (mensfeld)
3+
* [Enhancement] Increase metadata timeout request from `250 ms` to `2000 ms` default to allow for remote cluster operations via `rdkafka-ruby` (mensfeld)
4+
* [Change] Remove support for Ruby 2.6 due to it being EOL and WeakMap incompatibilities (mensfeld)
35

46
# 0.13.0
57
* Support cooperative sticky partition assignment in the rebalance callback (methodmissing)

lib/rdkafka/metadata.rb

+22-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,18 @@ module Rdkafka
44
class Metadata
55
attr_reader :brokers, :topics
66

7-
def initialize(native_client, topic_name = nil, timeout_ms = 250)
7+
# Errors upon which we retry the metadata fetch
8+
RETRIED_ERRORS = %i[
9+
timed_out
10+
leader_not_available
11+
].freeze
12+
13+
private_constant :RETRIED_ERRORS
14+
15+
def initialize(native_client, topic_name = nil, timeout_ms = 2_000)
16+
attempt ||= 0
17+
attempt += 1
18+
819
native_topic = if topic_name
920
Rdkafka::Bindings.rd_kafka_topic_new(native_client, topic_name, nil)
1021
end
@@ -22,6 +33,16 @@ def initialize(native_client, topic_name = nil, timeout_ms = 250)
2233
raise Rdkafka::RdkafkaError.new(result) unless result.zero?
2334

2435
metadata_from_native(ptr.read_pointer)
36+
rescue ::Rdkafka::RdkafkaError => e
37+
raise unless RETRIED_ERRORS.include?(e.code)
38+
raise if attempt > 10
39+
40+
backoff_factor = 2**attempt
41+
timeout = backoff_factor * 0.1
42+
43+
sleep(timeout)
44+
45+
retry
2546
ensure
2647
Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic) if topic_name
2748
Rdkafka::Bindings.rd_kafka_metadata_destroy(ptr.read_pointer)

lib/rdkafka/producer.rb

+36-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
module Rdkafka
66
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
77
class Producer
8+
# Cache partitions count for 30 seconds
9+
PARTITIONS_COUNT_TTL = 30
10+
11+
private_constant :PARTITIONS_COUNT_TTL
12+
813
# @private
914
# Returns the current delivery callback, by default this is nil.
1015
#
@@ -24,6 +29,19 @@ def initialize(native_kafka, partitioner_name)
2429

2530
# Makes sure, that native kafka gets closed before it gets GCed by Ruby
2631
ObjectSpace.define_finalizer(self, native_kafka.finalizer)
32+
33+
@_partitions_count_cache = Hash.new do |cache, topic|
34+
topic_metadata = nil
35+
36+
@native_kafka.with_inner do |inner|
37+
topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
38+
end
39+
40+
cache[topic] = [
41+
monotonic_now,
42+
topic_metadata ? topic_metadata[:partition_count] : nil
43+
]
44+
end
2745
end
2846

2947
# Set a callback that will be called every time a message is successfully produced.
@@ -68,11 +86,21 @@ def flush(timeout_ms=5_000)
6886
# @param topic [String] The topic name.
6987
#
7088
# @return partition count [Integer,nil]
89+
#
90+
# We cache the partition count for a given topic for given time
91+
# This prevents us in case someone uses `partition_key` from querying for the count with
92+
# each message. Instead we query once every 30 seconds at most
93+
#
94+
# @param topic [String] topic name
95+
# @return [Integer] partition count for a given topic
7196
def partition_count(topic)
7297
closed_producer_check(__method__)
73-
@native_kafka.with_inner do |inner|
74-
Rdkafka::Metadata.new(inner, topic).topics&.first[:partition_count]
98+
99+
@_partitions_count_cache.delete_if do |_, cached|
100+
monotonic_now - cached.first > PARTITIONS_COUNT_TTL
75101
end
102+
103+
@_partitions_count_cache[topic].last
76104
end
77105

78106
# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.
@@ -193,6 +221,12 @@ def arity(callback)
193221
end
194222

195223
private
224+
225+
def monotonic_now
226+
# needed because Time.now can go backwards
227+
Process.clock_gettime(Process::CLOCK_MONOTONIC)
228+
end
229+
196230
def closed_producer_check(method)
197231
raise Rdkafka::ClosedProducerError.new(method) if closed?
198232
end

spec/rdkafka/producer_spec.rb

+42
Original file line numberDiff line numberDiff line change
@@ -554,4 +554,46 @@ def call(_, handle)
554554
end
555555
end
556556
end
557+
558+
describe '#partition_count' do
559+
it { expect(producer.partition_count('consume_test_topic')).to eq(3) }
560+
561+
context 'when the partition count value is already cached' do
562+
before do
563+
producer.partition_count('consume_test_topic')
564+
allow(::Rdkafka::Metadata).to receive(:new).and_call_original
565+
end
566+
567+
it 'expect not to query it again' do
568+
producer.partition_count('consume_test_topic')
569+
expect(::Rdkafka::Metadata).not_to have_received(:new)
570+
end
571+
end
572+
573+
context 'when the partition count value was cached but time expired' do
574+
before do
575+
allow(::Process).to receive(:clock_gettime).and_return(0, 30.02)
576+
producer.partition_count('consume_test_topic')
577+
allow(::Rdkafka::Metadata).to receive(:new).and_call_original
578+
end
579+
580+
it 'expect not to query it again' do
581+
producer.partition_count('consume_test_topic')
582+
expect(::Rdkafka::Metadata).to have_received(:new)
583+
end
584+
end
585+
586+
context 'when the partition count value was cached and time did not expire' do
587+
before do
588+
allow(::Process).to receive(:clock_gettime).and_return(0, 29.001)
589+
producer.partition_count('consume_test_topic')
590+
allow(::Rdkafka::Metadata).to receive(:new).and_call_original
591+
end
592+
593+
it 'expect not to query it again' do
594+
producer.partition_count('consume_test_topic')
595+
expect(::Rdkafka::Metadata).not_to have_received(:new)
596+
end
597+
end
598+
end
557599
end

0 commit comments

Comments
 (0)