Skip to content

Commit abde37a

Browse files
feat: update gem version requirement of rdkafka (#1442)
* chore: update gem version requirement of rdkafka * fix: consider `each_batch` removed in rdkafka-0.20.0 * chore: delete redundant `::` * fix: delete `each_batch` method from targets of instrumentation * fix: fix version constraint of rdkafka * Revert "fix: delete `each_batch` method from targets of instrumentation" This reverts commit de95123. * ci: update Appraisals of rdkafka to support recent versions * fix: loosen version constraints to support old versions * fix: Make the unmaintained versions of rdkafka-ruby unsupported --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent 07d188e commit abde37a

File tree

4 files changed

+88
-85
lines changed

4 files changed

+88
-85
lines changed

instrumentation/rdkafka/Appraisals

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7-
%w[0.12.0 0.13.0 0.14.0 0.15.0 0.16.0 0.17.0 0.18.0 0.19.0].each do |version|
7+
%w[0.18.0 0.19.0 0.20.0 0.21.0].each do |version|
88
appraise "rdkafka-#{version}" do
99
gem 'rdkafka', "~> #{version}"
1010
end

instrumentation/rdkafka/lib/opentelemetry/instrumentation/rdkafka/instrumentation.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module Rdkafka
1111
class Instrumentation < OpenTelemetry::Instrumentation::Base
1212
compatible do
1313
gem_version = Gem::Version.new(::Rdkafka::VERSION)
14-
Gem::Requirement.new('>= 0.10.0', '< 0.20.0').satisfied_by?(gem_version)
14+
Gem::Requirement.new('>= 0.18.0').satisfied_by?(gem_version)
1515
end
1616

1717
install do |_config|

instrumentation/rdkafka/lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@ module Rdkafka
1010
module Patches
1111
# The Consumer module contains the instrumentation patch for the Consumer class
1212
module Consumer
13-
GETTER = if Gem::Version.new(::Rdkafka::VERSION) >= Gem::Version.new('0.13.0')
14-
Context::Propagation.text_map_getter
15-
else
16-
OpenTelemetry::Common::Propagation.symbol_key_getter
17-
end
13+
GETTER = Context::Propagation.text_map_getter
1814
private_constant :GETTER
1915

2016
def each
@@ -42,26 +38,30 @@ def each
4238
end
4339
end
4440

45-
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
46-
super do |messages, error|
47-
if messages.empty?
48-
yield messages, error
49-
else
50-
attributes = {
51-
'messaging.system' => 'kafka',
52-
'messaging.destination_kind' => 'topic',
53-
'messaging.kafka.message_count' => messages.size
54-
}
41+
# each_batch method is deleted in rdkafka-ruby-0.20.0
42+
# But, rdkafka-ruby-0.19.x and 0.18.x are still maintained
43+
if Gem::Version.new(::Rdkafka::VERSION) < Gem::Version.new('0.20.0')
44+
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
45+
super do |messages, error|
46+
if messages.empty?
47+
yield messages, error
48+
else
49+
attributes = {
50+
'messaging.system' => 'kafka',
51+
'messaging.destination_kind' => 'topic',
52+
'messaging.kafka.message_count' => messages.size
53+
}
5554

56-
links = messages.map do |message|
57-
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
58-
span_context = OpenTelemetry::Trace.current_span(trace_context).context
59-
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
60-
end
61-
links.compact!
55+
links = messages.map do |message|
56+
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
57+
span_context = OpenTelemetry::Trace.current_span(trace_context).context
58+
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
59+
end
60+
links.compact!
6261

63-
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
64-
yield messages, error
62+
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
63+
yield messages, error
64+
end
6565
end
6666
end
6767
end

instrumentation/rdkafka/test/opentelemetry/instrumentation/rdkafka/patches/consumer_test.rb

Lines changed: 63 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -166,68 +166,71 @@
166166
end
167167
end
168168

169-
describe '#each_batch' do
170-
it 'traces each_batch call' do
171-
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
172-
173-
rand_hash = SecureRandom.hex(10)
174-
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
175-
config = { 'bootstrap.servers': "#{host}:#{port}" }
176-
177-
producer = Rdkafka::Config.new(config).producer
178-
delivery_handles = []
179-
180-
delivery_handles << producer.produce(
181-
topic: topic_name,
182-
payload: 'wow',
183-
key: 'Key 1'
184-
)
185-
186-
delivery_handles << producer.produce(
187-
topic: topic_name,
188-
payload: 'super',
189-
key: 'Key 2'
190-
)
191-
192-
delivery_handles.each(&:wait)
193-
194-
consumer_config = config.merge(
195-
'group.id': 'me',
196-
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
197-
)
198-
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
199-
consumer.subscribe(topic_name)
200-
201-
begin
202-
consumer.each_batch(max_items: 2) do |messages|
203-
raise 'oops' unless messages.empty?
169+
# each_batch method is deleted in rdkafka 0.20.0
170+
if Gem::Version.new(Rdkafka::VERSION) < Gem::Version.new('0.20.0')
171+
describe '#each_batch' do
172+
it 'traces each_batch call' do
173+
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
174+
175+
rand_hash = SecureRandom.hex(10)
176+
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
177+
config = { 'bootstrap.servers': "#{host}:#{port}" }
178+
179+
producer = Rdkafka::Config.new(config).producer
180+
delivery_handles = []
181+
182+
delivery_handles << producer.produce(
183+
topic: topic_name,
184+
payload: 'wow',
185+
key: 'Key 1'
186+
)
187+
188+
delivery_handles << producer.produce(
189+
topic: topic_name,
190+
payload: 'super',
191+
key: 'Key 2'
192+
)
193+
194+
delivery_handles.each(&:wait)
195+
196+
consumer_config = config.merge(
197+
'group.id': 'me',
198+
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
199+
)
200+
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
201+
consumer.subscribe(topic_name)
202+
203+
begin
204+
consumer.each_batch(max_items: 2) do |messages|
205+
raise 'oops' unless messages.empty?
206+
end
207+
rescue StandardError
204208
end
205-
rescue StandardError
206-
end
207-
208-
span = spans.find { |s| s.name == 'batch process' }
209-
_(span.kind).must_equal(:consumer)
210-
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
211209

212-
event = span.events.first
213-
_(event.name).must_equal('exception')
214-
_(event.attributes['exception.type']).must_equal('RuntimeError')
215-
_(event.attributes['exception.message']).must_equal('oops')
216-
217-
first_link = span.links[0]
218-
linked_span_context = first_link.span_context
219-
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
220-
_(linked_span_context.span_id).must_equal(spans[0].span_id)
221-
222-
second_link = span.links[1]
223-
linked_span_context = second_link.span_context
224-
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
225-
_(linked_span_context.span_id).must_equal(spans[1].span_id)
226-
227-
_(spans.size).must_equal(3)
228-
ensure
229-
begin; producer&.close; rescue StandardError; end
230-
begin; consumer&.close; rescue StandardError; end
210+
span = spans.find { |s| s.name == 'batch process' }
211+
_(span.kind).must_equal(:consumer)
212+
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
213+
214+
event = span.events.first
215+
_(event.name).must_equal('exception')
216+
_(event.attributes['exception.type']).must_equal('RuntimeError')
217+
_(event.attributes['exception.message']).must_equal('oops')
218+
219+
first_link = span.links[0]
220+
linked_span_context = first_link.span_context
221+
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
222+
_(linked_span_context.span_id).must_equal(spans[0].span_id)
223+
224+
second_link = span.links[1]
225+
linked_span_context = second_link.span_context
226+
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
227+
_(linked_span_context.span_id).must_equal(spans[1].span_id)
228+
229+
_(spans.size).must_equal(3)
230+
ensure
231+
begin; producer&.close; rescue StandardError; end
232+
begin; consumer&.close; rescue StandardError; end
233+
end
231234
end
232235
end
233236
end

0 commit comments

Comments
 (0)