Skip to content

Commit 15b6965

Browse files
committed
[DEX-2307] feat: add instrumentations
1 parent 2076cbd commit 15b6965

File tree

10 files changed

+109
-25
lines changed

10 files changed

+109
-25
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1313

1414
### Fixed
1515

16+
## [2.6.0] - 2024-07-01
17+
18+
### Added
19+
20+
- Added instrumentation for methods `process_message` and `mark_as_consumed!`
21+
22+
### Fixed
23+
24+
- From `do_consume(message)` to `yield`
25+
1626
## [2.5.0] - 2024-06-24
1727

1828
### Added

lib/sbmt/kafka_consumer/base_consumer.rb

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def with_instrumentation(message)
5252
"consumer.consumed_one",
5353
caller: self, message: message, trace_id: trace_id
5454
) do
55-
do_consume(message)
55+
yield
5656
rescue SkipUndeserializableMessage => ex
5757
instrument_error(ex, message)
5858
logger.warn("skipping undeserializable message: #{ex.message}")
@@ -87,16 +87,37 @@ def with_batch_instrumentation(messages)
8787
end
8888
end
8989

90+
def with_common_instrumentation(name, message)
91+
@trace_id = SecureRandom.base58
92+
93+
logger.tagged(
94+
trace_id: trace_id
95+
) do
96+
::Sbmt::KafkaConsumer.monitor.instrument(
97+
"consumer.#{name}",
98+
caller: self,
99+
message: message,
100+
trace_id: trace_id
101+
) do
102+
yield
103+
end
104+
end
105+
end
106+
90107
def do_consume(message)
91108
log_message(message) if log_payload?
92109

93110
# deserialization process is lazy (and cached)
94111
# so we trigger it explicitly to catch undeserializable message early
95112
message.payload
96113

97-
call_middlewares(message, middlewares) { process_message(message) }
114+
with_common_instrumentation("process_message", message) do
115+
call_middlewares(message, middlewares) { process_message(message) }
116+
end
98117

99-
mark_as_consumed!(message)
118+
with_common_instrumentation("mark_as_consumed", message) do
119+
mark_as_consumed!(message)
120+
end
100121
end
101122

102123
def skip_on_error

lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ class BaseMonitor < Karafka::Instrumentation::Monitor
1010
consumer.consumed_one
1111
consumer.inbox.consumed_one
1212
consumer.consumed_batch
13+
consumer.process_message
14+
consumer.mark_as_consumed
1315
].freeze
1416

1517
def initialize

lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ def on_consumer_consumed_one(event)
3232
logger.info("Successfully consumed message in #{event.payload[:time]} ms")
3333
end
3434

35+
def on_consumer_mark_as_consumed(event)
36+
logger.info("Processing message in #{event.payload[:time]} ms")
37+
end
38+
39+
def on_consumer_process_message(event)
40+
logger.info("Commit offset in #{event.payload[:time]} ms")
41+
end
42+
3543
# InboxConsumer events
3644
def on_consumer_inbox_consumed_one(event)
3745
logger.tagged(status: event[:status]) do

lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ module Sbmt
66
module KafkaConsumer
77
module Instrumentation
88
class OpenTelemetryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
9+
CONSUMED_EVENTS = %w[
10+
consumer.process_message
11+
consumer.mark_as_consumed
12+
].freeze
13+
914
class << self
1015
def enabled?
1116
!!@enabled
@@ -22,6 +27,7 @@ def trace(&block)
2227
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
2328
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
2429
return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one"
30+
return handle_common_event(&block) if CONSUMED_EVENTS.include?(@event_id)
2531
return handle_error(&block) if @event_id == "error.occurred"
2632

2733
yield
@@ -79,6 +85,16 @@ def handle_inbox_consumed_one
7985
end
8086
end
8187

88+
def handle_common_event(&block)
89+
return yield unless enabled?
90+
91+
if @payload[:inbox_name].present?
92+
handle_inbox_consumed_one(&block)
93+
else
94+
handle_consumed_one(&block)
95+
end
96+
end
97+
8298
def handle_error
8399
return yield unless enabled?
84100

lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@ class SentryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
1313
consumer.inbox.consume_one
1414
].freeze
1515

16+
EVENTS = %w[
17+
consumer.consumed_one
18+
consumer.process_message
19+
consumer.mark_as_consumed
20+
].freeze
21+
1622
def trace(&block)
17-
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
23+
return handle_consumed_one(&block) if EVENTS.include?(@event_id)
1824
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
1925
return handle_error(&block) if @event_id == "error.occurred"
2026

lib/sbmt/kafka_consumer/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
module Sbmt
44
module KafkaConsumer
5-
VERSION = "2.5.0"
5+
VERSION = "2.6.0"
66
end
77
end

spec/sbmt/kafka_consumer/base_consumer_spec.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ def consumed?
7777

7878
it "logs message" do
7979
expect(Rails.logger).to receive(:info).with(/Successfully consumed message/)
80+
expect(Rails.logger).to receive(:info).with(/Processing message/)
81+
expect(Rails.logger).to receive(:info).with(/Commit offset/)
8082
expect(Rails.logger).to receive(:info).with(/#{payload}/)
8183

8284
consume_with_sbmt_karafka

spec/sbmt/kafka_consumer/inbox_consumer_spec.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
it "creates inbox item" do
4444
expect(kafka_client).to receive(:mark_as_consumed!)
4545
expect(Rails.logger).to receive(:info).with(/Successfully consumed/).twice
46+
expect(Rails.logger).to receive(:info).with(/Processing message/)
47+
expect(Rails.logger).to receive(:info).with(/Commit offset/)
4648
expect { consume_with_sbmt_karafka }
4749
.to change(TestInboxItem, :count).by(1)
4850
.and increment_yabeda_counter(Yabeda.kafka_consumer.inbox_consumes)
@@ -160,6 +162,8 @@
160162
context "with poisoned message" do
161163
before do
162164
allow(Rails.logger).to receive(:info).with(/Successfully consumed/)
165+
allow(Rails.logger).to receive(:info).with(/Processing message/)
166+
allow(Rails.logger).to receive(:info).with(/Commit offset/)
163167
allow(Rails.logger).to receive(:error)
164168
end
165169

@@ -249,6 +253,8 @@ def extra_message_attrs(_message)
249253
it "merges with default inbox-item attributes" do
250254
expect(kafka_client).to receive(:mark_as_consumed!)
251255
expect(Rails.logger).to receive(:info).with(/Successfully consumed/).twice
256+
expect(Rails.logger).to receive(:info).with(/Processing message/)
257+
expect(Rails.logger).to receive(:info).with(/Commit offset/)
252258
expect { consume_with_sbmt_karafka }.to change(TestInboxItem, :count).by(1)
253259
expect(TestInboxItem.last.event_name).to eq("custom-value")
254260
end

spec/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer_spec.rb

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,34 @@
1515
let(:consumer_group) { OpenStruct.new(id: consumer_group_name) }
1616
let(:consumer_topic) { OpenStruct.new(consumer_group: consumer_group) }
1717
let(:consumer) { OpenStruct.new(topic: consumer_topic, inbox_name: "inbox/name", event_name: nil) }
18-
let(:event_payload) { OpenStruct.new(caller: consumer, message: message, inbox_name: "inbox/name", event_name: nil, status: "failure") }
18+
let(:event_payload) { OpenStruct.new(caller: consumer, message: message, event_name: nil, status: "failure") }
19+
let(:event_inbox_payload) { OpenStruct.new(caller: consumer, message: message, inbox_name: "inbox/name", event_name: nil, status: "failure") }
1920
let(:event_payload_with_batch) { OpenStruct.new(caller: consumer, messages: batch_messages, inbox_name: "inbox/name", event_name: nil, status: "failure") }
2021

22+
shared_examples "traces message" do |event_name, span_name|
23+
it "traces #{event_name} message" do
24+
expect(tracer).to receive(:in_span).with(span_name, links: nil, kind: :consumer, attributes: {
25+
"messaging.destination" => topic_name,
26+
"messaging.destination_kind" => "topic",
27+
"messaging.kafka.consumer_group" => consumer_group_name,
28+
"messaging.kafka.offset" => 0,
29+
"messaging.kafka.partition" => 1,
30+
"messaging.system" => "kafka"
31+
})
32+
described_class.new(event_name, event_payload).trace {}
33+
end
34+
end
35+
36+
shared_examples "traces message with inbox" do |event_name, span_name|
37+
it "traces #{event_name} message" do
38+
expect(tracer).to receive(:in_span).with(span_name, kind: :consumer, attributes: {
39+
"inbox.inbox_name" => "inbox/name",
40+
"inbox.status" => "failure"
41+
})
42+
described_class.new(event_name, event_inbox_payload).trace {}
43+
end
44+
end
45+
2146
describe "when disabled" do
2247
before { described_class.enabled = false }
2348

@@ -39,17 +64,13 @@
3964
allow(instrumentation_instance).to receive(:tracer).and_return(tracer)
4065
end
4166

42-
it "traces message" do
43-
expect(tracer).to receive(:in_span).with("consume topic", links: nil, kind: :consumer, attributes: {
44-
"messaging.destination" => topic_name,
45-
"messaging.destination_kind" => "topic",
46-
"messaging.kafka.consumer_group" => consumer_group_name,
47-
"messaging.kafka.offset" => 0,
48-
"messaging.kafka.partition" => 1,
49-
"messaging.system" => "kafka"
50-
})
51-
described_class.new("consumer.consumed_one", event_payload).trace {}
52-
end
67+
it_behaves_like "traces message", "consumer.consumed_one", "consume topic"
68+
it_behaves_like "traces message", "consumer.process_message", "consume topic"
69+
it_behaves_like "traces message", "consumer.mark_as_consumed", "consume topic"
70+
71+
it_behaves_like "traces message with inbox", "consumer.inbox.consumed_one", "inbox inbox/name process"
72+
it_behaves_like "traces message with inbox", "consumer.process_message", "inbox inbox/name process"
73+
it_behaves_like "traces message with inbox", "consumer.mark_as_consumed", "inbox inbox/name process"
5374

5475
it "traces messages" do
5576
expect(tracer).to receive(:in_span).with("consume batch", links: [], kind: :consumer, attributes: {
@@ -63,13 +84,5 @@
6384
})
6485
described_class.new("consumer.consumed_batch", event_payload_with_batch).trace {}
6586
end
66-
67-
it "traces inbox message" do
68-
expect(tracer).to receive(:in_span).with("inbox inbox/name process", kind: :consumer, attributes: {
69-
"inbox.inbox_name" => "inbox/name",
70-
"inbox.status" => "failure"
71-
})
72-
described_class.new("consumer.inbox.consumed_one", event_payload).trace {}
73-
end
7487
end
7588
end

0 commit comments

Comments
 (0)